AsyncAssortments 0.5.0-alpha

This is a prerelease version of AsyncAssortments.
dotnet add package AsyncAssortments --version 0.5.0-alpha                
NuGet\Install-Package AsyncAssortments -Version 0.5.0-alpha                
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="AsyncAssortments" Version="0.5.0-alpha" />                
For projects that support PackageReference, copy this XML node into the project file to reference the package.
paket add AsyncAssortments --version 0.5.0-alpha                
#r "nuget: AsyncAssortments, 0.5.0-alpha"                
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
// Install AsyncAssortments as a Cake Addin
#addin nuget:?package=AsyncAssortments&version=0.5.0-alpha&prerelease

// Install AsyncAssortments as a Cake Tool
#tool nuget:?package=AsyncAssortments&version=0.5.0-alpha&prerelease                

Async Assortments

  1. Overview
  2. Download
  3. Project goals
  4. Why not use System.Reactive.Linq ?
  5. What is an IAsyncEnumerable anyway?
  6. Concurrent and parallel sequences
  7. Async sequences and reactive programming

Overview

C# 8 added the IAsyncEnumerable interface to support async sequences, but .NET is still missing the infrastructure to make it useful in many situations. This library aims to bridge the gap, providing support for await, LINQ, concurrent programming, and more to IAsyncEnumerable. Keep reading for details!

var ids = new int[] { 1, 2, 3 /* etc */ };

// Create an IAsyncEnumerable from any IEnumerable
var seq = ids.ToAsyncEnumerable();

// Manipulate the sequence using LINQ (and async LINQ!)
var processedSeq = ids
    .ToAsyncEnumerable()
    .Where(x => x > 2)
    .AsyncSelect(async x => await ProcessItemAsync(x))
    .SelectMany(x => new[] { x, x });

// Asynchronously process each item as it comes in
await foreach (var item in processedSeq) {
    // ...
}

// Or directly await the whole IAsyncEnumerable to get a normal IEnumerable, just
// like awaiting a Task<T> gives you a T
var results = await processedSeq;

// Or collect the IAsyncEnumerable into a more specific type
var results2 = await processedSeq.ToDictionaryAsync(x => x, x => x);

// Create an async method with multiple return values using iterators
async IAsyncEnumerable<string> GetSequenceAsync() {
    yield return "Hello world!";
    await Task.Delay(1000);
    yield return "This is an async function that returns a sequence!";
}

// Process a list in parallel using async LINQ
// Normally you would use some ghastly combination of LINQ and Task.WhenAll for this
var results = await ids
    .ToAsyncEnumerable()
    .AsParallel(preserveOrder: false)
    .AsyncSelect(async x => await Step1Async(x))
    .Where(x => x.Something >= 100)
    .AsyncSelect(async x => await Step2Async(x))
    .Take(50);

Download

The latest version of AsyncAssortments can be downloaded on NuGet here.

Project goals

Microsoft has made it clear that they have no intentions of shipping LINQ extensions for IAsyncEnumerable and they don't think all useful functionality must be included in the standard library. Well, challenge accepted! This project aims to provide high-quality infrastructure for IAsyncEnumerable, including LINQ.

Why not use System.Reactive.Linq ?

The incredible Rx.NET project maintains the System.Linq.Async namespace, which contains LINQ operators for IAsyncEnumerable. Why create a new project that aims to do the same thing?

Rx.NET assumes an IAsyncEnumerable represents a linear sequence of elements with a delay between each element. This is fine (but not ideal) for IO-bound operations like a database query, but in C# tasks can also be used to represent concurrent operations running at the same time. What if we wanted to use IAsyncEnumerable to represent a collection of tasks that have already started?

The LINQ operators included in Rx.NET don't compose correctly under concurrent operations, so a new library was needed. See the section below for details.

What is an IAsyncEnumerable anyway?

In C#, IEnumerable represents a lazy sequence, which means that the next element is not computed until MoveNext() is called on the iterator. If you stop calling MoveNext(), no more elements are produced. This means it is a pull-based sequence, because IEnumerable requires the consumer to "pull" items out of it.

By contrast, a Task is eager, and it starts as soon as it's created. If you call a function that returns a task without awaiting, that function is running concurrently in the background. When the task is done, it "pushes" the result to consumers by calling the task's callbacks. This means it is push-based, and runs whether or not it is awaited by the consumer.

This puts IAsyncEnumerable in a very weird spot. Should it be lazy like an IEnumerable, or eager like a Task?

The best answer is that IAsyncEnumerable should support both lazy and eager modes, because it has to interact with both types of systems. By default, an IAsyncEnumerable is lazy like a normal IEnumerable, and it only runs operators when MoveNextAsync() is called on the iterator. Consider this code:

// Takes 600 ms
var results = await new[] { 3, 2, 1 }
    .ToAsyncEnumerable()
    .AsyncSelect(async x => { await Task.Delay(x); return x; })
    .ToListAsync();

This does exactly what you would expect from a lazy sequence, because it calls the selector as it iterates the items. In effect, this will do MoveNext(), Delay(300), MoveNext(), Delay(200), MoveNext(), Delay(100), taking 600 ms in total.

But what if we want this to be eager and behave more like a list of tasks? This is usually what you want Task.WhenAll for.

// Takes 300 ms
var results = await new[] { 3, 2, 1 }
    .ToAsyncEnumerable()
    .AsConcurrent()
    .AsyncSelect(async x => { await Task.Delay(x); return x; })
    .ToListAsync();

By adding .AsConcurrent() we can tell the operators to process the elements at the same time once the sequence starts enumerating. This is now an eager sequence: when we start to enumerate it with await .ToListAsync(), all of the selectors from AsyncSelect will start at the same time instead of waiting for the previous ones to finish.

This works more like calling await Task.WhenAll() with a list of tasks. First we start all of the work, and then await the results afterwards. This will do Delay(300) + Delay(200) + Delay(100), MoveNext(), MoveNext(), MoveNext(), taking only 300 ms total.

Concurrent and parallel sequences

We just saw how AsConcurrent() told later operators to behave eagerly like a list of tasks, but there are actually a few different ways of doing this that each have their own behavior.

First there is AsConcurrent(), which runs async operators at the same time using tasks, much like you would when using Task.WhenAll(). However this still uses just one thread, and if you do CPU-bound work the sequence will jam up and not run concurrently anymore.

For this situate there is also AsParallel(), which schedules async operators to run on the thread pool instead. Think of this like Task.Run-ing all the async operators:

// Takes 600 ms
var results = await new[] { 3, 2, 1 }
    .ToAsyncEnumerable()
    .AsConcurrent()
    .AsyncSelect(async x => { Thread.Sleep(x); return x; })
    .ToListAsync();

// Takes 300 ms
var results = await new[] { 3, 2, 1 }
    .ToAsyncEnumerable()
    .AsParallel()
    .AsyncSelect(async x => { Thread.Sleep(x); return x; })
    .ToListAsync();

Finally there is the matter of ordering. By default, AsConcurrent() and AsParallel() preserve the order of the sequence when running their operators, which is usually what you want. However, there are situations where you want finished operations to immediately continue down the pipeline without waiting on previous items. This will improve the time until the first result, and it can massively improve performance if there is a lot of time variability at each step.

// Takes 100 ms
var results = await new[] { 3, 2, 1 }
    .ToAsyncEnumerable()
    .AsConcurrent(preserveOrder: false)
    .AsyncSelect(async x => { await Task.Delay(x); return x; })
    .FirstAsync();

// Returns 1, 2, 3
var results = await new[] { 3, 2, 1 }
    .ToAsyncEnumerable()
    .AsConcurrent(preserveOrder: false)
    .AsyncSelect(async x => { await Task.Delay(x); return x; })
    .ToListAsync();

Async sequences and reactive programming

When an IAsyncEnumerable is operating in eager mode with AsConcurrent() or AsParallel(), it works more-or-less like an IObservable from Rx.NET. If you squint, creating an async enumerator is basically the same as subscribing to an observable.

That means it's easy to use async sequences with reactive programming, and you can convert observables to async enumerables and back:

var subject = new Subject<int>();

// This will subscribe to our subject and print 10 20 30 to the console
var task = Enumerate();

subject.OnNext(10);
subject.OnNext(20);
subject.OnNext(30);
subject.OnCompleted();

// This task will finish when our observable is complete
await task;

Console.WriteLine("Done!");

async Task Enumerate() {
    await foreach (var item in subject.ToAsyncEnumerable()) {
        Console.WriteLine(item);
    }
}
Product Compatible and additional computed target framework versions.
.NET net5.0 was computed.  net5.0-windows was computed.  net6.0 is compatible.  net6.0-android was computed.  net6.0-ios was computed.  net6.0-maccatalyst was computed.  net6.0-macos was computed.  net6.0-tvos was computed.  net6.0-windows was computed.  net7.0 was computed.  net7.0-android was computed.  net7.0-ios was computed.  net7.0-maccatalyst was computed.  net7.0-macos was computed.  net7.0-tvos was computed.  net7.0-windows was computed.  net8.0 was computed.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed. 
.NET Core netcoreapp2.0 was computed.  netcoreapp2.1 was computed.  netcoreapp2.2 was computed.  netcoreapp3.0 was computed.  netcoreapp3.1 was computed. 
.NET Standard netstandard2.0 is compatible.  netstandard2.1 was computed. 
.NET Framework net461 was computed.  net462 was computed.  net463 was computed.  net47 was computed.  net471 was computed.  net472 was computed.  net48 was computed.  net481 was computed. 
MonoAndroid monoandroid was computed. 
MonoMac monomac was computed. 
MonoTouch monotouch was computed. 
Tizen tizen40 was computed.  tizen60 was computed. 
Xamarin.iOS xamarinios was computed. 
Xamarin.Mac xamarinmac was computed. 
Xamarin.TVOS xamarintvos was computed. 
Xamarin.WatchOS xamarinwatchos was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last updated
0.5.0-alpha 40 12/27/2024
0.4.0-alpha 50 11/27/2024