Overview
Besides synchronous RPC, microservices often need asynchronous messaging via a message queue to decouple services and improve throughput. Silky integrates MassTransit via the Silky.MassTransit module.
The core responsibility of Silky.MassTransit is to automatically propagate RpcContext (user identity, TraceId, call-chain data, etc.) through the MassTransit publish/consume pipeline, so that message consumers have the same context as RPC callers — enabling auditing, distributed tracing, and more.
Installation
<PackageReference Include="Silky.MassTransit" Version="3.9.2" />
Install the MassTransit transport package for your broker:
<!-- RabbitMQ -->
<PackageReference Include="MassTransit.RabbitMQ" Version="8.x.x" />
<!-- Kafka -->
<PackageReference Include="MassTransit.Kafka" Version="8.x.x" />
Publishing Messages
PublishForSilky
Use PublishForSilky<T>() to publish an event — automatically includes all RpcContext metadata as message headers:
public class OrderAppService : IOrderAppService, IScopedDependency
{
private readonly IPublishEndpoint _publishEndpoint;
public OrderAppService(IPublishEndpoint publishEndpoint)
{
_publishEndpoint = publishEndpoint;
}
public async Task<string> CreateOrderAsync(CreateOrderInput input)
{
var order = await CreateOrder(input);
// Publish order created event (automatically carries RpcContext)
await _publishEndpoint.PublishForSilky(new OrderCreatedEvent
{
OrderId = order.Id,
UserId = order.UserId,
TotalAmount = order.TotalAmount,
CreatedTime = order.CreatedTime
});
return order.Id.ToString();
}
}
SendForSilky
For point-to-point messages (Send to a specific queue):
public async Task NotifyPaymentServiceAsync(long orderId)
{
var endpoint = await _sendEndpointProvider.GetSendEndpoint(
new Uri("queue:payment-service"));
await endpoint.SendForSilky(new OrderPaymentRequest
{
OrderId = orderId
});
}
Consuming Messages
Implement IConsumer<T> — the RpcContext from the publisher is automatically restored in the consumer:
public class OrderCreatedEventConsumer : IConsumer<OrderCreatedEvent>
{
private readonly ISession _session;
private readonly ILogger<OrderCreatedEventConsumer> _logger;
public OrderCreatedEventConsumer(ISession session, ILogger<OrderCreatedEventConsumer> logger)
{
_session = session;
_logger = logger;
}
public async Task Consume(ConsumeContext<OrderCreatedEvent> context)
{
// _session.UserId is restored from the publisher's RpcContext
_logger.LogInformation("Order {OrderId} created by user {UserId}",
context.Message.OrderId, _session.UserId);
// Process the event...
await Task.CompletedTask;
}
}
Configuration
Register MassTransit with RabbitMQ
public class ConfigureService : IConfigureService
{
public void ConfigureServices(IServiceCollection services, IConfiguration configuration)
{
services.AddSilkyMassTransit(x =>
{
x.AddConsumer<OrderCreatedEventConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("rabbitmq://localhost", h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ConfigureEndpoints(context);
// Apply Silky middleware — propagates RpcContext through the pipeline
cfg.UseSilkyMassTransit();
});
});
}
}
RpcContext Propagation
Silky.MassTransit uses MassTransit middleware (ISendFilter, IPublishFilter, IConsumeFilter) to transparently transfer RpcContext attachment keys as message headers. This ensures:
- The current user identity (
UserId,TenantId) is available in the consumer - TraceId is propagated so SkyAPM can link the message consumption span to the original request trace
- Audit log can correctly attribute the operation to the originating user even across async boundaries
