KEFCore: KEFCoreDbContext
KEFCoreDbContext is a special class which helps to define the DbContext and use the Entity Framework Core provider for Apache Kafka™:
- `KEFCoreDbContext` inherits from `DbContext`: to define the model and/or create the database, see getting started in the docs and KEFCore usage
Singleton vs context-scoped options
KEFCore separates options into two categories:
Singleton options are shared across all DbContext instances pointing to the same physical Apache Kafka™ cluster (identified by ClusterId). They contribute to the EF Core Service Provider cache key — two contexts with different singleton options will use separate Service Providers. These include serialization types, backend architecture choices, and cluster-level configuration.
Context-scoped options are specific to each DbContext instance and do not affect Service Provider caching. They control runtime behavior such as synchronization timeouts and query optimization hints.
Singleton options
The following options are singleton-scoped and must be consistent across all DbContext instances sharing the same cluster:
- `KeySerDesSelectorType`: the .NET type to be used to allocate an external serializer for the Apache Kafka™ record key — overridable per entity via `KEFCoreSerDesAttribute` or `HasKEFCoreSerDes()`
- `ValueSerDesSelectorType`: the .NET type to be used to allocate an external serializer for the Apache Kafka™ record value — overridable per entity
- `ValueContainerType`: the .NET type to be used to allocate an external container class for the Apache Kafka™ record value — overridable per entity
- `UseKeyByteBufferDataTransfer`: set to true to prefer `Java.Nio.ByteBuffer` data exchange in serializer instances for keys
- `UseValueContainerByteBufferDataTransfer`: set to true to prefer `Java.Nio.ByteBuffer` data exchange in serializer instances for value containers
- `BootstrapServers`: the server hosting the broker of Apache Kafka™ — used to resolve the `ClusterId`, which is the actual Service Provider cache key
- `ApplicationId`: the application identifier used for the Apache Kafka™ Streams topology — must be unique per process on the same cluster
- `UseKNetStreams`: set to true (default) to use the KNet version of Apache Kafka™ Streams instead of standard Apache Kafka™ Streams
- `UsePersistentStorage`: set to true to use persistent storage (RocksDB) between multiple application startups; set to false (default) for in-memory storage
- `UseDeletePolicyForTopic`: set to true to enable the delete cleanup policy on topic creation
- `DefaultNumPartitions`: the default number of partitions used when topics are created for each entity (first-wins per cluster) — overridable per entity via `KEFCoreTopicPartitionsAttribute` or `HasKEFCoreTopicPartitions()`
- `DefaultReplicationFactor`: the replication factor to use when topics are created (first-wins per cluster) — overridable per entity via `KEFCoreTopicReplicationFactorAttribute` or `HasKEFCoreTopicReplicationFactor()`
- `ProducerConfig`: parameters to use for the Apache Kafka™ producer (cluster-level, first-wins) — individual producer settings can be overridden per entity via `KEFCoreProducerAttribute` or `HasKEFCoreProducer()`
- `StreamsConfig`: parameters to use for the Apache Kafka™ Streams application (cluster-level, first-wins)
- `TopicConfig`: parameters to use on topic creation for each entity (cluster-level, first-wins) — retention can be overridden per entity via `KEFCoreTopicRetentionAttribute` or `HasKEFCoreTopicRetention()`
- `SecurityProtocol`: the security protocol for broker connections (`PLAINTEXT`, `SSL`, `SASL_PLAINTEXT`, or `SASL_SSL`) — must be consistent with `SslConfigs` and `SaslConfigs` (first-wins)
- `SslConfigs`: SSL/TLS configuration built via `SslConfigsBuilder` — required when `SecurityProtocol` is `SSL` or `SASL_SSL` (first-wins)
- `SaslConfigs`: SASL authentication configuration built via `SaslConfigsBuilder` — required when `SecurityProtocol` is `SASL_PLAINTEXT` or `SASL_SSL` (first-wins)
- `UseCompactedReplicator`: deprecated, will be removed in a future release
- `DefaultConsumerInstances`: deprecated, will be removed in a future release
- `ConsumerConfig`: deprecated, will be removed in a future release
Note
Options marked as first-wins are singleton but do not affect the Service Provider cache key — the values set by the first DbContext to initialize the cluster are used for all subsequent contexts on the same cluster.
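As a minimal sketch of wiring the singleton options, assuming fluent `With…` setters named after the options (only `WithBootstrapServers`, `WithApplicationId`, and `WithSecurityProtocol` appear verbatim elsewhere on this page; the other setter names are assumptions):

```csharp
// Sketch only: option meanings come from the list above; the WithXxx
// setter names beyond WithBootstrapServers/WithApplicationId are assumed.
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
    optionsBuilder.UseKEFCore(opt => opt
        .WithBootstrapServers("KAFKA-SERVER:9092")  // resolves ClusterId, the cache key
        .WithApplicationId("MyApp")                 // unique per process on the cluster
        .WithUsePersistentStorage(true)             // RocksDB instead of in-memory storage
        .WithDefaultNumPartitions(10)               // first-wins per cluster
        .WithDefaultReplicationFactor(3));          // first-wins per cluster
}
```

Two contexts configured with different values for these options will not share a Service Provider, so keep them consistent across contexts targeting the same cluster.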
Context-scoped options
The following options are scoped to each DbContext instance:
- `ReadOnlyMode`: set to true (default is false) to reject any write operation for the entire context; the engine will also verify that topics have proper `AclOperation.READ` rights — individual entities can be marked read-only via `KEFCoreReadOnlyAttribute` or `IsKEFCoreReadOnly()`
- `DefaultSynchronizationTimeout`: the default timeout in milliseconds KEFCore waits for the backend to be in-sync with the Apache Kafka™ cluster after a `SaveChanges`; set to `Timeout.Infinite` (default) to wait indefinitely, or `0` to disable synchronization
- `UseEnumeratorWithPrefetch`: set to true (default) to prefer enumerator instances that prefetch data, speeding up enumeration when using Apache Kafka™ Streams
- `UseStorePrefixScan`: set to true to enable prefix scan in the engine (default is false) — overridable per entity via `KEFCoreStoreLookupAttribute` or `HasKEFCoreStoreLookup()`
- `UseStoreSingleKeyLookup`: set to true (default) to enable single key look-up in the engine — overridable per entity
- `UseStoreKeyRange`: set to true (default) to enable key range look-up in the engine — overridable per entity
- `UseStoreReverse`: set to true (default) to enable reverse look-up in the engine — overridable per entity
- `UseStoreReverseKeyRange`: set to true (default) to enable reverse key range look-up in the engine — overridable per entity
- `UseGlobalTable`: deprecated — see topic naming and event management conventions for the recommended approach
- `ManageEvents`: deprecated — event management is now enabled by default for all entity types; use `KEFCoreIgnoreEventsAttribute` or `HasKEFCoreManageEvents(false)` to disable it per entity
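Context-scoped options can differ between `DbContext` instances without splitting the Service Provider cache. A sketch, again assuming fluent `With…` setters named after the options (the setter names are assumptions, not verified API):

```csharp
// Sketch only: a read-mostly context with a bounded synchronization wait.
// WithReadOnlyMode/WithDefaultSynchronizationTimeout names are assumed.
optionsBuilder.UseKEFCore(opt => opt
    .WithBootstrapServers("KAFKA-SERVER:9092")
    .WithApplicationId("Reporting")
    .WithReadOnlyMode(true)                    // reject all writes for this context
    .WithDefaultSynchronizationTimeout(5000)   // wait at most 5 s after SaveChanges
    .WithUseEnumeratorWithPrefetch(true));     // prefetching enumerators (default)
```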
Topic naming conventions
Topic names are no longer configured via a TopicPrefix option. Instead, KEFCore resolves the topic name for each entity at model finalization time using KEFCoreTopicNamingConvention. See conventions for the full resolution priority.
In short:
- Apply `[KEFCoreTopicAttribute("my-topic")]` on the entity class to set the topic name explicitly
- Apply `[KEFCoreTopicPrefixAttribute("myprefix")]` on the entity class or the `DbContext` class to set a prefix
- Call `modelBuilder.UseKEFCoreTopicPrefix("myprefix")` in `OnModelCreating` for a global prefix
- Call `modelBuilder.Entity<T>().ToKEFCoreTopic("my-topic")` for a per-entity override
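For example, combining the fluent calls above in `OnModelCreating` (`Order` is a placeholder entity type):

```csharp
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
    // Global prefix applied when resolving every entity topic name.
    modelBuilder.UseKEFCoreTopicPrefix("myprefix");

    // Explicit per-entity topic name; takes priority over the prefix.
    modelBuilder.Entity<Order>().ToKEFCoreTopic("my-topic");
}
```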
Event management conventions
By default, KEFCore enables event management (i.e. TimestampExtractor activation for real-time change tracking) for all entity types via KEFCoreManageEventsConvention. To opt out:
- Apply `[KEFCoreIgnoreEventsAttribute]` on the entity class to disable events for that entity
- Call `modelBuilder.Entity<T>().HasKEFCoreManageEvents(false)` for a per-entity override
- Call `modelBuilder.UseKEFCoreManageEvents(false)` to disable events globally
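A sketch combining the opt-out calls above, assuming the fluent API also accepts `true` to re-enable events selectively (`AuditEntry` is a placeholder entity type):

```csharp
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
    // Disable event management (TimestampExtractor activation) globally...
    modelBuilder.UseKEFCoreManageEvents(false);

    // ...then re-enable it for a single entity type that needs
    // real-time change tracking (passing true is an assumption).
    modelBuilder.Entity<AuditEntry>().HasKEFCoreManageEvents(true);
}
```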
Per-entity topic and store conventions
Several context-level options can be overridden per entity type via conventions, allowing fine-grained control without affecting the global configuration:
- Kafka transactions — `KEFCoreTransactionalAttribute` or `HasKEFCoreTransactionGroup()` to assign entity types to a transaction group, enabling exactly-once semantics for atomic writes across multiple entity types
- Producer configuration — `KEFCoreProducerAttribute` or `HasKEFCoreProducer()` to override individual producer settings (`Acks`, `LingerMs`, `BatchSize`, `CompressionType`, `Retries`, etc.) per entity, enabling mixed throughput/reliability profiles on the same cluster
- Serialization types — `KEFCoreSerDesAttribute` or `HasKEFCoreSerDes()` to override `KeySerDesSelectorType`, `ValueSerDesSelectorType` and `ValueContainerType` per entity, enabling mixed serialization formats on the same cluster
- Topic partitions and replication factor — `KEFCoreTopicPartitionsAttribute`, `KEFCoreTopicReplicationFactorAttribute`, or fluent API `HasKEFCoreTopicPartitions()` / `HasKEFCoreTopicReplicationFactor()`
- Topic retention — `KEFCoreTopicRetentionAttribute` or `HasKEFCoreTopicRetention()` to override `RetentionBytes` and `RetentionMs` per entity
- Read-only — `KEFCoreReadOnlyAttribute` or `IsKEFCoreReadOnly()` to prevent writes for a specific entity type while allowing writes for others in the same `SaveChanges` call
- Store lookup optimizations — `KEFCoreStoreLookupAttribute` or `HasKEFCoreStoreLookup()` to enable or disable specific query optimization paths per entity
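A sketch of stacking several per-entity overrides with attributes; the attribute names come from the list above, but the constructor/property shapes are assumptions, and `AuditEntry` is a placeholder entity type:

```csharp
// Sketch only: parameter shapes (ctor argument, named property) are assumed.
[KEFCoreTopicPartitions(12)]                        // assumed: partition count as ctor argument
[KEFCoreTopicRetention(RetentionMs = 86_400_000)]   // assumed: named property, 1 day
[KEFCoreReadOnly]                                   // writes to this entity are rejected
public class AuditEntry
{
    public int Id { get; set; }
    public string Message { get; set; } = string.Empty;
}
```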
See conventions for full documentation and examples.
How to use KEFCoreDbContext class
The simplest usage example can be found in KEFCore usage. By default, KEFCoreDbContext automatically manages the OnConfiguring method of DbContext:
- `KEFCoreDbContext` checks the mandatory options like `BootstrapServers` and `ApplicationId`
- `KEFCoreDbContext` sets up the options needed to use an Apache Kafka™ cluster:
  - the default `ProducerConfig` can be overridden using the `ProducerConfig` property
  - the default `StreamsConfig` can be overridden using the `StreamsConfig` property
  - the default `TopicConfig` can be overridden using the `TopicConfig` property
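A minimal sketch of a derived context, assuming the mandatory options are exposed as settable properties on `KEFCoreDbContext` (as the `ProducerConfig`/`StreamsConfig`/`TopicConfig` properties above suggest; `Blog` is a placeholder entity type):

```csharp
// Sketch only: BootstrapServers/ApplicationId as settable properties
// on KEFCoreDbContext is an assumption based on the description above.
public class BloggingContext : KEFCoreDbContext
{
    public DbSet<Blog> Blogs { get; set; } = null!;

    public BloggingContext()
    {
        BootstrapServers = "KAFKA-SERVER:9092"; // mandatory
        ApplicationId = "MyAppId";              // mandatory, unique per process
    }
}
```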
Kafka transactional producers are supported via Database.BeginTransaction() when entity types are assigned to a transaction group via KEFCoreTransactionalAttribute or HasKEFCoreTransactionGroup(). The standard EF Core transaction pattern applies — tx.Commit() or tx.Rollback() maps to CommitTransaction()/AbortTransaction() on the Kafka transactional producer. See conventions for full details.
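The transaction flow described above follows the standard EF Core pattern; a sketch (`Order` and `Shipment` are placeholder entity types assumed to belong to the same transaction group):

```csharp
// Writes to both entity types are committed atomically on the
// underlying Kafka transactional producer.
using var tx = context.Database.BeginTransaction();
try
{
    context.Add(new Order { /* ... */ });
    context.Add(new Shipment { /* ... */ });
    context.SaveChanges();
    tx.Commit();    // maps to CommitTransaction() on the producer
}
catch
{
    tx.Rollback();  // maps to AbortTransaction() on the producer
    throw;
}
```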
Secure broker connections
SecurityProtocol, SslConfigs, and SaslConfigs are cluster-level first-wins options that configure authentication and encryption for all internal Kafka clients (producer, admin client, and Streams topology).
```csharp
optionsBuilder.UseKEFCore(opt => opt
    .WithBootstrapServers("KAFKA-SERVER:9093")
    .WithApplicationId("MyApp")
    .WithSecurityProtocol(SecurityProtocol.SASL_SSL)
    .WithSslConfig(new SslConfigsBuilder()
        .WithSslTruststoreLocation("/path/to/truststore.jks")
        .WithSslTruststorePassword(new Password("truststore-password")))
    .WithSaslConfig(new SaslConfigsBuilder()
        .WithSaslMechanism("PLAIN")
        .WithSaslJaasConfig(new Password(
            "org.apache.kafka.common.security.plain.PlainLoginModule required " +
            "username=\"myuser\" password=\"mypassword\";")))
);
```
See options for the full protocol matrix and additional notes.
Default ProducerConfig
Does not change anything over the Apache Kafka™ defaults
Default StreamsConfig
Does not change anything over the Apache Kafka™ defaults
Default TopicConfig
Over the Apache Kafka™ defaults it applies:
- `DeleteRetentionMs` set to 100 ms
- `MinCleanableDirtyRatio` set to 0.01
- `SegmentMs` set to 100 ms
- `RetentionBytes` set to 1073741824 bytes (1 GB)
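To replace these defaults, the `TopicConfig` property mentioned earlier can be assigned before the context is used; a sketch, assuming a `TopicConfigBuilder` with `With…` methods named after the settings (builder and method names are assumptions, not verified API):

```csharp
// Sketch only: reproduces the documented defaults explicitly so they
// can be tuned in one place before the context creates any topic.
context.TopicConfig = TopicConfigBuilder.Create()
    .WithDeleteRetentionMs(100)
    .WithMinCleanableDirtyRatio(0.01)
    .WithSegmentMs(100)
    .WithRetentionBytes(1_073_741_824); // 1 GB
```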