ProjectoR.Core 0.2.0

dotnet add package ProjectoR.Core --version 0.2.0
NuGet\Install-Package ProjectoR.Core -Version 0.2.0
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="ProjectoR.Core" Version="0.2.0" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.
paket add ProjectoR.Core --version 0.2.0
#r "nuget: ProjectoR.Core, 0.2.0"
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
// Install ProjectoR.Core as a Cake Addin
#addin nuget:?package=ProjectoR.Core&version=0.2.0

// Install ProjectoR.Core as a Cake Tool
#tool nuget:?package=ProjectoR.Core&version=0.2.0

ProjectoR


A simple way of writing projections in .NET

Projector

Simple projector example

Example of a simple projector using Entity Framework Core.

A static property named ProjectionName is required. Its value is the name of the projection, and this name is used when saving a checkpoint for the projection.

Each method inside the projector needs to take the event it reacts to as its first parameter.

Valid method names

  • Project
  • Consume
  • Consumes
  • Handle
  • Handles
  • When

Valid return types

  • void
  • Task
  • ValueTask

Dependency injection is supported for project methods: every parameter except the event and the cancellation token will be injected.

Static project methods are supported.

public class StudentProjector
{
    public static string ProjectionName => "Student";

    public static async Task Project(
        StudentWasEnrolled enrolled, 
        ISampleContext context, 
        CancellationToken cancellationToken)
    {
        context.Students.Add(new StudentProjection
        {
            Id = enrolled.Id,
            FirstName = enrolled.FirstName,
            LastName = enrolled.LastName,
            Address = new Address
            {
                CountryCode = enrolled.CountryCode,
                City = enrolled.City,
                Street = enrolled.Street,
                PostalCode = enrolled.PostalCode
            },
            ContactInformation = new ContactInformation
            {
                Email = enrolled.Email,
                Mobile = enrolled.Mobile
            }
        });

        await context.SaveChangesAsync(cancellationToken);
    }
    
    public static async Task Project(
        StudentRelocated relocated,
        ISampleContext context, 
        CancellationToken cancellationToken) =>
        await context
            .Students
            .Where(user => user.Id == relocated.Id)
            .ExecuteUpdateAsync(calls => calls
                    .SetProperty(projection => projection.Address.City, relocated.NewAddress.City)
                    .SetProperty(projection => projection.Address.PostalCode, relocated.NewAddress.PostalCode)
                    .SetProperty(projection => projection.Address.Street, relocated.NewAddress.Street),
                cancellationToken
            );

    public static async Task Project(
        StudentChangedContactInformation changedContactInformation, 
        ISampleContext context,
        CancellationToken cancellationToken) =>
        await context
            .Students
            .Where(user => user.Id == changedContactInformation.Id)
            .ExecuteUpdateAsync(calls => calls
                    .SetProperty(projection => projection.ContactInformation.Email, changedContactInformation.Email)
                    .SetProperty(projection => projection.ContactInformation.Mobile, changedContactInformation.Mobile),
                cancellationToken
            );

    public static async Task Project(
        StudentGraduated studentGraduated, 
        ISampleContext context, 
        CancellationToken cancellationToken) =>
        await context
            .Students
            .Where(user => user.Id == studentGraduated.Id)
            .ExecuteDeleteAsync(cancellationToken);
}
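
The example above only uses the Project name with a Task return type. As a minimal sketch of the other listed names and return types, the class below uses When, Handle and Consume with void, ValueTask and Task. The CourseWasCreated, CourseWasRenamed and CourseWasArchived events and the CourseTitles store are hypothetical and defined here only to keep the sketch self-contained. Mixing several method names in one projector and leaving out the cancellation token are assumptions, not confirmed behavior.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical events and read-model store, defined only for this sketch.
public sealed record CourseWasCreated(Guid Id, string Title);
public sealed record CourseWasRenamed(Guid Id, string NewTitle);
public sealed record CourseWasArchived(Guid Id);

public sealed class CourseTitles
{
    public Dictionary<Guid, string> ById { get; } = new();
}

public class CourseProjector
{
    public static string ProjectionName => "Course";

    // "When" with a void return type: purely synchronous work.
    public static void When(CourseWasCreated created, CourseTitles titles) =>
        titles.ById[created.Id] = created.Title;

    // "Handle" returning ValueTask: completes synchronously here.
    public static ValueTask Handle(
        CourseWasRenamed renamed,
        CourseTitles titles,
        CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();
        titles.ById[renamed.Id] = renamed.NewTitle;
        return ValueTask.CompletedTask;
    }

    // "Consume" returning Task.
    public static Task Consume(CourseWasArchived archived, CourseTitles titles)
    {
        titles.ById.Remove(archived.Id);
        return Task.CompletedTask;
    }
}
```

In each method the event is the first parameter, and CourseTitles stands in for a service that would be injected.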

Pre processing

Projectors have support for pre processing.

Pre processing will run before any project methods run and supports creating dependencies.

In this example, PreProcess creates a transaction that will be disposed after the projector has finished.

The dependency created can be injected into any project method.

Valid method names

  • PreProcess

public class StudentProjector
{
    public static string ProjectionName => "Student";

    public static async Task<IDbContextTransaction> PreProcess(
        ISampleContext context,
        CancellationToken cancellationToken) =>
        await context.Database.BeginTransactionAsync(IsolationLevel.ReadUncommitted, cancellationToken);

    public static async Task Project(
        StudentWasEnrolled enrolled,
        IDbContextTransaction transaction,
        ISampleContext context, 
        CancellationToken cancellationToken)
    {
        context.Students.Add(new StudentProjection
        {
            Id = enrolled.Id,
            FirstName = enrolled.FirstName,
            LastName = enrolled.LastName,
            Address = new Address
            {
                CountryCode = enrolled.CountryCode,
                City = enrolled.City,
                Street = enrolled.Street,
                PostalCode = enrolled.PostalCode
            },
            ContactInformation = new ContactInformation
            {
                Email = enrolled.Email,
                Mobile = enrolled.Mobile
            }
        });

        await context.SaveChangesAsync(cancellationToken);
        await transaction.CommitAsync(cancellationToken);
    }
}

Post processing

Projectors have support for post processing.

Post processing will run after all project methods have run.

In this example a transaction is created in the pre processing stage and committed later in the post processing stage. When a projector processes events in batches, this is a strategy for updating the projection with one database command for multiple events.

Valid method names

  • PostProcess

public class StudentProjector
{
    public static string ProjectionName => "Student";

    public static async Task<IDbContextTransaction> PreProcess(
        ISampleContext context,
        CancellationToken cancellationToken) =>
        await context.Database.BeginTransactionAsync(IsolationLevel.ReadUncommitted, cancellationToken);

    public static async Task PostProcess(
        IDbContextTransaction transaction,
        CancellationToken cancellationToken) =>
        await transaction.CommitAsync(cancellationToken);
    
}

Batching

Batching can be configured when registering a projector.

With a batch size of 100, the projector processes up to 100 events at a time, and pre and post processing run once per batch.

Batch timeout is the time the projector waits before forcing a batch. If only 2 events have appeared within 500ms, the projector will process those 2 events instead of waiting for 100.

By default, batch size is set to 100 and batch timeout is set to 500ms.

builder
    .Services
    .AddProjectoR(configurator =>
    {
        configurator
            .UseEventStore(
                builder.Configuration.GetConnectionString("EventStoreDB"),
                eventStoreConfigurator =>
                {
                    eventStoreConfigurator
                        .UseSubscription<StudentProjector>(configure =>
                        {
                            configure.BatchingOptions.BatchSize = 100;
                            configure.BatchingOptions.BatchTimeout = TimeSpan.FromMilliseconds(100);
                        });
                }
            );
    });
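
To illustrate how batch size and batch timeout interact, here is a small self-contained simulation. It is not ProjectoR's implementation. It assumes that the timeout is measured from the first event of an open batch, and BatchingSketch and SplitIntoBatches are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

public static class BatchingSketch
{
    // Groups event arrival times into batches. A batch is closed when it
    // holds batchSize events, or when the next event arrives batchTimeout
    // or more after the first event of the open batch (assumed semantics).
    public static List<List<TimeSpan>> SplitIntoBatches(
        IEnumerable<TimeSpan> arrivals,
        int batchSize,
        TimeSpan batchTimeout)
    {
        var batches = new List<List<TimeSpan>>();
        var current = new List<TimeSpan>();

        foreach (var arrival in arrivals)
        {
            // The timeout has elapsed for the open batch: force it out.
            if (current.Count > 0 && arrival - current[0] >= batchTimeout)
            {
                batches.Add(current);
                current = new List<TimeSpan>();
            }

            current.Add(arrival);

            // The batch is full: close it immediately.
            if (current.Count == batchSize)
            {
                batches.Add(current);
                current = new List<TimeSpan>();
            }
        }

        // Whatever is left is flushed as a final, smaller batch.
        if (current.Count > 0)
        {
            batches.Add(current);
        }

        return batches;
    }
}
```

With a batch size of 100 and a timeout of 500ms, events arriving at 0ms, 10ms and 600ms end up in two batches under these assumptions: the first two events together, and the third on its own.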

Concurrency

When configuring ProjectoR, you have the option to specify the concurrency settings for running projectors. By default, this is set to the number of registered projectors.

builder
    .Services
    .AddProjectoR(configurator =>
    {
        configurator.MaxConcurrency = 1;
    });

Prioritization

When registering a projector, you have the option to assign a priority to it. If a higher priority is set, the projector will be updated before other projectors with lower priorities. However, when the concurrency is set to a value greater than 1, there is a significant chance that projectors will run concurrently, potentially reducing the impact of prioritization.

Priority options

  • Low
  • Normal
  • Highest

builder
    .Services
    .AddProjectoR(configurator =>
    {
        configurator
            .UseEventStore(
                builder.Configuration.GetConnectionString("EventStoreDB"),
                eventStoreConfigurator =>
                {
                    eventStoreConfigurator
                        .UseSubscription<StudentProjector>(configure =>
                        {
                            configure.Priority = ProjectorPriority.Highest;
                        });
                }
            );
    });
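
Since prioritization has the most effect when projectors do not run concurrently, the two settings can be combined. The fragment below is a sketch built only from the options shown in this README. CourseProjector is a hypothetical second projector, ProjectorPriority.Low is assumed to be the enum member behind the Low option listed above, and registering two subscriptions as separate statements on the same configurator is an assumption.

```csharp
builder
    .Services
    .AddProjectoR(configurator =>
    {
        // Run one projector at a time so that priority decides the order.
        configurator.MaxConcurrency = 1;

        configurator
            .UseEventStore(
                builder.Configuration.GetConnectionString("EventStoreDB"),
                eventStoreConfigurator =>
                {
                    eventStoreConfigurator
                        .UseSubscription<StudentProjector>(configure =>
                        {
                            configure.Priority = ProjectorPriority.Highest;
                        });

                    // Hypothetical second projector with a lower priority.
                    eventStoreConfigurator
                        .UseSubscription<CourseProjector>(configure =>
                        {
                            configure.Priority = ProjectorPriority.Low;
                        });
                }
            );
    });
```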

Checkpointing

Strategies

There are 3 different checkpointing strategies supported.

  • EveryEvent
  • Interval
  • AfterBatch

The default strategy is to save a checkpoint after every event, but this can be changed when registering a projector.

builder
    .Services
    .AddProjectoR(configurator =>
    {
        configurator
            .UseEventStore(
                builder.Configuration.GetConnectionString("EventStoreDB"),
                eventStoreConfigurator =>
                {
                    eventStoreConfigurator
                        .UseSubscription<StudentProjector>(configure =>
                        {
                            configure.CheckpointingOptions.CheckpointAfterBatch();
                        });
                }
            );
    });

Storage

Several storage options are supported for saving checkpoints.

  • EventStoreDB
  • EntityFrameworkCore
  • InMemory

Check the EventStoreDB and EntityFrameworkCore sections for how to use them as storage for checkpoints.

Example of using in-memory checkpointing, which should probably only be used for testing.

builder
    .Services
    .AddProjectoR(configurator =>
    {
        configurator.UseInMemoryCheckpointing();
    });

Event type resolver

An event type resolver is needed to convert events read from the database to a C# class. The name of the event is stored in the event store or streaming provider being used and needs to be mapped to a C# class. There are many strategies for naming events when storing them, depending on versioning and other concerns.

Some resolvers are built in, but in most cases a custom one should be used.

These are the supported event type resolvers.

  • Namespace
  • ClassName
  • Custom

Example of a custom event type resolver that loads all events in a specific namespace. This event type resolver uses the class name as the event name.

public class CustomTypeResolver : IEventTypeResolver
{
    private readonly IReadOnlyDictionary<string, Type> _eventTypes;

    public CustomTypeResolver()
    {
        _eventTypes = typeof(StudentGraduated)
            .Assembly
            .GetTypes()
            .Where(type => type.Namespace == typeof(StudentGraduated).Namespace)
            .ToDictionary(GetName, type => type);
    }

    public Type GetType(string eventName) =>
        _eventTypes.TryGetValue(eventName, out var type)
            ? type
            : throw new InvalidOperationException($"Type for event with name: {eventName} was not found");

    public string GetName(Type eventType) => eventType.Name;
}

builder
    .Services
    .AddProjectoR(configurator =>
    {
        configurator
            .UseEventStore(
                builder.Configuration.GetConnectionString("EventStoreDB"),
                eventStoreConfigurator =>
                {
                    eventStoreConfigurator
                        .UseSubscription<StudentProjector>(configure =>
                        {
                            configure.SerializationOptions.UseCustomEventTypeResolver<CustomTypeResolver>();
                        });
                }
            );
    });
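
Since event naming often depends on versioning, another option is a resolver with an explicit mapping between stored names and classes, so that renaming a class does not change the stored name. The sketch below is only an illustration: the IEventTypeResolver interface is a local stand-in that mirrors the two members used by CustomTypeResolver above (in a real project it comes from ProjectoR), and the event classes and the "-v1" names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Local stand-in mirroring the two members implemented by CustomTypeResolver;
// in a real project this interface is provided by ProjectoR.
public interface IEventTypeResolver
{
    Type GetType(string eventName);
    string GetName(Type eventType);
}

// Hypothetical event classes, defined only for this sketch.
public sealed class StudentWasEnrolled { }
public sealed class StudentGraduated { }

public class ExplicitNameTypeResolver : IEventTypeResolver
{
    // Stored event name -> C# class. The names are assumptions for illustration.
    private static readonly IReadOnlyDictionary<string, Type> TypesByName =
        new Dictionary<string, Type>
        {
            ["student-was-enrolled-v1"] = typeof(StudentWasEnrolled),
            ["student-graduated-v1"] = typeof(StudentGraduated)
        };

    // Reverse lookup, built once from the mapping above.
    private static readonly IReadOnlyDictionary<Type, string> NamesByType =
        TypesByName.ToDictionary(pair => pair.Value, pair => pair.Key);

    public Type GetType(string eventName) =>
        TypesByName.TryGetValue(eventName, out var type)
            ? type
            : throw new InvalidOperationException($"Type for event with name: {eventName} was not found");

    public string GetName(Type eventType) =>
        NamesByType.TryGetValue(eventType, out var name)
            ? name
            : throw new InvalidOperationException($"Name for event type: {eventType.Name} was not found");
}
```

Such a resolver would be registered in the same way as CustomTypeResolver above.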

EventStoreDB

To use ProjectoR with EventStoreDB, you should install ProjectoR.EventStore.

EntityFrameworkCore

CustomSubscription

ProjectoR supports custom subscriptions. If an event store or event streaming provider is not supported, a custom subscription can be written to integrate with it.

A custom subscription needs to return an IAsyncEnumerable or IEnumerable of EventData.

The following parameters are provided:

  • serviceProvider, so a scope can be created and the needed services can be resolved.
  • eventNames, a list of the events consumed by the projector this subscription is used for. This is needed for filtering out other events.
  • checkpoint, the last checkpoint stored for the projector.
  • cancellationToken

Valid method names

  • Subscribe

Example of a SQL subscription that polls a table of events.

public class EntityFrameworkCoreSubscription
{
    private const int BatchSize = 100;
    private static readonly TimeSpan PollingInterval = TimeSpan.FromSeconds(1);
    
    public static async IAsyncEnumerable<EventData> Subscribe(
        IServiceProvider serviceProvider,
        string[] eventNames,
        long? checkpoint,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        using var timer = new PeriodicTimer(PollingInterval);
        var moreEventsExists = false;
        var position = checkpoint ?? 0;
        
        while (moreEventsExists || await timer.WaitForNextTickAsync(cancellationToken))
        {
            await using var scope = serviceProvider.CreateAsyncScope();
            var eventContext = scope.ServiceProvider.GetRequiredService<ApplicationContext>();
            
            var lastEventPosition = position;
            var lastReadPosition = position;
            // Order by position so that Take(BatchSize) returns the next events in sequence.
            var events = eventContext
                .Events
                .Where(@event => eventNames.Contains(@event.EventName))
                .Where(@event => @event.Position > lastReadPosition)
                .OrderBy(@event => @event.Position)
                .Take(BatchSize)
                .Select(@event => new EventData(@event.EventName, @event.Data, @event.Position));
            
            await foreach (var @event in events.AsAsyncEnumerable().WithCancellation(cancellationToken))
            {
                yield return @event;
                lastEventPosition = @event.Position;
                position = @event.Position;
            }
            
            moreEventsExists = await eventContext
                .Events
                .Where(@event => eventNames.Contains(@event.EventName))
                .Where(@event => @event.Position > lastEventPosition)
                .AnyAsync(cancellationToken: cancellationToken);
        }
    }
}

Registering a projector that uses a custom subscription


builder
    .Services
    .AddProjectoR(configurator =>
    {
        configurator
            .UseCustomSubscription<EntityFrameworkCoreSubscription, StudentProjector>(configure =>
            {
                configure.BatchingOptions.BatchSize = 100;
                configure.BatchingOptions.BatchTimeout = TimeSpan.FromMilliseconds(100);
                configure.CheckpointingOptions.CheckpointAfterBatch();
            });
    });
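
Since a custom subscription may also return a plain IEnumerable of EventData, a synchronous variant can be sketched as below, for instance for tests. This is only an illustration. The EventData record is a local stand-in mirroring the constructor call used in the example above, and its byte[] payload type is an assumption. The fixed event log is hypothetical, and the parameter list is assumed to be the same as for the IAsyncEnumerable variant. How ProjectoR treats a subscription that runs to completion is not covered here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

// Local stand-in for ProjectoR's EventData, mirroring
// new EventData(name, data, position). The byte[] payload is an assumption.
public sealed record EventData(string EventName, byte[] Data, long Position);

public class InMemorySubscription
{
    // Hypothetical fixed event log, already ordered by position.
    private static readonly EventData[] Log =
    {
        new("StudentWasEnrolled", Array.Empty<byte>(), 1),
        new("StudentRelocated", Array.Empty<byte>(), 2),
        new("StudentGraduated", Array.Empty<byte>(), 3)
    };

    public static IEnumerable<EventData> Subscribe(
        IServiceProvider serviceProvider,
        string[] eventNames,
        long? checkpoint,
        CancellationToken cancellationToken)
    {
        // Same convention as the polling example: resume after the checkpoint.
        var position = checkpoint ?? 0;

        foreach (var @event in Log)
        {
            cancellationToken.ThrowIfCancellationRequested();

            // Skip events already processed and events the projector does not consume.
            if (@event.Position > position && eventNames.Contains(@event.EventName))
            {
                yield return @event;
            }
        }
    }
}
```

With a checkpoint of 1 and the event names StudentWasEnrolled and StudentGraduated, only the event at position 3 is returned.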

Product Compatible and additional computed target framework versions.
.NET net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed. 

NuGet packages (2)

Showing the top 2 NuGet packages that depend on ProjectoR.Core:

Package Downloads
ProjectoR.EntityFrameworkCore

ProjectoR.EntityFrameworkCore simplifies the integration of Entity Framework Core by providing essential support for utilizing it as a storage solution for saving checkpoints. This NuGet package empowers developers to seamlessly incorporate Entity Framework Core into their applications, streamlining the process of managing checkpoints.

ProjectoR.EventStore

ProjectoR.EventStore enhances the integration of EventStoreDB with Projector, offering seamless support for event subscription and checkpoint management. This NuGet package facilitates a streamlined experience for developers, allowing efficient handling of events and the storage of checkpoints directly in EventStoreDB.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last updated
0.2.0 125 3/11/2024
0.2.0-alpha.7 44 3/10/2024
0.2.0-alpha.2 45 3/10/2024
0.2.0-alpha.1 51 3/10/2024
0.1.6-beta.2 50 3/11/2024
0.1.5 109 3/10/2024