Reactive.AzureStorage.Table 1.0.0

dotnet add package Reactive.AzureStorage.Table --version 1.0.0
NuGet\Install-Package Reactive.AzureStorage.Table -Version 1.0.0
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Reactive.AzureStorage.Table" Version="1.0.0" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.
paket add Reactive.AzureStorage.Table --version 1.0.0
#r "nuget: Reactive.AzureStorage.Table, 1.0.0"
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
// Install Reactive.AzureStorage.Table as a Cake Addin
#addin nuget:?package=Reactive.AzureStorage.Table&version=1.0.0

// Install Reactive.AzureStorage.Table as a Cake Tool
#tool nuget:?package=Reactive.AzureStorage.Table&version=1.0.0

Reactive.AzurePackages

    A reactive interface for Azure storage SDK - a natural programming model for asynchronous data streams.

Features

Azure Storage

Table
    While working with azure storage, its throughput has to be taken into consideration and APIs need to be designed to enable and encourage better utilization of threads. The current throughput limitation on azure table storage is upto 2000 entities per second for single table partition with 1KiB entities - [AzureStorageScalabilityandPerformanceTargets] (https://docs.microsoft.com/en-us/azure/storage/common/storage-scalability-targets). This will further be affected by the partitioning node server distribution, network bandwidth, message size and the table query.

    All these factors results into non deterministic nature of the table operation and the timeline. Azure provides an indicator/tracking pointer on the table operation as continuous-token along with intermediate results. As long as it is having a  valid pointer (from where next operation - say a 'read' begins), the operation is not yet completed. With this approach, the operation results will be provided as packets/chunks over a period of time. As the each chunk requires a network round trip and should move linearly on the line connected by the token pointers, processing a chunk soon after it arrives improves performance and uses CPU cores optimally. Providing table-operation-result chunk as a reactive stream will build the scaffolding structure for smooth flow of the processing pipeline. 

Example

public async Task BulkReadTest()
{
    var communicator = new Communicator(_accountName, _accountKey);
    var tableQuery = new TableQuery<CustomerEntity>()
                            .Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.GreaterThan, "Partition"));

    var results = await communicator.ReadAsync("USA", tableQuery) // Process the table operation continuously on a separate thread
                                .SelectMany(e => GetCustomer(e)) // Process the intermediate results on a separate thread soon after it becomes available
                                .ToArray();
}

private async Task<Customer> GetCustomer(CustomerEntity e)
{
    await Task.Delay(5); // simulated delay
    return new Customer { CustomerId = e.CustomerId, FirstName = e.FirstName, LastName = e.LastName };
}

The reactive stream can further be extended all the way up, like

public IObservable<Customer> GetGoldCustomers()
{
    var communicator = new Communicator(_accountName, _accountKey);
    var tableQuery = new TableQuery<CustomerEntity>()
                            .Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.GreaterThan, "Partition"));

    return communicator.ReadAsync("USA", tableQuery)
                        .SelectMany(e => GetCustomer(e))
                        .SelectMany(c => IsValidCustomerService(c))
                        .Where(tple => tple.Item2 == true)
                        .SelectMany(tple => IsGold(tple.Item1))
                        .Where(tple => tple.Item2 == true)
                        .Select(tple => tple.Item1);
}

private async Task<(Customer,bool)> IsValidCustomerService(Customer c) => await Task.Delay(5).ContinueWith(_ => (c,true));

private async Task<(Customer, bool)> IsGold(Customer c) => await Task.Delay(5).ContinueWith(_ => (c, true));

This stream based programming model helps join query based deletion of entities like

public async Task Delete()
{
    var communicator = new Communicator(_accountName, _accountKey);
    var tableQuery = new TableQuery<CustomerEntity>()
                            .Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.GreaterThan, "DateRange"));

    var results = await communicator.ReadAsync("USA", tableQuery)
                                    .Buffer(100)
                                    .SelectMany(ets => communicator.BatchDeleteAsync("Table", ets.ToArray()))
                                    .ToArray();
}

Important Points

As this is considered as the tight skin on top of the WindowsAzure.Storage sdk, it doesn't take additional responsibilities on input data. It is a pass through layer and provides reactive api to the caller. Additional customized layers can be added as decorators to this.

  • Validation of parameters like null or empty table name are delegated to sdk
  • Validation of common partition key and batch size of 100 is delegated to sdk.
  • Utilities are provided to serialize and deserialzie business entities to/from table entities using TableEntity class. Test project contains examples of its usage
  • Using DataFlow blocks with reactive streams results in more modular models

License

This software is open source
Product Compatible and additional computed target framework versions.
.NET net5.0 was computed.  net5.0-windows was computed.  net6.0 was computed.  net6.0-android was computed.  net6.0-ios was computed.  net6.0-maccatalyst was computed.  net6.0-macos was computed.  net6.0-tvos was computed.  net6.0-windows was computed.  net7.0 was computed.  net7.0-android was computed.  net7.0-ios was computed.  net7.0-maccatalyst was computed.  net7.0-macos was computed.  net7.0-tvos was computed.  net7.0-windows was computed.  net8.0 was computed.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed. 
.NET Core netcoreapp2.0 was computed.  netcoreapp2.1 was computed.  netcoreapp2.2 was computed.  netcoreapp3.0 was computed.  netcoreapp3.1 was computed. 
.NET Standard netstandard2.0 is compatible.  netstandard2.1 was computed. 
.NET Framework net461 was computed.  net462 was computed.  net463 was computed.  net47 was computed.  net471 was computed.  net472 was computed.  net48 was computed.  net481 was computed. 
MonoAndroid monoandroid was computed. 
MonoMac monomac was computed. 
MonoTouch monotouch was computed. 
Tizen tizen40 was computed.  tizen60 was computed. 
Xamarin.iOS xamarinios was computed. 
Xamarin.Mac xamarinmac was computed. 
Xamarin.TVOS xamarintvos was computed. 
Xamarin.WatchOS xamarinwatchos was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last updated
1.0.0 25,360 5/27/2018