概述
silky 的 RPC 通信基于 DotNetty(.NET 版 Netty)实现,使用长连接 TCP 传输序列化后的消息。服务端监听一个独立的 RPC 端口(默认 2200),接收来自其他微服务实例的调用请求,完成业务执行后将结果写回连接。
整个服务端消息处理链路分为以下层次:
DotNetty Channel(网络层)
│ 接收 TCP 数据流
▼
DecoderHandler(解码层)
│ TransportMessageDecoder 将字节流解码为 TransportMessage
▼
ServerHandler(分发层)
│ 将 TransportMessage 交给 MessageListenerBase
▼
DefaultServerMessageReceivedHandler(处理层)
│ 查找 ServiceEntry、解析参数、执行业务
▼
EncoderHandler(编码层)
│ 将 RemoteResultMessage 编码为字节流
▼
DotNetty Channel 写回响应
DotNetty 服务端启动
RPC 服务端监听器 DotNettyTcpServerMessageListener 在框架初始化阶段(模块 Initialize() 时)启动 DotNetty 服务端:
public async Task Listen()
{
var bootstrap = new ServerBootstrap();
// 根据 UseLibuv 配置选择 I/O 模型
if (_rpcOptions.UseLibuv)
{
m_bossGroup = new DispatcherEventLoopGroup();
m_workerGroup = new WorkerEventLoopGroup((DispatcherEventLoopGroup)m_bossGroup);
bootstrap.Channel<TcpServerChannel>();
}
else
{
m_bossGroup = new MultithreadEventLoopGroup(1); // Accept 线程组(1 个线程)
m_workerGroup = new MultithreadEventLoopGroup(); // Work 线程组(CPU核数 * 2)
bootstrap.Channel<TcpServerSocketChannel>();
}
bootstrap
.Group(m_bossGroup, m_workerGroup)
.Option(ChannelOption.SoBacklog, 128)
.ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
{
var pipeline = channel.Pipeline;
// SSL 支持(可选)
if (_rpcOptions.IsSsl)
{
var cert = new X509Certificate2(_rpcOptions.SslCertificateName,
_rpcOptions.SslCertificatePassword);
pipeline.AddLast(TlsHandler.Server(cert));
}
// 空闲检测(心跳)
pipeline.AddLast(new IdleStateHandler(0, 0, _rpcOptions.HeartbeatWatchIntervalSeconds));
// 编解码
pipeline.AddLast(new LengthFieldPrepender(4));
pipeline.AddLast(new LengthFieldBasedFrameDecoder(int.MaxValue, 0, 4, 0, 4));
pipeline.AddLast(_transportMessageDecoder);
pipeline.AddLast(_transportMessageEncoder);
// 业务处理器
pipeline.AddLast(new ServerHandler(async (ctx, message) =>
{
await OnReceived(ctx.Channel, message);
}));
}));
m_boundChannel = await bootstrap.BindAsync(_server.RpcEndpoint.Port);
}
关键配置项:
| 配置 | 说明 |
|---|---|
UseLibuv | 使用 Libuv 原生 I/O(默认 false,使用 .NET Socket) |
IsSsl | 启用 TLS 加密(默认 false) |
HeartbeatWatchIntervalSeconds | 空闲检测间隔(默认 300s),超过此时间无流量则触发心跳事件 |
SoBacklog = 128 | TCP accept 队列长度 |
消息的编解码
传输协议
silky 使用 长度字段帧(LengthFieldBasedFrame) 协议解决 TCP 粘包/拆包问题:
┌──────────────┬───────────────────────────────┐
│ Length (4B) │ Body (JSON bytes) │
└──────────────┴───────────────────────────────┘
LengthFieldPrepender(4):发送时在消息头部追加 4 字节长度字段LengthFieldBasedFrameDecoder:接收时按长度字段拆分完整帧
TransportMessage 结构
每条 RPC 消息都被封装为 TransportMessage:
public class TransportMessage
{
public string Id { get; set; } // 全局唯一消息 ID(UUID)
public string ContentType { get; set; } // 内容类型标识(用于区分消息类型)
public byte[] Content { get; set; } // JSON 序列化的消息内容
}
消息类型由 ContentType 决定:
| ContentType | 含义 | 对应类型 |
|---|---|---|
rpc/invoke | RPC 调用请求 | RemoteInvokeMessage |
rpc/result | RPC 调用响应 | RemoteResultMessage |
框架通过 TransportMessageExtensions.GetContent<T>() 将 Content 字段反序列化为对应的消息类型。
ServerHandler — DotNetty 消息分发
ServerHandler 是 DotNetty Pipeline 中的业务 ChannelHandler,负责将解码后的 TransportMessage 转交给框架的消息监听器:
public class ServerHandler : ChannelHandlerAdapter
{
private readonly Func<IChannelHandlerContext, TransportMessage, Task> _readMessageAction;
public override async void ChannelRead(IChannelHandlerContext context, object message)
{
var transportMessage = (TransportMessage)message;
// 将消息转交给 MessageListenerBase.OnReceived()
await _readMessageAction(context, transportMessage);
}
}
MessageListenerBase.OnReceived() 触发 Received 事件,通知所有已注册的处理器(IServerMessageReceivedHandler)。
DefaultServerMessageReceivedHandler — 核心处理逻辑
DefaultServerMessageReceivedHandler 是 RPC 服务端最核心的处理组件,负责将 RPC 请求消息转换为实际的业务方法调用:
public async Task<RemoteResultMessage> Handle(RemoteInvokeMessage message, Context context,
CancellationToken cancellationToken)
{
// 1. 根据 ServiceEntryId 查找本地服务条目
var serviceEntry = _serviceEntryLocator.GetLocalServiceEntryById(message.ServiceEntryId);
if (serviceEntry == null)
{
// 本地不存在该服务条目,返回错误
return new RemoteResultMessage()
{
StatusCode = StatusCode.NotFindLocalServiceEntry,
ExceptionMessage = $"Failed to get local service entry through serviceEntryId {message.ServiceEntryId}"
};
}
// 2. 监控并发数(如果启用了 Monitor)
if (rpcOption.EnableMonitor)
{
var handleInfo = await serverHandleMonitor.GetServerInstanceHandleInfo();
if (handleInfo.ConcurrentCount > handleInfo.AllowMaxConcurrentCount)
{
throw new OverflowMaxServerHandleException("Exceeds the maximum allowable processing concurrency");
}
}
// 3. 解析参数
var parameterResolver = EngineContext.Current.ResolveNamed<IParameterResolver>(
message.ParameterType.ToString());
var parameters = parameterResolver.Parser(serviceEntry, message);
// 4. 执行业务方法(通过本地执行器)
var result = await serviceEntry.Executor(_serviceKeyExecutor.ServiceKey, parameters);
// 5. 构建响应消息
var remoteResultMessage = new RemoteResultMessage()
{
ServiceEntryId = serviceEntry.Id,
Result = result,
StatusCode = StatusCode.Success
};
return remoteResultMessage;
}
步骤 1:定位服务条目
IServiceEntryLocator.GetLocalServiceEntryById() 从本地服务条目管理器(IServiceEntryManager)中查找对应的 ServiceEntry。
// 服务条目 ID = 接口完全限定名 + 方法名 + 参数类型 + HTTP 方法
// 示例: Silky.Sample.IOrderAppService.GetByIdAsync_id_Get
若本地不存在该服务条目(例如消息路由到了错误的实例),返回 StatusCode.NotFindLocalServiceEntry 错误。
步骤 2:并发数保护
当 EnableMonitor = true 时,ServerHandleMonitor 会追踪当前实例正在处理的并发请求数。若超过 GovernanceOptions.MaxConcurrentHandlingCount,则抛出 OverflowMaxServerHandleException,触发客户端的 Polly 故障转移策略(OverflowServerHandleFailoverPolicyProvider),客户端会将请求转发到另一个可用实例。
步骤 3:参数解析
RPC 消息的参数格式取决于调用来源:
ParameterType | 场景 | 解析方式 |
|---|---|---|
Rpc | 内部 RPC 调用(ServiceEntry.Executor) | 按参数位置顺序解析 Parameters[] |
Dict | IInvokeTemplate 字典方式 | 按参数名解析 DictParameters |
Http | HTTP 请求进入服务端 | 按 ParameterFrom 解析 HttpParameters |
IParameterResolver 按 ParameterType.ToString() 命名注册,框架通过 ResolveNamed<IParameterResolver>() 动态获取对应的解析实现。
步骤 4:执行业务方法
直接调用 serviceEntry.Executor(serviceKey, parameters),走本地执行路径(LocalInvoker + 服务端过滤器管道 + 业务方法)。详见 本地执行器与服务端过滤器。
步骤 5:异常处理与响应
catch (ValidationException validationException)
{
// 参数验证异常:返回验证错误列表,不记录异常日志
remoteResultMessage.ValidateErrors = validationException.ValidationErrors.ToArray();
remoteResultMessage.StatusCode = StatusCode.ValidateError;
}
catch (Exception ex) when (ex.IsFriendlyException())
{
// 业务友好异常(UserFriendlyException):仅记录 Warning,不记录堆栈
remoteResultMessage.ExceptionMessage = ex.Message;
remoteResultMessage.StatusCode = ex.GetExceptionStatusCode();
}
catch (Exception ex)
{
// 未知异常:记录完整异常堆栈,抛出让 Polly 策略捕获(可能触发熔断)
Logger.LogException(ex);
context[PollyContextNames.Exception] = ex;
throw;
}
异常分类策略:
- 业务异常(
IsFriendlyException() = true)和验证异常:直接序列化到RemoteResultMessage中返回,不触发 Polly 熔断 - 未知基础设施异常:重新抛出,触发 Polly 熔断计数和 Fallback 执行
RemoteResultMessage — 响应消息结构
public class RemoteResultMessage : IRemoteMessage
{
public string ServiceEntryId { get; set; } // 对应的服务条目 ID
public StatusCode StatusCode { get; set; } // 状态码
public string ExceptionMessage { get; set; } // 异常信息(业务异常时非空)
public ValidateError[] ValidateErrors { get; set; } // 验证错误列表
public object Result { get; set; } // 业务方法返回值(成功时)
public bool IsFile { get; set; } // 是否为文件内容
}
常见 StatusCode 枚举值:
| 状态码 | 含义 |
|---|---|
Success | 执行成功 |
NotFindLocalServiceEntry | 本地不存在该服务条目 |
ValidateError | 参数验证失败 |
BusinessError | UserFriendlyException 业务异常 |
UnAuthentication | 未通过身份认证 |
UnAuthorization | 无权限 |
NotFindService | 未找到服务 |
CommunicationError | 通信异常 |
Polly 策略 — 服务端治理
服务端同样具有 Polly 策略保护层(IHandlePolicyBuilder),在 DefaultServerMessageReceivedHandler 之上包裹:
// 服务端 Polly 策略组合(与客户端结构对称)
IAsyncPolicy<RemoteResultMessage> policy = Policy.NoOpAsync<RemoteResultMessage>();
// 1. 结果策略(如服务端超时)
policy = policy.WrapAsync(handlePolicyWithResultProvider.Create(serviceEntryId));
// 2. 普通策略
policy = policy.WrapAsync(handlePolicyProvider.Create(serviceEntryId));
// 3. 熔断策略(服务端自保护)
policy = policy.WrapAsync(circuitBreakerPolicyProvider.Create(serviceEntryId));
// 4. Fallback 策略(含服务端降级)
policy = policy.WrapAsync(serverHandleFallbackPolicyProvider.Create(message));
服务端 Polly 策略的主要作用:
- 当异常发生 N 次后,触发服务端熔断,临时拒绝该服务条目的请求(防止雪崩)
- 通过
[Fallback]特性配置的服务端降级方法(在熔断时返回缓存结果或默认值)
RpcContext — 调用链上下文
RpcContext 是基于 AsyncLocal<T> 的线程安全上下文容器,在整个 RPC 调用链中保存调用相关的元数据:
// 服务端通过 Codec 从 TransportMessage 还原 RpcContext
RpcContext.Context.SetMessageId(message.Id);
RpcContext.Context.SetAttachments(remoteInvokeMessage.Attachments);
RpcContext.Context.SetTransAttachments(remoteInvokeMessage.TransAttachments);
// 业务代码中可通过 RpcContext 访问:
var currentUserId = RpcContext.Context.GetUserId();
var currentTenantId = RpcContext.Context.GetTenantId();
var traceId = RpcContext.Context.GetTraceId();
附件透传机制(Attachments):
- 客户端在
RemoteInvokeMessage.Attachments中写入当前用户、租户、语言、TraceId 等信息 - 服务端收到消息后,将
Attachments还原到RpcContext - 服务端再次发起下游 RPC 调用时,框架自动将
RpcContext中的附件透传到TransAttachments - 这样用户身份和调用链信息在整个微服务调用链中隐式透传,无需在方法签名中显式传递
RPC 服务端完整流水线图
客户端发出 TCP 数据包
│
▼
DotNetty ServerBootstrap
│ BossGroup: Accept 新连接
│ WorkerGroup: 处理 I/O 读写
▼
Channel Pipeline(每个连接独立的处理链)
│
├── TlsHandler(SSL,可选)
│
├── IdleStateHandler(心跳超时检测)
│
├── LengthFieldBasedFrameDecoder(解决粘包/拆包)
│
├── TransportMessageDecoder(JSON 反序列化 → TransportMessage)
│
├── TransportMessageEncoder(编码响应)
│
└── ServerHandler(ChannelRead 触发)
│
▼
MessageListenerBase.OnReceived()
│ 触发 Received 事件
▼
RpcContext 初始化(还原 Attachments)
│
▼
Polly 策略保护层(IHandlePolicyBuilder)
│
▼
DefaultServerMessageReceivedHandler.Handle()
│ 1. 查找 ServiceEntry(按 ServiceEntryId)
│ 2. 并发数保护(可选)
│ 3. 解析参数(IParameterResolver)
│ 4. serviceEntry.Executor() → LocalInvoker
│ 5. 构建 RemoteResultMessage
│
▼
TransportMessageEncoder 编码
│
▼
DotNetty Channel 写回客户端
