Table of Contents

Class KEFCoreDbContext

Namespace
MASES.EntityFrameworkCore.KNet.Infrastructure
Assembly
MASES.EntityFrameworkCore.KNet.dll

A KEFCoreDbContext instance represents a session with the Apache Kafka™ cluster and can be used to query and save instances of your entities. KEFCoreDbContext extends DbContext and it is a combination of the Unit Of Work and Repository patterns.

public class KEFCoreDbContext : DbContext, IInfrastructure<IServiceProvider>, IDbContextDependencies, IDbSetCache, IDbContextPoolable, IResettableService, IDisposable, IAsyncDisposable
Inheritance
KEFCoreDbContext
Implements
Derived
Inherited Members

Remarks

Entity Framework Core does not support multiple parallel operations being run on the same KEFCoreDbContext instance. This includes both parallel execution of async queries and any explicit concurrent use from multiple threads. Therefore, always await async calls immediately, or use separate DbContext instances for operations that execute in parallel. See Avoiding DbContext threading issues for more information and examples.

Typically you create a class that derives from DbContext and contains DbSet<TEntity> properties for each entity in the model. If the DbSet<TEntity> properties have a public setter, they are automatically initialized when the instance of the derived context is created.

Typically you don't need to override the OnConfiguring(DbContextOptionsBuilder) method to configure the database (and other options) to be used for the context, just set the properties associated to KEFCoreDbContext class. Alternatively, if you would rather perform configuration externally instead of inline in your context, you can use DbContextOptionsBuilder<TContext> (or DbContextOptionsBuilder) to externally create an instance of DbContextOptions<TContext> (or DbContextOptions) and pass it to a base constructor of DbContext.

The model is discovered by running a set of conventions over the entity classes found in the DbSet<TEntity> properties on the derived context. To further configure the model that is discovered by convention, you can override the OnModelCreating(ModelBuilder) method.

See KEFCoreDbContext configuration and initialization, DbContext lifetime, configuration, and initialization, Querying data with EF Core, Changing tracking, and Saving data with EF Core for more information and examples.

Constructors

KEFCoreDbContext()

Initializes a new instance of the DbContext class. The OnConfiguring(DbContextOptionsBuilder) method will be called to configure the database (and other options) to be used for this context.

public KEFCoreDbContext()

Remarks

See DbContext lifetime, configuration, and initialization for more information and examples.

KEFCoreDbContext(DbContextOptions)

Initializes a new instance of the DbContext class using the specified options. The OnConfiguring(DbContextOptionsBuilder) method will still be called to allow further configuration of the options.

public KEFCoreDbContext(DbContextOptions options)

Parameters

options DbContextOptions

The options for this context.

Remarks

Properties

ApplicationId

The application id associated to the application in use, shall be set if UseCompactedReplicator is false

public virtual string? ApplicationId { get; set; }

Property Value

string

Remarks

Choose the value carefully: upon restart its value is used to identify information on Apache Kafka™ cluster of previous running application. If UseGlobalTable is false the partitions associated to each topic are shared across all instances with the same ApplicationId so be carefull to avoid other application consumes the data from Apache Kafka™ cluster and local stores does not contains the expected information.

BootstrapServers

The bootstrap servers of the Apache Kafka™ cluster

public virtual string? BootstrapServers { get; set; }

Property Value

string

ConsumerConfig

[Obsolete("Option will be removed soon")]
public virtual ConsumerConfigBuilder? ConsumerConfig { get; set; }

Property Value

ConsumerConfigBuilder

DefaultConsumerConfig

The default ConsumerConfig configuration

[Obsolete("Option will be removed soon")]
public static ConsumerConfigBuilder DefaultConsumerConfig { get; }

Property Value

ConsumerConfigBuilder

The default ConsumerConfig configuration.

DefaultConsumerInstances

Default consumr instances used in conjunction with UseCompactedReplicator

public virtual int? DefaultConsumerInstances { get; set; }

Property Value

int?

DefaultNumPartitions

Default number of partitions associated to each topic

public virtual int DefaultNumPartitions { get; set; }

Property Value

int

DefaultProducerConfig

The default ProducerConfig configuration

public static ProducerConfigBuilder DefaultProducerConfig { get; }

Property Value

ProducerConfigBuilder

The default ProducerConfig configuration.

DefaultReplicationFactor

Default replication factor associated to each topic

public virtual short DefaultReplicationFactor { get; set; }

Property Value

short

DefaultStreamsConfig

The default StreamsConfig configuration

public static StreamsConfigBuilder DefaultStreamsConfig { get; }

Property Value

StreamsConfigBuilder

The default StreamsConfig configuration.

DefaultSynchronizationTimeout

The default timeout, expressed in milliseconds, KEFCore will wait for backend to be in-sync with Apache Kafka™ cluster. Setting DefaultSynchronizationTimeout to 0 the synchronization will be disabled

public virtual long DefaultSynchronizationTimeout { get; set; }

Property Value

long

Remarks

The KEFCore provider try to synchronize with Apache Kafka™ cluster waiting at least DefaultSynchronizationTimeout when EnsureCreated() of Database is invoked.

DefaultTopicConfig

The default TopicConfig configuration

public static TopicConfigBuilder DefaultTopicConfig { get; }

Property Value

TopicConfigBuilder

The default TopicConfig configuration.

KeySerDesSelectorType

The optional Type to use for key serialization selection

Default value is DefaultKEFCoreSerDes.Key<T>, any custom Type shall implement ISerDesSelector<T>

public virtual Type? KeySerDesSelectorType { get; set; }

Property Value

Type

ManageEvents

Setting this property to true the engine will emit events on ChangeTracker, default is true

public virtual bool ManageEvents { get; set; }

Property Value

bool

ProducerConfig

The optional ProducerConfigBuilder

public virtual ProducerConfigBuilder? ProducerConfig { get; set; }

Property Value

ProducerConfigBuilder

ReadOnlyMode

Setting this property to true if the engine shall reject any write operation, its value will be used to verify if topics has the proper rights WRITE and READ

public virtual bool ReadOnlyMode { get; set; }

Property Value

bool

StreamsConfig

The optional StreamsConfig used when UseCompactedReplicator is false

public virtual StreamsConfigBuilder? StreamsConfig { get; set; }

Property Value

StreamsConfigBuilder

TopicConfig

The optional TopicConfigBuilder used when topics shall be created

public virtual TopicConfigBuilder? TopicConfig { get; set; }

Property Value

TopicConfigBuilder

TopicPrefix

Database name means whe prefix of the topics associated to the instance of KEFCoreDbContext

public virtual string? TopicPrefix { get; set; }

Property Value

string

UseByteBufferDataTransfer

Setting this property to true the engine prefers to use ByteBuffer data exchange in serializer instances

public virtual bool UseByteBufferDataTransfer { get; set; }

Property Value

bool

UseCompactedReplicator

Use KNetCompactedReplicator<K, V> instead of Apache Kafka™ Streams

[Obsolete("Option will be removed soon")]
public virtual bool UseCompactedReplicator { get; set; }

Property Value

bool

UseDeletePolicyForTopic

Use delete cleanup policy when a topic is created

public bool UseDeletePolicyForTopic { get; set; }

Property Value

bool

UseEnumeratorWithPrefetch

Setting this property to true the engine prefers to use enumerator instances able to do a prefetch on data speeding up execution

public virtual bool UseEnumeratorWithPrefetch { get; set; }

Property Value

bool

Remarks

Used only if UseCompactedReplicator is false and UseKNetStreams is true, not available in EFCore 6.

UseGlobalTable

Setting this property to true the engine based on Streams (i.e. UseCompactedReplicator is false) will use GlobalKTable<K, V> (GlobalKTable<K, V, TJVMK, TJVMV> if UseKNetStreams is true) instead of KTable<K, V> (KTable<K, V, TJVMK, TJVMV> if UseKNetStreams is true) to manage local storage of information.

public virtual bool UseGlobalTable { get; set; }

Property Value

bool

Remarks

Setting UseGlobalTable to true is in contrast with ManageEvents that needs UseCompactedReplicator=true or a backend based on KTable<K, V> (KTable<K, V, TJVMK, TJVMV> if UseKNetStreams is true) The behavior can be changed if the application based on KEFCore does not needs events (ManageEvents=true) and/or there is a limitation using KTable<K, V> (KTable<K, V, TJVMK, TJVMV> if UseKNetStreams is true) coming from other configuration parameters like ApplicationId: using the same ApplicationId across the same cluster, the partitions of KTable<K, V> (KTable<K, V, TJVMK, TJVMV> if UseKNetStreams is true) are managed from multiple instances.

UseKNetStreams

Use KNet version of Apache Kafka™ Streams instead of standard Apache Kafka™ Streams

public virtual bool UseKNetStreams { get; set; }

Property Value

bool

UseNameMatching

Set to false to avoid match of IEntityTypes using Name

public virtual bool UseNameMatching { get; set; }

Property Value

bool

UsePersistentStorage

Use persistent storage when Apache Kafka™ Streams is in use

public virtual bool UsePersistentStorage { get; set; }

Property Value

bool

UseStoreKeyRange

Setting this property to true to enable key range look-up in engine

public virtual bool UseStoreKeyRange { get; set; }

Property Value

bool

UseStorePrefixScan

Setting this property to true to enable prefix scan in engine

public virtual bool UseStorePrefixScan { get; set; }

Property Value

bool

UseStoreReverse

Setting this property to true to enable reverse look-up in engine

public virtual bool UseStoreReverse { get; set; }

Property Value

bool

UseStoreReverseKeyRange

Setting this property to true to enable reverse key range look-up in engine

public virtual bool UseStoreReverseKeyRange { get; set; }

Property Value

bool

UseStoreSingleKeyLookup

Setting this property to true to enable single key look-up in engine

public virtual bool UseStoreSingleKeyLookup { get; set; }

Property Value

bool

ValueContainerType

The optional Type to use as value container

Default value is DefaultValueContainer<TKey>, any custom Type shall implement IValueContainer<T>

public virtual Type? ValueContainerType { get; set; }

Property Value

Type

ValueSerDesSelectorType

The optional Type to use for value serialization selection

Default value is DefaultKEFCoreSerDes.ValueContainer<T>, any custom Type shall implement ISerDesSelector<T>

public virtual Type? ValueSerDesSelectorType { get; set; }

Property Value

Type

Methods

OnConfiguring(DbContextOptionsBuilder)

Override this method to configure the database (and other options) to be used for this context. This method is called for each instance of the context that is created. The base implementation does nothing.

protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)

Parameters

optionsBuilder DbContextOptionsBuilder

A builder used to create or modify options for this context. Databases (and other extensions) typically define extension methods on this object that allow you to configure the context.

Remarks

In situations where an instance of DbContextOptions may or may not have been passed to the constructor, you can use IsConfigured to determine if the options have already been set, and skip some or all of the logic in OnConfiguring(DbContextOptionsBuilder).

See DbContext lifetime, configuration, and initialization for more information and examples.

RegisterComplexTypeConverter(IComplexTypeConverter)

public void RegisterComplexTypeConverter(IComplexTypeConverter converter)

Parameters

converter IComplexTypeConverter

The IComplexTypeConverter with all Type it manages (see SupportedClrTypes)

RegisterComplexTypeConverter(Assembly)

Allocate and register all Type in an Assembly managing an instance of IComplexTypeConverter

public void RegisterComplexTypeConverter(Assembly assembly)

Parameters

assembly Assembly

RegisterComplexTypeConverter(Type)

Allocate and register a Type managing an instance of IComplexTypeConverter

public void RegisterComplexTypeConverter(Type type)

Parameters

type Type

ResetStreams()

Resets the Apache Kafka streams application in use

public void ResetStreams()

Remarks

Use this method with cautions

WaitForSynchronization(long)

Invoke the method to wait a timeout defined from waitTimeMs for synchonization with Apache Kafka™ backend

public bool? WaitForSynchronization(long waitTimeMs = -1)

Parameters

waitTimeMs long

The time expressed as milliseconds to wait for synchonization with Apache Kafka™ backend

Returns

bool?

An optional bool, null means an uncertain result (e.g. UseGlobalTable is true), true if the store is in-sync, false otherwise

Exceptions

TimeoutException

Raised if the waitTimeMs has expired without receive an information

WaitForSynchronization(TimeSpan)

Invoke the method to wait a timeout defined from waitTime for synchonization with Apache Kafka™ backend

public bool? WaitForSynchronization(TimeSpan waitTime)

Parameters

waitTime TimeSpan

The time expressed as TimeSpan to wait for synchonization with Apache Kafka™ backend

Returns

bool?

An optional bool, null means an uncertain result (e.g. UseGlobalTable is true), true if the store is in-sync, false otherwise

Exceptions

TimeoutException

Raised if the waitTime has expired without receive an information