SharpPulsar 2.15.1
dotnet add package SharpPulsar --version 2.15.1
NuGet\Install-Package SharpPulsar -Version 2.15.1
<PackageReference Include="SharpPulsar" Version="2.15.1" />
paket add SharpPulsar --version 2.15.1
#r "nuget: SharpPulsar, 2.15.1"
// Install SharpPulsar as a Cake Addin
#addin nuget:?package=SharpPulsar&version=2.15.1
// Install SharpPulsar as a Cake Tool
#tool nuget:?package=SharpPulsar&version=2.15.1
SharpPulsar
SharpPulsar is an Apache Pulsar client built on top of Akka.NET, which can (in theory) handle millions of Apache Pulsar producers, consumers, readers, transactions, and table views.
What Is Akka.NET?
Akka.NET is a toolkit and runtime for building highly concurrent, distributed, and fault-tolerant event-driven applications on .NET and Mono. It can support up to 50 million msg/sec on a single machine with a small memory footprint, at roughly 2.5 million actors (or Apache Pulsar producers/consumers) per GB of heap.
What Is Apache Pulsar?
Apache Pulsar is a cloud-native, distributed messaging and streaming platform that is able to support millions of topics while delivering high-throughput and low-latency performance.
Supported features
Client
- TLS
- Authentication (token, tls, OAuth2)
- Multi-Hosts Service URL
- Proxy
- SNI Routing
- Transactions
- Subscription(Durable, Non-durable)
- Cluster-level Auto Failover
Producer
- Exclusive Producer
- Partitioned Topics
- Batching
- Compression (LZ4, ZLIB, ZSTD, SNAPPY)
- Schema (Primitive, Avro, Json, KeyValue, AutoSchema)
- User-defined properties
- Key-based batcher
- Delayed/Scheduled messages
- Interceptors
- Message Router (RoundRobin, ConsistentHashing, Broadcast, Random)
- End-to-end Encryption
- Chunking
- Transactions
Consumer
- User-defined properties
- HasMessageAvailable
- Subscription Type (Exclusive, Failover, Shared, Key_Shared)
- Subscription Mode (Durable, Non-durable)
- Interceptors
- Ack (Ack Individual, Ack Cumulative, Batch-Index Ack)
- Ack Timeout
- Negative Ack
- Dead Letter Policy
- End-to-end Encryption
- SubscriptionInitialPosition
- Partitioned Topics
- Batching
- Compression (LZ4, ZLIB, ZSTD, SNAPPY)
- Schema (Primitive, Avro, Json, KeyValue, AutoSchema)
- Compacted Topics
- Multiple Topics
- Regex Consumer
- Broker Entry Metadata
Reader
- User-defined properties
- HasMessageAvailable
- Schema (Primitive, Avro, Json, KeyValue, AutoSchema)
- Seek (MessageID, Timestamp)
- Multiple Topics
- End-to-end Encryption
- Interceptors
TableView
- Compacted Topics
- Schema (All supported schema types)
- Register Listener
Extras
- Pulsar SQL
- Pulsar Admin REST API
- Function REST API
- EventSource(Reader/SQL)
- OpenTelemetry (ProducerOTelInterceptor, ConsumerOTelInterceptor)
Getting Started
Install the NuGet package SharpPulsar and follow the Tutorials.
// Example topic name (placeholder)
var myTopic = "my-topic";

// Pulsar client settings builder
var clientConfig = new PulsarClientConfigBuilder()
    .ServiceUrl("pulsar://localhost:6650");

// Pulsar actor system
var pulsarSystem = PulsarSystem.GetInstance(clientConfig);
var pulsarClient = pulsarSystem.NewClient();

// Create the consumer first so the subscription exists before messages are sent
var consumer = pulsarClient.NewConsumer(new ConsumerConfigBuilder<sbyte[]>()
    .Topic(myTopic)
    .ForceTopicCreation(true)
    .SubscriptionName("myTopic-sub"));

var producer = pulsarClient.NewProducer(new ProducerConfigBuilder<sbyte[]>()
    .Topic(myTopic));

// Publish ten messages
for (var i = 0; i < 10; i++)
{
    var data = Encoding.UTF8.GetBytes($"tuts-{i}").ToSBytes();
    producer.NewMessage().Value(data).Send();
}

// Give the broker time to deliver the messages
Thread.Sleep(TimeSpan.FromSeconds(5));

// Receive and acknowledge the ten messages
for (var i = 0; i < 10; i++)
{
    var message = (Message<sbyte[]>)consumer.Receive();
    consumer.Acknowledge(message);
    var res = Encoding.UTF8.GetString(message.Data.ToBytes());
    Console.WriteLine($"message '{res}' from topic: {message.TopicName}");
}
Logical Types
Avro logical types are supported. The message class MUST implement ISpecificRecord.
AvroSchema<LogicalMessage> avroSchema = AvroSchema<LogicalMessage>.Of(
    ISchemaDefinition<LogicalMessage>.Builder()
        .WithPojo(typeof(LogicalMessage))
        .WithJSR310ConversionEnabled(true)
        .Build());

public class LogicalMessage : ISpecificRecord
{
    [LogicalType(LogicalTypeKind.Date)]
    public DateTime CreatedTime { get; set; }

    [LogicalType(LogicalTypeKind.TimestampMicrosecond)]
    public DateTime StampMicros { get; set; }

    [LogicalType(LogicalTypeKind.TimestampMillisecond)]
    public DateTime StampMillis { get; set; }

    [LogicalType(LogicalTypeKind.TimeMicrosecond)]
    public TimeSpan TimeMicros { get; set; }

    [LogicalType(LogicalTypeKind.TimeMillisecond)]
    public TimeSpan TimeMillis { get; set; }

    public AvroDecimal Size { get; set; }

    public string DayOfWeek { get; set; }

    [Ignore]
    public Avro.Schema Schema { get; set; }

    // Field positions follow the declaration order of the properties above
    public object Get(int fieldPos)
    {
        switch (fieldPos)
        {
            case 0: return CreatedTime;
            case 1: return StampMicros;
            case 2: return StampMillis;
            case 3: return TimeMicros;
            case 4: return TimeMillis;
            case 5: return Size;
            case 6: return DayOfWeek;
            default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
        }
    }

    public void Put(int fieldPos, object fieldValue)
    {
        switch (fieldPos)
        {
            case 0: CreatedTime = (DateTime)fieldValue; break;
            case 1: StampMicros = (DateTime)fieldValue; break;
            case 2: StampMillis = (DateTime)fieldValue; break;
            case 3: TimeMicros = (TimeSpan)fieldValue; break;
            case 4: TimeMillis = (TimeSpan)fieldValue; break;
            case 5: Size = (AvroDecimal)fieldValue; break;
            case 6: DayOfWeek = (String)fieldValue; break;
            default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
        }
    }
}
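For orientation, the Avro specification stores these logical types as plain numbers: date as an int counting days since the Unix epoch, timestamp-millis and timestamp-micros as a long counting milliseconds or microseconds since the epoch, and time-millis (int) and time-micros (long) counting from midnight. The sketch below computes those raw values with plain .NET types. The sample dates are made up, and it does not show how SharpPulsar or Apache.Avro performs the conversion internally.

```csharp
using System;

// Illustration only: raw Avro values for the logical types used above.
// Sample inputs are arbitrary; nothing here calls SharpPulsar or Apache.Avro.
const long TicksPerMicrosecond = 10; // one .NET tick is 100 ns

var epoch = DateTime.UnixEpoch;
var created = new DateTime(2024, 2, 15, 0, 0, 0, DateTimeKind.Utc);
var stamp = new DateTime(2024, 2, 15, 12, 30, 0, DateTimeKind.Utc);
var timeOfDay = new TimeSpan(12, 30, 0);

// date: days since 1970-01-01
int date = (int)(created.Date - epoch).TotalDays;

// timestamp-millis / timestamp-micros: elapsed time since the epoch
long timestampMillis = (stamp - epoch).Ticks / TimeSpan.TicksPerMillisecond;
long timestampMicros = (stamp - epoch).Ticks / TicksPerMicrosecond;

// time-millis / time-micros: elapsed time since midnight
int timeMillis = (int)(timeOfDay.Ticks / TimeSpan.TicksPerMillisecond);
long timeMicros = timeOfDay.Ticks / TicksPerMicrosecond;

Console.WriteLine($"date={date}");                       // date=19768
Console.WriteLine($"timestamp-millis={timestampMillis}"); // timestamp-millis=1708000200000
Console.WriteLine($"timestamp-micros={timestampMicros}"); // timestamp-micros=1708000200000000
Console.WriteLine($"time-millis={timeMillis}");           // time-millis=45000000
Console.WriteLine($"time-micros={timeMicros}");           // time-micros=45000000000
```

Note that a date value carries no time-of-day component, so any time portion of CreatedTime would not survive a round trip under this encoding.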
KeyValue Schema ALERT!!!!
Because I have become lazy and a lover of "peace of mind":
- For schema type of KEYVALUESCHEMA:
producer.NewMessage().Value<TK, TV>(data).Send();
OR
producer.Send<TK, TV>(data);
TK and TV represent the key and value types of the KEYVALUESCHEMA, respectively.
TableView
var topic = $"persistent://public/default/tableview-{DateTime.Now.Ticks}";
var count = 20;
var keys = await PublishMessages(topic, count, false);
var tv = await _client.NewTableViewBuilder(ISchema<string>.Bytes)
    .Topic(topic)
    .AutoUpdatePartitionsInterval(TimeSpan.FromSeconds(60))
    .CreateAsync();
Console.WriteLine($"start tv size: {tv.Size()}");
tv.ForEachAndListen((k, v) => Console.WriteLine($"{k} -> {Encoding.UTF8.GetString(v)}"));
await Task.Delay(5000);
Console.WriteLine($"Current tv size: {tv.Size()}");
tv.ForEachAndListen((k, v) => Console.WriteLine($"checkpoint {k} -> {Encoding.UTF8.GetString(v)}"));
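Conceptually, a table view keeps only the latest value per key and lets you register a callback that first sees the current entries and then every later update, which is how ForEachAndListen is used above. The following is a self-contained sketch of that idea using a plain dictionary. Apply and the local ForEachAndListen are hypothetical stand-ins written for illustration, not SharpPulsar's implementation.

```csharp
using System;
using System.Collections.Generic;

// Illustration only: a latest-value-per-key map with listeners.
var table = new Dictionary<string, string>();
var listeners = new List<Action<string, string>>();

// Hypothetical stand-in for a message arriving on the topic.
void Apply(string key, string value)
{
    table[key] = value;                          // newer value replaces the older one
    foreach (var listener in listeners) listener(key, value);
}

// Run the action over the current entries, then keep it for future updates.
void ForEachAndListen(Action<string, string> action)
{
    foreach (var (key, value) in table) action(key, value);
    listeners.Add(action);
}

Apply("k1", "v1");
Apply("k1", "v2");   // overwrites k1
Apply("k2", "a");

var seen = new List<string>();
ForEachAndListen((k, v) => seen.Add($"{k} -> {v}"));

Apply("k2", "b");    // delivered to the registered listener

Console.WriteLine($"table size: {table.Count}, callbacks: {seen.Count}");
```

The listener is invoked three times here: once for each of the two keys present at registration, and once for the later update to k2. The overwritten value v1 is never observed.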
OpenTelemetry
var exportedItems = new List<Activity>();
using var tracerProvider = Sdk.CreateTracerProviderBuilder()
    .AddSource("producer", "consumer")
    .SetResourceBuilder(ResourceBuilder.CreateDefault().AddService("inmemory-test"))
    .AddInMemoryExporter(exportedItems)
    .Build();

var producerBuilder = new ProducerConfigBuilder<byte[]>()
    .Intercept(new ProducerOTelInterceptor<byte[]>("producer", _client.Log))
    .Topic(topic);

var consumerBuilder = new ConsumerConfigBuilder<byte[]>()
    .Intercept(new ConsumerOTelInterceptor<byte[]>("consumer", _client.Log))
    .Topic(topic);
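In the example above, the names passed to AddSource match the first argument given to each interceptor. Assuming that argument is used as the ActivitySource name (an assumption, not confirmed here), the matching works the way .NET activity listening generally does: activities are only created and recorded for sources that something is listening to. The sketch below demonstrates that behavior with System.Diagnostics alone; the source name "unregistered" and the activity names are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

// Illustration only: no SharpPulsar or OpenTelemetry SDK types are used here.
var captured = new List<Activity>();

using var listener = new ActivityListener
{
    // Listen only to the two source names, like AddSource("producer", "consumer")
    ShouldListenTo = source => source.Name is "producer" or "consumer",
    Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllData,
    ActivityStopped = activity => captured.Add(activity)
};
ActivitySource.AddActivityListener(listener);

using var producerSource = new ActivitySource("producer");
using var otherSource = new ActivitySource("unregistered");

// Recorded: the source name matches the listener
using (producerSource.StartActivity("send", ActivityKind.Producer)) { }

// Not recorded: StartActivity returns null when nobody listens to the source
using (otherSource.StartActivity("ignored")) { }

Console.WriteLine($"captured activities: {captured.Count}");
```

If spans from an interceptor do not show up in the exporter, a mismatch between the registered source names and the name the interceptor uses is one thing worth checking.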
Cluster-level Auto Failover
var config = new PulsarClientConfigBuilder();
var builder = AutoClusterFailover.Builder()
    .Primary(serviceUrl)
    .Secondary(new List<string> { secondary })
    .FailoverDelay(TimeSpan.FromSeconds(failoverDelay))
    .SwitchBackDelay(TimeSpan.FromSeconds(switchBackDelay))
    .CheckInterval(TimeSpan.FromSeconds(checkInterval));
config.ServiceUrlProvider(new AutoClusterFailover((AutoClusterFailoverBuilder)builder));
[Experimental] Running SharpPulsar tests in a Docker container (known issue: creating a container from within a container)
You can run the SharpPulsar tests in a Docker container. A Dockerfile and a docker-compose file are provided in the root folder to help you do so.
docker-compose.yml:
version: "2.4"
services:
  akka-test:
    image: sharp-pulsar-test
    build:
      context: .
    cpu_count: 1
    mem_limit: 1g
    environment:
      run_count: 2
      # to filter tests, uncomment
      # test_filter: "--filter FullyQualifiedName=SharpPulsar.Test.MessageChunkingTest"
      test_file: Tests/SharpPulsar.Test/SharpPulsar.Test.csproj
Dockerfile:
FROM mcr.microsoft.com/dotnet/sdk:6.0
ENV test_file="Tests/SharpPulsar.Test/SharpPulsar.Test.csproj"
ENV test_filter=""
ENV run_count=2
RUN mkdir sharppulsar
COPY . ./sharppulsar
RUN ls
WORKDIR /sharppulsar
CMD ["/bin/bash", "-c", "x=1; c=0; while [ $x -le 1 ] && [ $c -le ${run_count} ]; do dotnet test ${test_file} ${test_filter} --framework net6.0 --logger trx; rc=$?; c=$(( $c + 1 )); if [ $rc -eq 0 ]; then x=1; else x=0; fi; done"]
How to:
- cd into the root directory and execute docker-compose up.
- run_count is the number of times you want the tests repeated.
- test_filter is used when you need to run a specific test instead of all the tests in the test suite.
License
This project is licensed under the Apache License Version 2.0 - see the LICENSE file for details.
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
Dependencies (net8.0)
- Akka (>= 1.5.16)
- Akka.Logger.Serilog (>= 1.5.12.1)
- Apache.Avro (>= 1.11.3)
- App.Metrics.Concurrency (>= 4.3.0)
- AvroSchemaGenerator (>= 2.10.0)
- DotNetty.Common (>= 0.7.5)
- Google.Protobuf (>= 3.25.2)
- IdentityModel (>= 6.2.0)
- JsonSubTypes (>= 2.0.1)
- K4os.Compression.LZ4 (>= 1.3.6)
- Microsoft.CSharp (>= 4.7.0)
- Microsoft.Extensions.Logging.Abstractions (>= 8.0.0)
- Microsoft.Extensions.Logging.Console (>= 8.0.0)
- Microsoft.IO.RecyclableMemoryStream (>= 3.0.0)
- Microsoft.Rest.ClientRuntime (>= 2.3.24)
- Nager.PublicSuffix (>= 3.0.0)
- Newtonsoft.Json (>= 13.0.3)
- Nito.AsyncEx (>= 5.1.2)
- NodaTime (>= 3.1.11)
- OpenTelemetry (>= 1.7.0)
- Portable.BouncyCastle (>= 1.9.0)
- Pro.NBench.xUnit (>= 2.0.0)
- protobuf-net (>= 3.2.30)
- Serilog.Sinks.File (>= 5.0.0)
- SharpPulsar.Admin (>= 2.15.1)
- SharpPulsar.TimeUnit (>= 2.15.1)
- SharpPulsar.Trino (>= 2.15.1)
- SharpZipLib (>= 1.4.2)
- Snappy.Standard (>= 0.2.0)
- System.Diagnostics.Contracts (>= 4.3.0)
- System.IO.Pipelines (>= 8.0.0)
- System.Net.NetworkInformation (>= 4.3.0)
- System.Reactive (>= 6.0.0)
- System.Runtime.CompilerServices.Unsafe (>= 6.0.0)
- System.Runtime.Serialization.Primitives (>= 4.3.0)
- System.Security.Cryptography.Cng (>= 5.0.0)
- System.Text.Json (>= 8.0.2)
- System.Threading.Tasks.Dataflow (>= 8.0.0)
- zlib.net-mutliplatform (>= 1.0.6)
- ZstdNet (>= 1.4.5)
Version | Downloads | Last updated |
---|---|---|
2.15.1 | 221 | 2/15/2024 |
2.15.0 | 125 | 2/3/2024 |
2.14.1 | 567 | 9/22/2023 |
2.14.0 | 344 | 7/12/2023 |
2.13.0 | 190 | 5/20/2023 |
2.12.1 | 166 | 5/10/2023 |
2.12.0 | 211 | 5/1/2023 |
2.11.2 | 287 | 3/19/2023 |
2.11.1 | 264 | 3/18/2023 |
2.11.0 | 276 | 3/5/2023 |
2.11.0-rc0117 | 133 | 3/5/2023 |
2.11.0-rc0112 | 170 | 2/2/2023 |
2.11.0-rc0107 | 164 | 2/1/2023 |
2.11.0-rc0105 | 137 | 1/18/2023 |
2.11.0-rc0097 | 163 | 1/15/2023 |
2.10.0-rc1193 | 167 | 1/15/2023 |
2.10.0-rc1191 | 855 | 10/6/2022 |
2.10.0-rc1186 | 157 | 10/4/2022 |
2.10.0-rc1162 | 217 | 9/14/2022 |
2.10.0-rc1158 | 199 | 8/9/2022 |
2.10.0-rc1157 | 175 | 8/9/2022 |
2.10.0-rc1150 | 155 | 8/8/2022 |
2.10.0-rc1136 | 246 | 7/23/2022 |
2.10.0-rc1135 | 191 | 7/23/2022 |
2.10.0-rc1134 | 186 | 7/23/2022 |
2.10.0-rc1133 | 182 | 7/23/2022 |
2.10.0-rc1132 | 141 | 7/21/2022 |
2.10.0-rc1130 | 197 | 7/21/2022 |
2.10.0-rc1129 | 179 | 7/17/2022 |
2.10.0-rc1125 | 214 | 7/10/2022 |
2.10.0-rc1123 | 181 | 7/9/2022 |
2.10.0-rc.1117 | 145 | 7/3/2022 |
2.10.0-rc.1108 | 184 | 6/25/2022 |
2.10.0-rc.1090 | 162 | 6/9/2022 |
2.10.0-rc.1088 | 138 | 6/9/2022 |
2.10.0-rc.1083 | 136 | 6/8/2022 |
2.10.0-rc.1070 | 143 | 6/3/2022 |
2.10.0-rc.1051 | 144 | 5/26/2022 |
2.10.0-rc.1037 | 144 | 5/16/2022 |
2.10.0-rc.1035 | 157 | 5/13/2022 |
2.10.0-rc.1034 | 151 | 5/13/2022 |
2.10.0-rc.1033 | 154 | 5/13/2022 |
2.10.0-rc.1032 | 153 | 5/13/2022 |
2.10.0-rc.1031 | 161 | 5/13/2022 |
2.10.0-rc.1022 | 255 | 4/2/2022 |
2.10.0-rc.1013 | 159 | 3/26/2022 |
2.9.0 | 1,477 | 2/21/2022 |
2.9.0-rc.975 | 182 | 2/13/2022 |
2.9.0-beta.971 | 151 | 2/13/2022 |
2.9.0-beta.47 | 169 | 1/9/2022 |
2.9.0-beta.45 | 167 | 12/29/2021 |
2.9.0-beta.44 | 168 | 12/21/2021 |
2.9.0-beta.43 | 173 | 12/19/2021 |
2.9.0-beta.1 | 151 | 2/13/2022 |
2.2.4 | 504 | 11/22/2021 |
2.2.4-beta.42 | 191 | 11/10/2021 |
2.2.4-beta.41 | 200 | 11/3/2021 |
2.2.4-beta.40 | 179 | 9/29/2021 |
2.2.4-beta | 311 | 9/28/2021 |
2.2.3 | 477 | 9/22/2021 |
2.2.3-beta | 322 | 9/21/2021 |
2.2.2 | 507 | 9/13/2021 |
2.2.2-beta | 294 | 9/13/2021 |
2.2.1 | 548 | 9/11/2021 |
2.2.1-beta | 350 | 9/11/2021 |
2.2.0 | 520 | 9/10/2021 |
2.2.0-beta | 335 | 9/10/2021 |
2.1.0 | 514 | 9/5/2021 |
2.1.0-beta.33 | 199 | 9/5/2021 |
2.0.18 | 504 | 8/14/2021 |
2.0.0-beta.31 | 197 | 8/14/2021 |
2.0.0-beta.30 | 182 | 8/13/2021 |
2.0.0-beta.29 | 191 | 8/12/2021 |
2.0.0-beta.28 | 188 | 8/11/2021 |
2.0.0-beta.27 | 196 | 8/10/2021 |
2.0.0-beta.26 | 187 | 8/9/2021 |
2.0.0-beta.25 | 200 | 8/6/2021 |
2.0.0-beta.24 | 193 | 8/5/2021 |
2.0.0-beta.23 | 202 | 8/4/2021 |
2.0.0-beta.22 | 197 | 8/4/2021 |
2.0.0-beta.20 | 187 | 7/31/2021 |
2.0.0-beta.19 | 243 | 7/30/2021 |
2.0.0-beta.15 | 208 | 5/13/2021 |
2.0.0-beta.14 | 198 | 5/12/2021 |
2.0.0-beta.13 | 199 | 5/11/2021 |
2.0.0-beta.12 | 220 | 5/10/2021 |
2.0.0-beta.11 | 209 | 5/9/2021 |
2.0.0-beta.10 | 243 | 5/7/2021 |
2.0.0-beta.9 | 207 | 4/23/2021 |
2.0.0-beta.8 | 208 | 4/22/2021 |
2.0.0-beta.7 | 212 | 4/22/2021 |
2.0.0-beta.6 | 200 | 4/22/2021 |
2.0.0-beta.5 | 199 | 4/15/2021 |
2.0.0-beta.4 | 210 | 4/14/2021 |
2.0.0-beta | 322 | 4/10/2021 |
1.4.2.1 | 677 | 9/3/2020 |
1.4.2 | 594 | 9/2/2020 |
1.4.1 | 608 | 8/29/2020 |
1.4.0 | 615 | 8/29/2020 |
1.4.0-release.1 | 153 | 2/13/2022 |
1.3.5 | 600 | 6/9/2020 |
1.3.4 | 593 | 6/9/2020 |
1.3.3 | 622 | 6/8/2020 |
1.3.2 | 592 | 6/8/2020 |
1.3.1 | 743 | 6/5/2020 |
1.3.0 | 629 | 6/3/2020 |
1.2.0 | 772 | 5/26/2020 |
1.1.0 | 670 | 5/26/2020 |
1.0.0 | 679 | 5/23/2020 |
0.9.0 | 627 | 5/21/2020 |
0.8.5 | 596 | 5/20/2020 |
0.8.4 | 658 | 5/9/2020 |
0.8.3 | 605 | 5/8/2020 |
0.8.2 | 616 | 5/2/2020 |
0.8.1 | 636 | 4/30/2020 |
0.8.0 | 675 | 4/28/2020 |
0.7.0 | 646 | 4/20/2020 |
0.6.5 | 585 | 4/16/2020 |
0.6.4 | 658 | 4/15/2020 |
0.6.3 | 637 | 4/14/2020 |
0.6.2 | 599 | 4/14/2020 |
0.6.1 | 619 | 4/13/2020 |
0.6.0 | 654 | 4/12/2020 |
0.5.3 | 805 | 4/5/2020 |
0.5.2 | 636 | 3/30/2020 |
0.5.1 | 744 | 3/28/2020 |
0.5.0 | 616 | 3/27/2020 |
0.4.0 | 665 | 3/17/2020 |
0.3.0 | 625 | 3/13/2020 |
0.2.0 | 594 | 3/11/2020 |
0.0.1.1 | 647 | 3/7/2020 |
0.0.1 | 640 | 3/7/2020 |
0.0.1-alpha | 433 | 3/8/2020 |
• [FIX][ServiceUrlProvider] ClientConfigurationData
• [Updated] Nager.PublicSuffix v3.0.0
Full changelog at https://github.com/Sharp-Pulsar/SharpPulsar/blob/refs/tags/2.15.1/CHANGELOG.md