Silky微服务框架在线文档Silky微服务框架在线文档
首页
文档
配置
源码解析
博文
github
gitee
首页
文档
配置
源码解析
博文
github
gitee
  • 启动时

    • 主机的构建
    • 服务引擎
    • 模块
    • 服务与服务条目的解析
    • Silky服务主机
    • 依赖注入约定
    • RPC 服务代理
  • 运行时

    • 终结点与路由
    • 执行器与调度
    • 本地执行器与服务端过滤器
    • 远程执行器与 RPC 调用
    • RPC 服务端消息处理
    • 服务治理
    • 缓存拦截器
    • 分布式事务(TCC)
    • HTTP 网关请求管道
    • 过滤器执行管道
    • Polly 弹性策略管道
    • 端点健康监控

概述

silky 的 RPC 通信基于 DotNetty(.NET 版 Netty)实现,使用长连接 TCP 传输序列化后的消息。服务端监听一个独立的 RPC 端口(默认 2200),接收来自其他微服务实例的调用请求,完成业务执行后将结果写回连接。

整个服务端消息处理链路分为以下层次:

DotNetty Channel(网络层)
    │  接收 TCP 数据流
    ▼
DecoderHandler(解码层)
    │  TransportMessageDecoder 将字节流解码为 TransportMessage
    ▼
ServerHandler(分发层)
    │  将 TransportMessage 交给 MessageListenerBase
    ▼
DefaultServerMessageReceivedHandler(处理层)
    │  查找 ServiceEntry、解析参数、执行业务
    ▼
EncoderHandler(编码层)
    │  将 RemoteResultMessage 编码为字节流
    ▼
DotNetty Channel 写回响应

DotNetty 服务端启动

RPC 服务端监听器 DotNettyTcpServerMessageListener 在框架初始化阶段(模块 Initialize() 时)启动 DotNetty 服务端:

public async Task Listen()
{
    var bootstrap = new ServerBootstrap();

    // 根据 UseLibuv 配置选择 I/O 模型
    if (_rpcOptions.UseLibuv)
    {
        m_bossGroup = new DispatcherEventLoopGroup();
        m_workerGroup = new WorkerEventLoopGroup((DispatcherEventLoopGroup)m_bossGroup);
        bootstrap.Channel<TcpServerChannel>();
    }
    else
    {
        m_bossGroup = new MultithreadEventLoopGroup(1);   // Accept 线程组(1 个线程)
        m_workerGroup = new MultithreadEventLoopGroup();  // Work 线程组(CPU核数 * 2)
        bootstrap.Channel<TcpServerSocketChannel>();
    }

    bootstrap
        .Group(m_bossGroup, m_workerGroup)
        .Option(ChannelOption.SoBacklog, 128)
        .ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
        {
            var pipeline = channel.Pipeline;

            // SSL 支持(可选)
            if (_rpcOptions.IsSsl)
            {
                var cert = new X509Certificate2(_rpcOptions.SslCertificateName,
                    _rpcOptions.SslCertificatePassword);
                pipeline.AddLast(TlsHandler.Server(cert));
            }

            // 空闲检测(心跳)
            pipeline.AddLast(new IdleStateHandler(0, 0, _rpcOptions.HeartbeatWatchIntervalSeconds));

            // 编解码
            pipeline.AddLast(new LengthFieldPrepender(4));
            pipeline.AddLast(new LengthFieldBasedFrameDecoder(int.MaxValue, 0, 4, 0, 4));
            pipeline.AddLast(_transportMessageDecoder);
            pipeline.AddLast(_transportMessageEncoder);

            // 业务处理器
            pipeline.AddLast(new ServerHandler(async (ctx, message) =>
            {
                await OnReceived(ctx.Channel, message);
            }));
        }));

    m_boundChannel = await bootstrap.BindAsync(_server.RpcEndpoint.Port);
}

关键配置项:

配置说明
UseLibuv使用 Libuv 原生 I/O(默认 false,使用 .NET Socket)
IsSsl启用 TLS 加密(默认 false)
HeartbeatWatchIntervalSeconds空闲检测间隔(默认 300s),超过此时间无流量则触发心跳事件
SoBacklog = 128TCP accept 队列长度

消息的编解码

传输协议

silky 使用 长度字段帧(LengthFieldBasedFrame) 协议解决 TCP 粘包/拆包问题:

┌──────────────┬───────────────────────────────┐
│  Length (4B) │       Body (JSON bytes)        │
└──────────────┴───────────────────────────────┘
  • LengthFieldPrepender(4):发送时在消息头部追加 4 字节长度字段
  • LengthFieldBasedFrameDecoder:接收时按长度字段拆分完整帧

TransportMessage 结构

每条 RPC 消息都被封装为 TransportMessage:

public class TransportMessage
{
    public string Id { get; set; }            // 全局唯一消息 ID(UUID)
    public string ContentType { get; set; }   // 内容类型标识(用于区分消息类型)
    public byte[] Content { get; set; }       // JSON 序列化的消息内容
}

消息类型由 ContentType 决定:

ContentType含义对应类型
rpc/invokeRPC 调用请求RemoteInvokeMessage
rpc/resultRPC 调用响应RemoteResultMessage

框架通过 TransportMessageExtensions.GetContent<T>() 将 Content 字段反序列化为对应的消息类型。


ServerHandler — DotNetty 消息分发

ServerHandler 是 DotNetty Pipeline 中的业务 ChannelHandler,负责将解码后的 TransportMessage 转交给框架的消息监听器:

public class ServerHandler : ChannelHandlerAdapter
{
    private readonly Func<IChannelHandlerContext, TransportMessage, Task> _readMessageAction;

    public override async void ChannelRead(IChannelHandlerContext context, object message)
    {
        var transportMessage = (TransportMessage)message;
        // 将消息转交给 MessageListenerBase.OnReceived()
        await _readMessageAction(context, transportMessage);
    }
}

MessageListenerBase.OnReceived() 触发 Received 事件,通知所有已注册的处理器(IServerMessageReceivedHandler)。


DefaultServerMessageReceivedHandler — 核心处理逻辑

DefaultServerMessageReceivedHandler 是 RPC 服务端最核心的处理组件,负责将 RPC 请求消息转换为实际的业务方法调用:

public async Task<RemoteResultMessage> Handle(RemoteInvokeMessage message, Context context,
    CancellationToken cancellationToken)
{
    // 1. 根据 ServiceEntryId 查找本地服务条目
    var serviceEntry = _serviceEntryLocator.GetLocalServiceEntryById(message.ServiceEntryId);

    if (serviceEntry == null)
    {
        // 本地不存在该服务条目,返回错误
        return new RemoteResultMessage()
        {
            StatusCode = StatusCode.NotFindLocalServiceEntry,
            ExceptionMessage = $"Failed to get local service entry through serviceEntryId {message.ServiceEntryId}"
        };
    }

    // 2. 监控并发数(如果启用了 Monitor)
    if (rpcOption.EnableMonitor)
    {
        var handleInfo = await serverHandleMonitor.GetServerInstanceHandleInfo();
        if (handleInfo.ConcurrentCount > handleInfo.AllowMaxConcurrentCount)
        {
            throw new OverflowMaxServerHandleException("Exceeds the maximum allowable processing concurrency");
        }
    }

    // 3. 解析参数
    var parameterResolver = EngineContext.Current.ResolveNamed<IParameterResolver>(
        message.ParameterType.ToString());
    var parameters = parameterResolver.Parser(serviceEntry, message);

    // 4. 执行业务方法(通过本地执行器)
    var result = await serviceEntry.Executor(_serviceKeyExecutor.ServiceKey, parameters);

    // 5. 构建响应消息
    var remoteResultMessage = new RemoteResultMessage()
    {
        ServiceEntryId = serviceEntry.Id,
        Result = result,
        StatusCode = StatusCode.Success
    };

    return remoteResultMessage;
}

步骤 1:定位服务条目

IServiceEntryLocator.GetLocalServiceEntryById() 从本地服务条目管理器(IServiceEntryManager)中查找对应的 ServiceEntry。

// 服务条目 ID = 接口完全限定名 + 方法名 + 参数类型 + HTTP 方法
// 示例: Silky.Sample.IOrderAppService.GetByIdAsync_id_Get

若本地不存在该服务条目(例如消息路由到了错误的实例),返回 StatusCode.NotFindLocalServiceEntry 错误。

步骤 2:并发数保护

当 EnableMonitor = true 时,ServerHandleMonitor 会追踪当前实例正在处理的并发请求数。若超过 GovernanceOptions.MaxConcurrentHandlingCount,则抛出 OverflowMaxServerHandleException,触发客户端的 Polly 故障转移策略(OverflowServerHandleFailoverPolicyProvider),客户端会将请求转发到另一个可用实例。

步骤 3:参数解析

RPC 消息的参数格式取决于调用来源:

ParameterType场景解析方式
Rpc内部 RPC 调用(ServiceEntry.Executor)按参数位置顺序解析 Parameters[]
DictIInvokeTemplate 字典方式按参数名解析 DictParameters
HttpHTTP 请求进入服务端按 ParameterFrom 解析 HttpParameters

IParameterResolver 按 ParameterType.ToString() 命名注册,框架通过 ResolveNamed<IParameterResolver>() 动态获取对应的解析实现。

步骤 4:执行业务方法

直接调用 serviceEntry.Executor(serviceKey, parameters),走本地执行路径(LocalInvoker + 服务端过滤器管道 + 业务方法)。详见 本地执行器与服务端过滤器。

步骤 5:异常处理与响应

catch (ValidationException validationException)
{
    // 参数验证异常:返回验证错误列表,不记录异常日志
    remoteResultMessage.ValidateErrors = validationException.ValidationErrors.ToArray();
    remoteResultMessage.StatusCode = StatusCode.ValidateError;
}
catch (Exception ex) when (ex.IsFriendlyException())
{
    // 业务友好异常(UserFriendlyException):仅记录 Warning,不记录堆栈
    remoteResultMessage.ExceptionMessage = ex.Message;
    remoteResultMessage.StatusCode = ex.GetExceptionStatusCode();
}
catch (Exception ex)
{
    // 未知异常:记录完整异常堆栈,抛出让 Polly 策略捕获(可能触发熔断)
    Logger.LogException(ex);
    context[PollyContextNames.Exception] = ex;
    throw;
}

异常分类策略:

  • 业务异常(IsFriendlyException() = true)和验证异常:直接序列化到 RemoteResultMessage 中返回,不触发 Polly 熔断
  • 未知基础设施异常:重新抛出,触发 Polly 熔断计数和 Fallback 执行

RemoteResultMessage — 响应消息结构

public class RemoteResultMessage : IRemoteMessage
{
    public string ServiceEntryId { get; set; }   // 对应的服务条目 ID
    public StatusCode StatusCode { get; set; }   // 状态码
    public string ExceptionMessage { get; set; } // 异常信息(业务异常时非空)
    public ValidateError[] ValidateErrors { get; set; } // 验证错误列表
    public object Result { get; set; }           // 业务方法返回值(成功时)
    public bool IsFile { get; set; }             // 是否为文件内容
}

常见 StatusCode 枚举值:

状态码含义
Success执行成功
NotFindLocalServiceEntry本地不存在该服务条目
ValidateError参数验证失败
BusinessErrorUserFriendlyException 业务异常
UnAuthentication未通过身份认证
UnAuthorization无权限
NotFindService未找到服务
CommunicationError通信异常

Polly 策略 — 服务端治理

服务端同样具有 Polly 策略保护层(IHandlePolicyBuilder),在 DefaultServerMessageReceivedHandler 之上包裹:

// 服务端 Polly 策略组合(与客户端结构对称)
IAsyncPolicy<RemoteResultMessage> policy = Policy.NoOpAsync<RemoteResultMessage>();

// 1. 结果策略(如服务端超时)
policy = policy.WrapAsync(handlePolicyWithResultProvider.Create(serviceEntryId));

// 2. 普通策略
policy = policy.WrapAsync(handlePolicyProvider.Create(serviceEntryId));

// 3. 熔断策略(服务端自保护)
policy = policy.WrapAsync(circuitBreakerPolicyProvider.Create(serviceEntryId));

// 4. Fallback 策略(含服务端降级)
policy = policy.WrapAsync(serverHandleFallbackPolicyProvider.Create(message));

服务端 Polly 策略的主要作用:

  • 当异常发生 N 次后,触发服务端熔断,临时拒绝该服务条目的请求(防止雪崩)
  • 通过 [Fallback] 特性配置的服务端降级方法(在熔断时返回缓存结果或默认值)

RpcContext — 调用链上下文

RpcContext 是基于 AsyncLocal<T> 的线程安全上下文容器,在整个 RPC 调用链中保存调用相关的元数据:

// 服务端通过 Codec 从 TransportMessage 还原 RpcContext
RpcContext.Context.SetMessageId(message.Id);
RpcContext.Context.SetAttachments(remoteInvokeMessage.Attachments);
RpcContext.Context.SetTransAttachments(remoteInvokeMessage.TransAttachments);

// 业务代码中可通过 RpcContext 访问:
var currentUserId = RpcContext.Context.GetUserId();
var currentTenantId = RpcContext.Context.GetTenantId();
var traceId = RpcContext.Context.GetTraceId();

附件透传机制(Attachments):

  • 客户端在 RemoteInvokeMessage.Attachments 中写入当前用户、租户、语言、TraceId 等信息
  • 服务端收到消息后,将 Attachments 还原到 RpcContext
  • 服务端再次发起下游 RPC 调用时,框架自动将 RpcContext 中的附件透传到 TransAttachments
  • 这样用户身份和调用链信息在整个微服务调用链中隐式透传,无需在方法签名中显式传递

RPC 服务端完整流水线图

客户端发出 TCP 数据包
    │
    ▼
DotNetty ServerBootstrap
    │  BossGroup: Accept 新连接
    │  WorkerGroup: 处理 I/O 读写
    ▼
Channel Pipeline(每个连接独立的处理链)
    │
    ├── TlsHandler(SSL,可选)
    │
    ├── IdleStateHandler(心跳超时检测)
    │
    ├── LengthFieldBasedFrameDecoder(解决粘包/拆包)
    │
    ├── TransportMessageDecoder(JSON 反序列化 → TransportMessage)
    │
    ├── TransportMessageEncoder(编码响应)
    │
    └── ServerHandler(ChannelRead 触发)
            │
            ▼
        MessageListenerBase.OnReceived()
            │  触发 Received 事件
            ▼
        RpcContext 初始化(还原 Attachments)
            │
            ▼
        Polly 策略保护层(IHandlePolicyBuilder)
            │
            ▼
        DefaultServerMessageReceivedHandler.Handle()
            │  1. 查找 ServiceEntry(按 ServiceEntryId)
            │  2. 并发数保护(可选)
            │  3. 解析参数(IParameterResolver)
            │  4. serviceEntry.Executor() → LocalInvoker
            │  5. 构建 RemoteResultMessage
            │
            ▼
        TransportMessageEncoder 编码
            │
            ▼
        DotNetty Channel 写回客户端
编辑当前页
Prev
远程执行器与 RPC 调用
Next
服务治理