概述
在微服务架构中,服务之间除了同步 RPC 调用,还经常需要通过消息队列实现异步通信,以解耦服务、提高系统吞吐量。silky 框架通过 Silky.MassTransit 模块集成了 MassTransit,为微服务提供消息总线能力。
Silky.MassTransit 的核心职责是在 MassTransit 的发布/消费流程中自动传递 RpcContext 上下文信息(如用户身份、TraceId、调用链路等),使得消息消费端能够获取到与 RPC 调用一致的上下文,从而支持审计日志、链路追踪等能力。
安装
<PackageReference Include="Silky.MassTransit" Version="3.9.1" />
除此之外,还需要安装对应的 MassTransit 传输包(如 RabbitMQ):
<PackageReference Include="MassTransit.RabbitMQ" Version="8.x.x" />
消息发布
使用 PublishForSilky 发布消息
silky 扩展了 MassTransit 的 IPublishEndpoint,提供了 PublishForSilky<T>() 方法。相比标准的 Publish(),PublishForSilky() 会自动将当前 RpcContext 中的所有附加信息(用户信息、调用链路等)作为消息头传递给消费端:
using MassTransit;
public class OrderAppService : IOrderAppService, IScopedDependency
{
private readonly IPublishEndpoint _publishEndpoint;
public OrderAppService(IPublishEndpoint publishEndpoint)
{
_publishEndpoint = publishEndpoint;
}
public async Task<string> CreateOrderAsync(CreateOrderInput input)
{
// 创建订单...
var order = await CreateOrder(input);
// 发布订单创建事件(自动携带 RpcContext 上下文)
await _publishEndpoint.PublishForSilky(new OrderCreatedEvent
{
OrderId = order.Id,
UserId = order.UserId,
TotalAmount = order.TotalAmount,
CreatedTime = order.CreatedTime
});
return order.Id.ToString();
}
}
使用 SendForSilky 发送消息
对于点对点消息(Send),使用 SendForSilky<T>():
public class OrderAppService : IOrderAppService, IScopedDependency
{
private readonly ISendEndpointProvider _sendEndpointProvider;
public OrderAppService(ISendEndpointProvider sendEndpointProvider)
{
_sendEndpointProvider = sendEndpointProvider;
}
public async Task NotifyPaymentServiceAsync(long orderId)
{
var endpoint = await _sendEndpointProvider.GetSendEndpoint(
new Uri("queue:payment-service"));
await endpoint.SendForSilky(new OrderPaymentRequest
{
OrderId = orderId
});
}
}
注意
在 silky 框架中,如果需要消费端能够正确获取 RpcContext 中的用户信息(用于审计、Session 等),必须使用 PublishForSilky / SendForSilky 而不是 MassTransit 原生的 Publish / Send。
消息消费
继承 SilkyConsumer<T>
silky 提供了 SilkyConsumer<TMessage> 基类,继承它来实现消息消费逻辑:
using Silky.MassTransit.Consumer;
public class OrderCreatedConsumer : SilkyConsumer<OrderCreatedEvent>
{
private readonly IInventoryAppService _inventoryAppService;
private readonly ILogger<OrderCreatedConsumer> _logger;
public OrderCreatedConsumer(
IInventoryAppService inventoryAppService,
ILogger<OrderCreatedConsumer> logger)
{
_inventoryAppService = inventoryAppService;
_logger = logger;
}
protected override async Task ConsumeWork(ConsumeContext<OrderCreatedEvent> context)
{
var message = context.Message;
_logger.LogInformation("处理订单创建事件: OrderId={OrderId}", message.OrderId);
// 此处可以通过 NullSession.Instance 获取用户信息(由发布端传递过来)
// 也可以正常调用其他微服务的 RPC 接口
await _inventoryAppService.ReserveInventoryAsync(message.OrderId);
}
}
SilkyConsumer<T> 相比直接实现 MassTransit IConsumer<T> 的优势:
- 自动传递 RpcContext:从消息头中恢复 RpcContext,使消费端能获取发布端的用户身份、调用链路等信息
- 集成 EFCore 工作单元:支持在
ConsumeWork方法上标注[UnitOfWork],自动管理数据库事务 - 作用域管理:自动创建 IoC 作用域,防止 Scoped 服务生命周期问题
支持工作单元
在 ConsumeWork 方法上标注 [UnitOfWork],框架会自动管理数据库事务:
public class OrderPaymentConsumer : SilkyConsumer<OrderPaymentEvent>
{
private readonly IOrderRepository _orderRepository;
public OrderPaymentConsumer(IOrderRepository orderRepository)
{
_orderRepository = orderRepository;
}
[UnitOfWork] // 自动开启数据库事务,成功提交,失败回滚
protected override async Task ConsumeWork(ConsumeContext<OrderPaymentEvent> context)
{
var order = await _orderRepository.FindAsync(context.Message.OrderId);
order.MarkAsPaid();
await _orderRepository.UpdateNowAsync(order);
}
}
完整配置示例
以下是在网关或微服务主机中配置 MassTransit + RabbitMQ 的完整示例:
// ConfigureService.cs
public class ConfigureService : IConfigureService
{
public void ConfigureServices(IServiceCollection services, IConfiguration configuration)
{
services.AddMassTransit(x =>
{
// 注册消费者
x.AddConsumer<OrderCreatedConsumer>();
x.AddConsumer<OrderPaymentConsumer>();
x.UsingRabbitMq((ctx, cfg) =>
{
cfg.Host(configuration["RabbitMq:Host"], configuration["RabbitMq:VirtualHost"], h =>
{
h.Username(configuration["RabbitMq:UserName"]);
h.Password(configuration["RabbitMq:Password"]);
});
// 配置接收端点
cfg.ReceiveEndpoint("order-events", e =>
{
e.ConfigureConsumer<OrderCreatedConsumer>(ctx);
e.ConfigureConsumer<OrderPaymentConsumer>(ctx);
});
});
});
}
public int Order => 1;
}
对应配置文件:
RabbitMq:
Host: localhost
VirtualHost: /
UserName: guest
Password: guest
注意事项
注意
SilkyConsumer<T>中注入的依赖(通过构造函数)使用的是消费端创建的 IoC 作用域,不是 发布端的 IoC 作用域。- 消息消费是异步的,消费端不保证与发布端在同一个事务中。如果需要消息的可靠性,请配置 MassTransit 的重试/死信队列策略。
- RpcContext 的传递依赖消息头,MassTransit 传输层(如 RabbitMQ)默认支持消息头,无需额外配置。
