FlexArch.OutBox.EntityFramework
1.0.0-alpha.1
dotnet add package FlexArch.OutBox.EntityFramework --version 1.0.0-alpha.1
NuGet\Install-Package FlexArch.OutBox.EntityFramework -Version 1.0.0-alpha.1
<PackageReference Include="FlexArch.OutBox.EntityFramework" Version="1.0.0-alpha.1" />
<PackageVersion Include="FlexArch.OutBox.EntityFramework" Version="1.0.0-alpha.1" />
<PackageReference Include="FlexArch.OutBox.EntityFramework" />
paket add FlexArch.OutBox.EntityFramework --version 1.0.0-alpha.1
#r "nuget: FlexArch.OutBox.EntityFramework, 1.0.0-alpha.1"
#:package FlexArch.OutBox.EntityFramework@1.0.0-alpha.1
#addin nuget:?package=FlexArch.OutBox.EntityFramework&version=1.0.0-alpha.1&prerelease
#tool nuget:?package=FlexArch.OutBox.EntityFramework&version=1.0.0-alpha.1&prerelease
FlexArch.OutBox
分层的 OutBox 分布式事务模式实现 - 专为.NET 微服务架构设计的轻量级、高性能分布式事务解决方案。
✨ 特性
- 🎯 完整的 OutBox 模式实现 - 保证消息与业务数据的事务一致性
- 🏗️ 优雅的架构设计 - 基于 DDD 和 Clean Architecture 原则
- 🔧 简单易用 - 流畅的 API 设计,3 行代码即可集成
- 🚀 高性能 - 批量处理、数据库索引优化、异步处理
- 🛡️ 生产就绪 - 重试、熔断、健康检查、监控集成
- 🔌 高度可扩展 - 中间件模式,支持自定义扩展
- 📦 多存储支持 - Entity Framework Core(支持所有主流数据库)
- 🌐 多传输支持 - RabbitMQ + 可扩展架构
- 📊 完整监控 - OpenTelemetry、结构化日志、性能指标
- 🔧 类型兼容 - 智能处理 JsonElement 等复杂类型,确保消息队列兼容性
🚀 快速开始
安装
# 核心包
dotnet add package FlexArch.OutBox.Core
# EF Core持久化支持
dotnet add package FlexArch.OutBox.Persistence.EFCore
# RabbitMQ消息发布
dotnet add package FlexArch.OutBox.Publisher.RabbitMQ
基础配置
// Program.cs
builder.Services
.AddOutbox(options =>
{
options.ProcessingInterval = TimeSpan.FromSeconds(10);
options.BatchSize = 100;
})
.AddEfPersistence<YourDbContext>()
.AddRabbitMqOutbox(connectionString => "amqp://localhost");
var app = builder.Build();
数据库配置
// 在你的DbContext中
public class YourDbContext : DbContext
{
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
base.OnModelCreating(modelBuilder);
// 一键应用OutBox配置(包括索引)
modelBuilder.ApplyOutboxEntityConfigurations();
// 你的其他实体配置...
}
}
业务代码使用
public class OrderService
{
private readonly YourDbContext _context;
private readonly IOutboxStore _outboxStore;
public async Task CreateOrderAsync(CreateOrderCommand command)
{
using var transaction = await _context.Database.BeginTransactionAsync();
try
{
// 1. 保存业务数据
var order = new Order(command.CustomerId, command.Items);
_context.Orders.Add(order);
// 2. 保存OutBox消息(不会立即提交)
var outboxMessage = new OutboxMessage
{
Type = "OrderCreated",
Payload = JsonSerializer.Serialize(new OrderCreatedEvent(order.Id))
};
await _outboxStore.SaveAsync(outboxMessage);
// 3. 原子提交 - 保证一致性
await _context.SaveChangesAsync();
await transaction.CommitAsync();
}
catch
{
await transaction.RollbackAsync();
throw;
}
}
}
📚 详细文档
中间件配置
FlexArch.OutBox 提供了丰富的中间件来增强消息处理能力:
builder.Services
.AddOutbox()
.AddEfPersistence<YourDbContext>()
.AddRabbitMqOutbox(connectionString)
// 重试机制
.WithRetry(retry =>
{
retry.MaxRetryCount = 3;
retry.DelayInSeconds = 2;
})
// 熔断器
.WithCriticalBreaker(cb =>
{
cb.FailureThreshold = 5;
cb.DurationOfBreakInSeconds = 30;
})
// 消息签名
.WithMessageSigning(signing =>
{
signing.SecretKey = builder.Configuration["OutBox:SigningKey"];
signing.EnableSigning = true;
})
// 延迟消息
.WithDelay()
// 分布式追踪
.WithTracing()
// 性能指标
.WithMetrics()
// 健康检查
.AddOutboxHealthChecks();
配置选项
builder.Services.Configure<OutboxOptions>(options =>
{
options.ProcessingInterval = TimeSpan.FromSeconds(10); // 处理间隔
options.BatchSize = 100; // 批次大小
options.MessageProcessingTimeout = TimeSpan.FromSeconds(30); // 超时时间
options.EnableVerboseLogging = false; // 详细日志
});
支持的数据库
通过 Entity Framework Core,支持所有主流数据库:
- SQL Server
- PostgreSQL
- MySQL
- SQLite
- Oracle
- 等等...
支持的消息队列
- ✅ RabbitMQ
- 🔄 Apache Kafka (计划中)
- 🔄 Azure Service Bus (计划中)
- 🔄 Amazon SQS (计划中)
🏗️ 架构设计
包结构
FlexArch.OutBox.Abstractions # 核心抽象接口
FlexArch.OutBox.Core # 核心实现
FlexArch.OutBox.Persistence.EFCore # EF Core持久化
FlexArch.OutBox.Publisher.RabbitMQ # RabbitMQ发布器
中间件管道
消息 → 延迟中间件 → 重试中间件 → 熔断中间件 → 签名中间件 → 指标中间件 → 追踪中间件 → 发布器
📊 监控和观测
健康检查
app.MapHealthChecks("/health", new HealthCheckOptions
{
ResponseWriter = UIResponseWriter.WriteHealthCheckUIResponse
});
OpenTelemetry 集成
builder.Services.AddOpenTelemetry()
.WithTracing(builder =>
{
builder.AddSource("Outbox");
});
性能指标
outbox.event.duration
- 消息处理耗时outbox.event.success
- 成功处理计数outbox.event.failure
- 失败处理计数
🛠️ 高级用法
自定义中间件
public class CustomMiddleware : IOutboxMiddleware
{
public async Task InvokeAsync(IOutboxMessage message, OutboxPublishDelegate next)
{
// 前置处理
await next(message);
// 后置处理
}
}
// 注册
builder.Services.AddScoped<IOutboxMiddleware, CustomMiddleware>();
自定义发布器
public class CustomPublisher : IOutboxPublisher
{
public async Task PublishAsync(IOutboxMessage message)
{
// 自定义发布逻辑
}
}
// 注册
builder.Services.AddScoped<IOutboxPublisher, CustomPublisher>();
📋 最佳实践
1. 事务边界控制
// ✅ 正确 - 在业务事务中一起提交
using var transaction = await _context.Database.BeginTransactionAsync();
try
{
// 业务操作
await _businessService.DoSomething();
// OutBox消息
await _outboxStore.SaveAsync(message);
// 一起提交
await _context.SaveChangesAsync();
await transaction.CommitAsync();
}
catch { await transaction.RollbackAsync(); throw; }
2. 消息幂等性
var message = new OutboxMessage
{
Id = Guid.NewGuid(), // 确保唯一性
Type = "OrderCreated",
Payload = JsonSerializer.Serialize(orderEvent),
Headers = new Dictionary<string, object?>
{
["CorrelationId"] = correlationId,
["IdempotencyKey"] = idempotencyKey
}
};
3. 错误处理
死信队列会自动处理失败的消息,无需手动干预。
🚧 故障排查
常见问题
Q: 消息没有被处理?
A: 检查 OutboxProcessor
后台服务是否正常运行,查看日志中是否有错误信息。
Q: 数据库性能问题?
A: 确保已应用推荐的数据库索引:modelBuilder.ApplyOutboxEntityConfigurations()
Q: 消息重复发送?
A: 检查消息消费端的幂等性实现,确保使用 CorrelationId
或 IdempotencyKey
。
日志级别
{
"Logging": {
"LogLevel": {
"FlexArch.OutBox": "Information"
}
}
}
🤝 贡献
我们欢迎社区贡献!请查看我们的 贡献指南 了解如何:
- 🐛 报告问题和提交 Bug 修复
- ✨ 提议和实现新功能
- 📚 改进文档和示例
- 🧪 增加测试覆盖率
快速开始贡献:
# 1. Fork 并克隆项目
git clone https://github.com/YOUR-USERNAME/FlexArch.OutBox.git
cd FlexArch.OutBox
# 2. 创建功能分支
git checkout -b feature/your-feature-name
# 3. 构建和测试
dotnet build
dotnet test
# 4. 提交您的更改
git commit -m "feat: 添加您的功能描述"
git push origin feature/your-feature-name
# 5. 创建 Pull Request
📄 许可证
本项目采用 MIT 许可证。
🔗 相关链接
⭐ 如果这个项目对你有帮助,请给个星星!
FlexArch.OutBox - 让分布式事务变得简单可靠!
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net8.0
- FlexArch.OutBox.Abstractions (>= 1.0.0-alpha.1)
- Microsoft.EntityFrameworkCore (>= 9.0.6)
NuGet packages (1)
Showing the top 1 NuGet packages that depend on FlexArch.OutBox.EntityFramework:
Package | Downloads |
---|---|
FlexArch.OutBox.RabbitMQ
FlexArch OutBox RabbitMQ消息发布器 - 高性能的RabbitMQ集成实现 |
GitHub repositories
This package is not used by any popular GitHub repositories.
Version | Downloads | Last Updated |
---|---|---|
1.0.0-alpha.1 | 110 | 7/10/2025 |
Initial alpha release with EF Core integration and optimized database
access