SharpPulsar 2.9.0

.NET 5.0 .NET Standard 2.1
There is a newer prerelease version of this package available.
See the version list below for details.
Install-Package SharpPulsar -Version 2.9.0
dotnet add package SharpPulsar --version 2.9.0
<PackageReference Include="SharpPulsar" Version="2.9.0" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.
paket add SharpPulsar --version 2.9.0
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
#r "nuget: SharpPulsar, 2.9.0"
#r directive can be used in F# Interactive, C# scripting and .NET Interactive. Copy this into the interactive tool or source code of the script to reference the package.
// Install SharpPulsar as a Cake Addin
#addin nuget:?package=SharpPulsar&version=2.9.0

// Install SharpPulsar as a Cake Tool
#tool nuget:?package=SharpPulsar&version=2.9.0
The NuGet Team does not provide support for this client. Please contact its maintainers for support.

Build Tests

SharpPulsar

SharpPulsar is an Apache Pulsar Client built on top Akka.net, which can handle millions of Apache Pulsar Producers/Consumers (in theory).

What Is Akka.NET?

Akka.NET is a toolkit and runtime for building highly concurrent, distributed, and fault tolerant event-driven applications on .NET & Mono that is able to support up to 50 million msg/sec on a single machine, with small memory footprint and ~2.5 million actors(or Apache Pulsar Producers/Consumers) per GB of heap.

What Is Apache Pulsar?

Apache Pulsar is a cloud-native, distributed messaging and streaming platform that is able to support millions of topics while delivering high-throughput and low-latency performance.

Supported features

Client

  • TLS
  • Authentication (token, tls, OAuth2)
  • Multi-Hosts Service URL
  • Proxy
  • SNI Routing
  • Transactions
  • Subscription(Durable, Non-durable)

Producer

  • Exclusive Producer
  • Partitioned Topics
  • Batching
  • Compression (LZ4, ZLIB, ZSTD, SNAPPY)
  • Schema (Primitive, Avro, Json, KeyValue, AutoSchema)
  • User-defined properties
  • Key-based batcher
  • Delayed/Scheduled messages
  • Interceptors
  • Message Router (RoundRobin, ConsistentHashing, Broadcast, Random)
  • End-to-end Encryption
  • Chunking
  • Transactions

Consumer

  • User-defined properties
  • HasMessageAvailable
  • Subscription Type (Exclusive, Failover, Shared, Key_Shared)
  • Subscription Mode (Durable, Non-durable)
  • Interceptors
  • Ack (Ack Individual, Ack Commulative, Batch-Index Ack)
  • Ack Timeout
  • Negative Ack
  • Dead Letter Policy
  • End-to-end Encryption
  • SubscriptionInitialPosition
  • Partitioned Topics
  • Batching
  • Compression (LZ4, ZLIB, ZSTD, SNAPPY)
  • Schema (Primitive, Avro, Json, KeyValue, AutoSchema)
  • Compacted Topics
  • Multiple Topics
  • Regex Consumer
  • Broker Entry Metadata

Reader

  • User-defined properties
  • HasMessageAvailable
  • Schema (Primitive, Avro, Json, KeyValue, AutoSchema)
  • Seek (MessageID, Timestamp)
  • Multiple Topics
  • End-to-end Encryption
  • Interceptors

Extras

  • Pulsar SQL
  • Pulsar Admin REST API
  • Function REST API
  • EventSource(Reader/SQL)

Getting Started

Install the NuGet package SharpPulsar and follow the Tutorials.

//pulsar client settings builder
            var clientConfig = new PulsarClientConfigBuilder()
                .ServiceUrl("pulsar://localhost:6650");

            //pulsar actor system
            var pulsarSystem = PulsarSystem.GetInstance(clientConfig);

            var pulsarClient = pulsarSystem.NewClient();

            var consumer = pulsarClient.NewConsumer(new ConsumerConfigBuilder<sbyte[]>()
                .Topic(myTopic)
                .ForceTopicCreation(true)
                .SubscriptionName("myTopic-sub"));

            var producer = pulsarClient.NewProducer(new ProducerConfigBuilder<sbyte[]>()
                .Topic(myTopic));

            for (var i = 0; i < 10; i++)
            {
                var data = Encoding.UTF8.GetBytes($"tuts-{i}").ToSBytes();
                producer.NewMessage().Value(data).Send();
            }
			Thread.Sleep(TimeSpan.FromSeconds(5));
            for (var i = 0; i < 10; i++)
            {
                var message = (Message<sbyte[]>)consumer.Receive();
                consumer.Acknowledge(message);
                var res = Encoding.UTF8.GetString(message.Data.ToBytes());
                Console.WriteLine($"message '{res}' from topic: {message.TopicName}");
            }

Logical Types

Avro Logical Types are supported. Message object MUST implement ISpecificRecord

    AvroSchema<LogicalMessage> avroSchema = AvroSchema<LogicalMessage>.Of(ISchemaDefinition<LogicalMessage>.Builder().WithPojo(typeof(LogicalMessage)).WithJSR310ConversionEnabled(true).Build());

    public class LogicalMessage : ISpecificRecord
    {
        [LogicalType(LogicalTypeKind.Date)]
        public DateTime CreatedTime { get; set; }
		
        [LogicalType(LogicalTypeKind.TimestampMicrosecond)]
        public DateTime StampMicros { get; set; }

        [LogicalType(LogicalTypeKind.TimestampMillisecond)]
        public DateTime StampMillis { get; set; }
		
	[LogicalType(LogicalTypeKind.TimeMicrosecond)]
        public TimeSpan TimeMicros { get; set; }

        [LogicalType(LogicalTypeKind.TimeMillisecond)]
        public TimeSpan TimeMillis { get; set; }
        
        public AvroDecimal Size { get; set; }
		
        public string DayOfWeek { get; set; }

        [Ignore]
        public Avro.Schema Schema { get; set; }

        public object Get(int fieldPos)
        {
            switch (fieldPos)
            {
                case 0: return CreatedTime; 
	        case 1: return StampMicros;
                case 2: return StampMillis;
	        case 3: return TimeMicros;
                case 4: return TimeMillis;
                case 5: return Size;
                case 6: return DayOfWeek;
                default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
            };
        }

        public void Put(int fieldPos, object fieldValue)
        {
            switch (fieldPos)
            {
                case 0: CreatedTime = (DateTime)fieldValue; break;
		case 1: StampMicros = (DateTime)fieldValue; break;
                case 2: StampMillis = (DateTime)fieldValue; break;
	        case 3: TimeMicros = (TimeSpan)fieldValue; break;
                case 4: TimeMillis = (TimeSpan)fieldValue; break;
                case 5: Size = (AvroDecimal)fieldValue; break;
                case 6: DayOfWeek = (String)fieldValue; break;
                default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
            };
        }
    }

KeyValue Schema ALERT!!!!

Because I have become lazy and a lover of "peace of mind":

  • For schema type of KEYVALUESCHEMA:
    • csharp producer.NewMessage().Value<TK, TV>(data).Send();` or `csharp producer.Send<TK, TV>(data)

TK, TV represents the key and value types of the KEYVALUESCHEMA respectively.

Running SharpPulsar Tests in docker container

You can run SharpPulsar tests in docker container. A Dockerfile and docker-compose file is provided at the root folder to help you run these tests in a docker container. docker-compose.yml:

version: "2.4"

services:
  akka-test:
    image: sharp-pulsar-test
    build: 
      context: .
    cpu_count: 1
    mem_limit: 1g
    environment:
      run_count: 2
      # to filter tests, uncomment
      # test_filter: "--filter FullyQualifiedName=SharpPulsar.Test.MessageChunkingTest"
      test_file: Tests/SharpPulsar.Test/SharpPulsar.Test.csproj

Dockerfile:

FROM mcr.microsoft.com/dotnet/sdk:6.0 
ENV test_file="Tests/SharpPulsar.Test/SharpPulsar.Test.csproj"
ENV test_filter=""
ENV run_count=2
RUN mkdir sharppulsar
COPY . ./sharppulsar
RUN ls
WORKDIR /sharppulsar
CMD ["/bin/bash", "-c", "x=1; c=0; while [ $x -le 1 ] && [ $c -le ${run_count} ]; do dotnet test ${test_file} ${test_filter} --framework net6.0 --logger trx; c=$(( $c + 1 )); if [ $? -eq 0 ]; then x=1; else x=0; fi;  done"]

How to:

cd into the root directory and execute docker-compose up run-count is the number of times you want the test repeated. test_filter is used when you need to test a specific test instead of running all the tests in the test suite.

License

This project is licensed under the Apache License Version 2.0 - see the LICENSE file for details.

Product Versions
.NET net5.0 net5.0-windows net6.0 net6.0-android net6.0-ios net6.0-maccatalyst net6.0-macos net6.0-tvos net6.0-windows
.NET Core netcoreapp3.0 netcoreapp3.1
.NET Standard netstandard2.1
MonoAndroid monoandroid
MonoMac monomac
MonoTouch monotouch
Tizen tizen60
Xamarin.iOS xamarinios
Xamarin.Mac xamarinmac
Xamarin.TVOS xamarintvos
Xamarin.WatchOS xamarinwatchos
Compatible target framework(s)
Additional computed target framework(s)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last updated
2.10.0-rc1158 28 8/9/2022
2.10.0-rc1157 27 8/9/2022
2.10.0-rc1150 24 8/8/2022
2.10.0-rc1136 39 7/23/2022
2.10.0-rc1135 36 7/23/2022
2.10.0-rc1134 37 7/23/2022
2.10.0-rc1133 35 7/23/2022
2.10.0-rc1132 33 7/21/2022
2.10.0-rc1130 32 7/21/2022
2.10.0-rc1129 49 7/17/2022
2.10.0-rc1125 72 7/10/2022
2.10.0-rc1123 45 7/9/2022
2.10.0-rc.1117 49 7/3/2022
2.10.0-rc.1108 82 6/25/2022
2.10.0-rc.1090 65 6/9/2022
2.10.0-rc.1088 47 6/9/2022
2.10.0-rc.1083 47 6/8/2022
2.10.0-rc.1070 50 6/3/2022
2.10.0-rc.1051 48 5/26/2022
2.10.0-rc.1037 59 5/16/2022
2.10.0-rc.1035 52 5/13/2022
2.10.0-rc.1034 53 5/13/2022
2.10.0-rc.1033 50 5/13/2022
2.10.0-rc.1032 48 5/13/2022
2.10.0-rc.1031 47 5/13/2022
2.10.0-rc.1022 160 4/2/2022
2.10.0-rc.1013 51 3/26/2022
2.9.0 237 2/21/2022
2.9.0-rc.975 76 2/13/2022
2.9.0-beta.971 53 2/13/2022
2.9.0-beta.47 68 1/9/2022
2.9.0-beta.45 69 12/29/2021
2.9.0-beta.44 68 12/21/2021
2.9.0-beta.43 77 12/19/2021
2.9.0-beta.1 51 2/13/2022
2.2.4 309 11/22/2021
2.2.4-beta.42 84 11/10/2021
2.2.4-beta.41 108 11/3/2021
2.2.4-beta.40 87 9/29/2021
2.2.4-beta 186 9/28/2021
2.2.3 269 9/22/2021
2.2.3-beta 192 9/21/2021
2.2.2 288 9/13/2021
2.2.2-beta 174 9/13/2021
2.2.1 318 9/11/2021
2.2.1-beta 227 9/11/2021
2.2.0 302 9/10/2021
2.2.0-beta 212 9/10/2021
2.1.0 295 9/5/2021
2.1.0-beta.33 99 9/5/2021
2.0.18 252 8/14/2021
2.0.0-beta.31 106 8/14/2021
2.0.0-beta.30 85 8/13/2021
2.0.0-beta.29 96 8/12/2021
2.0.0-beta.28 89 8/11/2021
2.0.0-beta.27 102 8/10/2021
2.0.0-beta.26 86 8/9/2021
2.0.0-beta.25 102 8/6/2021
2.0.0-beta.24 99 8/5/2021
2.0.0-beta.23 115 8/4/2021
2.0.0-beta.22 102 8/4/2021
2.0.0-beta.20 85 7/31/2021
2.0.0-beta.19 141 7/30/2021
2.0.0-beta.15 113 5/13/2021
2.0.0-beta.14 99 5/12/2021
2.0.0-beta.13 102 5/11/2021
2.0.0-beta.12 121 5/10/2021
2.0.0-beta.11 112 5/9/2021
2.0.0-beta.10 149 5/7/2021
2.0.0-beta.9 113 4/23/2021
2.0.0-beta.8 114 4/22/2021
2.0.0-beta.7 109 4/22/2021
2.0.0-beta.6 96 4/22/2021
2.0.0-beta.5 94 4/15/2021
2.0.0-beta.4 110 4/14/2021
2.0.0-beta 188 4/10/2021
1.4.2.1 425 9/3/2020
1.4.2 349 9/2/2020
1.4.1 378 8/29/2020
1.4.0 396 8/29/2020
1.4.0-release.1 51 2/13/2022
1.3.5 375 6/9/2020
1.3.4 376 6/9/2020
1.3.3 383 6/8/2020
1.3.2 353 6/8/2020
1.3.1 496 6/5/2020
1.3.0 405 6/3/2020
1.2.0 577 5/26/2020
1.1.0 422 5/26/2020
1.0.0 451 5/23/2020
0.9.0 395 5/21/2020
0.8.5 391 5/20/2020
0.8.4 409 5/9/2020
0.8.3 400 5/8/2020
0.8.2 395 5/2/2020
0.8.1 395 4/30/2020
0.8.0 416 4/28/2020
0.7.0 403 4/20/2020
0.6.5 366 4/16/2020
0.6.4 407 4/15/2020
0.6.3 395 4/14/2020
0.6.2 404 4/14/2020
0.6.1 386 4/13/2020
0.6.0 413 4/12/2020
0.5.3 529 4/5/2020
0.5.2 414 3/30/2020
0.5.1 496 3/28/2020
0.5.0 382 3/27/2020
0.4.0 414 3/17/2020
0.3.0 396 3/13/2020
0.2.0 382 3/11/2020
0.0.1.1 445 3/7/2020
0.0.1 396 3/7/2020
0.0.1-alpha 322 3/8/2020

• This release contains new feature and fixes found in Apache Pulsar 2.9.0 official java client/drive
• Apache Pulsar BrokerMetadata
• Apache Pulsar Exlusive Producer
• Replaced Ask<T> with TaskCompletionSource<T> to await creation of Pulsar client, producer, consumer and reader
• Fixed TimeZoneId KeyNotFound exception in SQL
• Deploy Apache Pulsar with TestContainer for esay testing

Full changelog at https://github.com/eaba/SharpPulsar/blob/main/CHANGELOG.md