Silky微服务框架在线文档Silky微服务框架在线文档
首页
文档
配置
源码解析
博文
github
gitee
首页
文档
配置
源码解析
博文
github
gitee
  • 简介

    • silky 框架介绍
  • 入门

    • 名词解释
    • 快速开始
    • 脚手架
    • 微服务模块化架构的最佳实践 & 约定
    • 示例
  • 主机与模块

    • 主机
    • 网关
    • 模块
    • 插件
  • 网关与 HTTP

    • Swagger 文档
    • 性能分析(MiniProfiler)
    • 跨域(CORS)
    • 审计日志
  • 服务与 RPC

    • 应用服务和服务条目
    • rpc通信
    • websocket通信
    •  服务注册中心
    • 服务治理
  • 数据与缓存

    • EFCore 数据访问
    • 缓存
    • 分布式锁
  • 安全与认证

    • 身份认证与授权
    • 分布式事务
  • 基础设施

    • 依赖注入
    • 对象到对象的映射
    • 参数验证
    • 链路跟踪
    • 日志(Serilog)
    • 健康检查
    • 消息总线(MassTransit)
    • 单元测试与集成测试

概述

在微服务架构中,服务之间除了同步 RPC 调用,还经常需要通过消息队列实现异步通信,以解耦服务、提高系统吞吐量。silky 框架通过 Silky.MassTransit 模块集成了 MassTransit,为微服务提供消息总线能力。

Silky.MassTransit 的核心职责是在 MassTransit 的发布/消费流程中自动传递 RpcContext 上下文信息(如用户身份、TraceId、调用链路等),使得消息消费端能够获取到与 RPC 调用一致的上下文,从而支持审计日志、链路追踪等能力。

安装

<PackageReference Include="Silky.MassTransit" Version="3.9.1" />

除此之外,还需要安装对应的 MassTransit 传输包(如 RabbitMQ):

<PackageReference Include="MassTransit.RabbitMQ" Version="8.x.x" />

消息发布

使用 PublishForSilky 发布消息

silky 扩展了 MassTransit 的 IPublishEndpoint,提供了 PublishForSilky<T>() 方法。相比标准的 Publish(),PublishForSilky() 会自动将当前 RpcContext 中的所有附加信息(用户信息、调用链路等)作为消息头传递给消费端:

using MassTransit;

public class OrderAppService : IOrderAppService, IScopedDependency
{
    private readonly IPublishEndpoint _publishEndpoint;

    public OrderAppService(IPublishEndpoint publishEndpoint)
    {
        _publishEndpoint = publishEndpoint;
    }

    public async Task<string> CreateOrderAsync(CreateOrderInput input)
    {
        // 创建订单...
        var order = await CreateOrder(input);

        // 发布订单创建事件(自动携带 RpcContext 上下文)
        await _publishEndpoint.PublishForSilky(new OrderCreatedEvent
        {
            OrderId = order.Id,
            UserId = order.UserId,
            TotalAmount = order.TotalAmount,
            CreatedTime = order.CreatedTime
        });

        return order.Id.ToString();
    }
}

使用 SendForSilky 发送消息

对于点对点消息(Send),使用 SendForSilky<T>():

public class OrderAppService : IOrderAppService, IScopedDependency
{
    private readonly ISendEndpointProvider _sendEndpointProvider;

    public OrderAppService(ISendEndpointProvider sendEndpointProvider)
    {
        _sendEndpointProvider = sendEndpointProvider;
    }

    public async Task NotifyPaymentServiceAsync(long orderId)
    {
        var endpoint = await _sendEndpointProvider.GetSendEndpoint(
            new Uri("queue:payment-service"));

        await endpoint.SendForSilky(new OrderPaymentRequest
        {
            OrderId = orderId
        });
    }
}

注意

在 silky 框架中,如果需要消费端能够正确获取 RpcContext 中的用户信息(用于审计、Session 等),必须使用 PublishForSilky / SendForSilky 而不是 MassTransit 原生的 Publish / Send。

消息消费

继承 SilkyConsumer<T>

silky 提供了 SilkyConsumer<TMessage> 基类,继承它来实现消息消费逻辑:

using Silky.MassTransit.Consumer;

public class OrderCreatedConsumer : SilkyConsumer<OrderCreatedEvent>
{
    private readonly IInventoryAppService _inventoryAppService;
    private readonly ILogger<OrderCreatedConsumer> _logger;

    public OrderCreatedConsumer(
        IInventoryAppService inventoryAppService,
        ILogger<OrderCreatedConsumer> logger)
    {
        _inventoryAppService = inventoryAppService;
        _logger = logger;
    }

    protected override async Task ConsumeWork(ConsumeContext<OrderCreatedEvent> context)
    {
        var message = context.Message;
        _logger.LogInformation("处理订单创建事件: OrderId={OrderId}", message.OrderId);

        // 此处可以通过 NullSession.Instance 获取用户信息(由发布端传递过来)
        // 也可以正常调用其他微服务的 RPC 接口
        await _inventoryAppService.ReserveInventoryAsync(message.OrderId);
    }
}

SilkyConsumer<T> 相比直接实现 MassTransit IConsumer<T> 的优势:

  1. 自动传递 RpcContext:从消息头中恢复 RpcContext,使消费端能获取发布端的用户身份、调用链路等信息
  2. 集成 EFCore 工作单元:支持在 ConsumeWork 方法上标注 [UnitOfWork],自动管理数据库事务
  3. 作用域管理:自动创建 IoC 作用域,防止 Scoped 服务生命周期问题

支持工作单元

在 ConsumeWork 方法上标注 [UnitOfWork],框架会自动管理数据库事务:

public class OrderPaymentConsumer : SilkyConsumer<OrderPaymentEvent>
{
    private readonly IOrderRepository _orderRepository;

    public OrderPaymentConsumer(IOrderRepository orderRepository)
    {
        _orderRepository = orderRepository;
    }

    [UnitOfWork]  // 自动开启数据库事务,成功提交,失败回滚
    protected override async Task ConsumeWork(ConsumeContext<OrderPaymentEvent> context)
    {
        var order = await _orderRepository.FindAsync(context.Message.OrderId);
        order.MarkAsPaid();
        await _orderRepository.UpdateNowAsync(order);
    }
}

完整配置示例

以下是在网关或微服务主机中配置 MassTransit + RabbitMQ 的完整示例:

// ConfigureService.cs
public class ConfigureService : IConfigureService
{
    public void ConfigureServices(IServiceCollection services, IConfiguration configuration)
    {
        services.AddMassTransit(x =>
        {
            // 注册消费者
            x.AddConsumer<OrderCreatedConsumer>();
            x.AddConsumer<OrderPaymentConsumer>();

            x.UsingRabbitMq((ctx, cfg) =>
            {
                cfg.Host(configuration["RabbitMq:Host"], configuration["RabbitMq:VirtualHost"], h =>
                {
                    h.Username(configuration["RabbitMq:UserName"]);
                    h.Password(configuration["RabbitMq:Password"]);
                });

                // 配置接收端点
                cfg.ReceiveEndpoint("order-events", e =>
                {
                    e.ConfigureConsumer<OrderCreatedConsumer>(ctx);
                    e.ConfigureConsumer<OrderPaymentConsumer>(ctx);
                });
            });
        });
    }

    public int Order => 1;
}

对应配置文件:

RabbitMq:
  Host: localhost
  VirtualHost: /
  UserName: guest
  Password: guest

注意事项

注意

  1. SilkyConsumer<T> 中注入的依赖(通过构造函数)使用的是消费端创建的 IoC 作用域,不是 发布端的 IoC 作用域。
  2. 消息消费是异步的,消费端不保证与发布端在同一个事务中。如果需要消息的可靠性,请配置 MassTransit 的重试/死信队列策略。
  3. RpcContext 的传递依赖消息头,MassTransit 传输层(如 RabbitMQ)默认支持消息头,无需额外配置。
编辑当前页
Prev
健康检查
Next
单元测试与集成测试