Silky微服务框架在线文档Silky微服务框架在线文档
首页
文档
配置
源码解析
博文
github
gitee
首页
文档
配置
源码解析
博文
github
gitee
  • 启动时

    • 主机的构建
    • 服务引擎
    • 模块
    • 服务与服务条目的解析
    • Silky服务主机
    • 依赖注入约定
    • RPC 服务代理
  • 运行时

    • 终结点与路由
    • 执行器与调度
    • 本地执行器与服务端过滤器
    • 远程执行器与 RPC 调用
    • RPC 服务端消息处理
    • 服务治理
    • 缓存拦截器
    • 分布式事务(TCC)
    • HTTP 网关请求管道
    • 过滤器执行管道
    • Polly 弹性策略管道
    • 端点健康监控

概述

当一个服务条目被判定为远程执行(IsLocal = false),或通过 IInvokeTemplate 直接发起跨服务调用时,框架通过 远程执行器(IRemoteExecutor) 完成整个 RPC 调用链。

远程执行的核心流程如下:

构建 RemoteInvokeMessage
    │
    ▼
组合 Polly 治理策略(超时 + 熔断 + Fallback)
    │
    ▼
在策略保护下执行 IRemoteCaller.InvokeAsync()
    │
    ├── 查找目标服务的所有健康端点
    ├── 按负载均衡策略选定一个端点
    ├── 获取(或复用)DotNetty TransportClient
    ├── 运行客户端过滤器管道
    └── 发送 RPC 消息,等待响应
    │
    ▼
解析响应,返回结果

DefaultRemoteExecutor — 远程执行入口

internal class DefaultRemoteExecutor : IRemoteExecutor
{
    private readonly IRemoteCaller _remoteCaller;
    private readonly IInvokePolicyBuilder _invokePolicyBuilder;

    public async Task<object> Execute(ServiceEntry serviceEntry, object[] parameters, string serviceKey = null)
    {
        // 1. 构建 RPC 调用消息
        var remoteInvokeMessage = new RemoteInvokeMessage()
        {
            ServiceEntryId = serviceEntry.ServiceEntryDescriptor.Id,
            ServiceId = serviceEntry.ServiceId,
            Parameters = _fileParameterConverter.Convert(parameters),
            ParameterType = ParameterType.Rpc,
        };

        // 2. 一致性哈希路由时确定 hashKey
        string hashKey = null;
        if (serviceEntry.GovernanceOptions.ShuntStrategy == ShuntStrategy.HashAlgorithm)
        {
            hashKey = GetHashKeyValue();
        }

        // 3. 构建 Polly 治理策略
        var policy = _invokePolicyBuilder.Build(serviceEntry.Id, parameters);

        // 4. 在策略保护下执行远程调用
        var result = await policy.ExecuteAsync(async () =>
            await _remoteCaller.InvokeAsync(remoteInvokeMessage,
                serviceEntry.GovernanceOptions.ShuntStrategy, hashKey));

        return result;
    }
}

RemoteInvokeMessage — RPC 调用消息结构

每次远程调用会构建一个 RemoteInvokeMessage,这是在网络上传输的核心载体:

字段类型说明
ServiceEntryIdstring服务条目 Id(服务端据此定位方法)
ServiceIdstring服务 Id
Parametersobject[]位置参数数组(Rpc 参数类型时使用)
DictParametersIDictionary<string, object>字典参数(字典参数类型时使用)
HttpParametersIDictionary<ParameterFrom, object>HTTP 参数(来自 HTTP 请求时使用)
ParameterTypeParameterType参数来源类型(Rpc / Dict / Http)
AttachmentsIDictionary<string, string>附件(用户Id、语言、TraceId 等上下文信息)
TransAttachmentsIDictionary<string, string>透传附件(跨服务调用链路透传)

消息通过 TransportMessage 外层封装后由 DotNetty Codec 编解码(JSON 格式)传输。


IInvokePolicyBuilder — Polly 策略组合

远程调用在执行前,会先通过 IInvokePolicyBuilder.Build() 组合出一个 Polly 异步策略,该策略包含多层:

IAsyncPolicy<object?> policy = Policy.NoOpAsync<object?>();

// Layer 1: 结果策略(如超时)
foreach (var p in _policyWithResultProviders)
    policy = policy.WrapAsync(p.Create(serviceEntryId));

// Layer 2: 普通策略(如重试)
foreach (var p in _policyProviders)
    policy = policy.WrapAsync(p.Create(serviceEntryId));

// Layer 3: 熔断策略
foreach (var p in _circuitBreakerPolicyProviders)
    policy = policy.WrapAsync(p.Create(serviceEntryId));

// Layer 4(运行时注入): Fallback 策略(含业务 Fallback 方法)
foreach (var p in _invokeFallbackPolicyProviders)
    policy = policy.WrapAsync(p.Create(serviceEntryId, parameters));

策略缓存以 serviceEntryId 为 key(除 Fallback 策略外),避免重复构建开销。

各策略层职责:

层策略类型配置来源说明
超时TimeoutPolicyGovernanceOptions.TimeoutMillSeconds限制单次 RPC 调用总时长
重试RetryPolicyGovernanceOptions.MaxReTry发生非业务异常时自动重试
熔断CircuitBreakerPolicyGovernanceOptions.ExceptionsAllowedBeforeBreakingN 次失败后打开熔断器
FallbackFallbackPolicy[Fallback] 特性熔断或最终失败后调用降级方法

详见 服务治理。


DefaultRemoteCaller — RPC 执行核心

DefaultRemoteCaller 是 RPC 调用的实际执行者,承担端点选择、监控埋点、DotNetty 客户端管理等职责:

public async Task<object?> InvokeAsync(RemoteInvokeMessage remoteInvokeMessage,
    ShuntStrategy shuntStrategy, string? hashKey = null)
{
    // 1. 从 ServerManager 中查找目标服务所有健康的 RPC 端点
    var rpcEndpoints = FindRpcEndpoint(remoteInvokeMessage);

    // 2. 按负载均衡策略选定一个端点
    var selectedRpcEndpoint = SelectedRpcEndpoint(
        rpcEndpoints, shuntStrategy, remoteInvokeMessage.ServiceEntryId, hashKey,
        out var confirmedShuntStrategy);

    // 3. 将选定端点信息写入 RpcContext(用于链路追踪和日志)
    RpcContext.Context.SetRcpInvokeAddressInfo(selectedRpcEndpoint.Descriptor);

    // 4. 监控埋点(如果启用了 Monitor)
    tracingTimestamp = _clientInvokeDiagnosticListener.TracingBefore(remoteInvokeMessage, messageId);
    clientInvokeInfo = invokeMonitor?.Monitor((remoteInvokeMessage.ServiceEntryId, selectedRpcEndpoint));

    // 5. 获取(或复用)DotNetty TransportClient
    var client = await _transportClientFactory.GetClient(selectedRpcEndpoint);

    // 6. 构建 RemoteInvoker(含客户端过滤器链)并执行
    remoteInvoker = _clientRemoteInvokerFactory.CreateInvoker(
        new ClientInvokeContext(remoteInvokeMessage, confirmedShuntStrategy, hashKey),
        client, messageId);
    await remoteInvoker.InvokeAsync();
}

步骤详解

1. 查找健康端点(FindRpcEndpoint)

private IReadOnlyCollection<ISilkyEndpoint> FindRpcEndpoint(RemoteInvokeMessage message)
{
    // 从 ServerManager 获取目标服务的所有在线实例端点
    var serviceDescriptor = _serverManager.GetServiceDescriptor(message.ServiceId);
    var rpcEndpoints = _serverManager.GetRpcEndpoints(serviceDescriptor,
        ServiceProtocol.Rpc);
    
    if (!rpcEndpoints.Any())
        throw new NotFindServiceEndPointException($"No available endpoint for {message.ServiceEntryId}");
    
    return rpcEndpoints;
}

ServerManager 订阅了注册中心的变更事件,始终维护最新的服务实例列表;端点健康状态(熔断、心跳超时)也在此过滤。

2. 负载均衡端点选择(SelectedRpcEndpoint)

private ISilkyEndpoint SelectedRpcEndpoint(
    IReadOnlyCollection<ISilkyEndpoint> endpoints,
    ShuntStrategy shuntStrategy,
    string serviceEntryId,
    string? hashKey,
    out ShuntStrategy confirmedShuntStrategy)
{
    var selector = _endpointSelectors[shuntStrategy]; // 按分流策略获取选择器
    return selector.Select(new RpcEndpointSelectContext(endpoints, serviceEntryId, hashKey),
        out confirmedShuntStrategy);
}

3. 获取 TransportClient(连接复用)

// ITransportClientFactory 维护按端点地址的连接池
var client = await _transportClientFactory.GetClient(selectedRpcEndpoint);

DotNetty 连接池由 TransportClientPoolNumber(默认 50)控制,连接复用减少 TCP 握手开销。详见 RPC 配置。


负载均衡策略(ShuntStrategy)

ShuntStrategy 枚举定义了四种端点选择策略:

策略枚举值实现类说明
轮询PollingPollingRpcEndpointSelector按顺序依次选择,默认策略
随机RandomRandomRpcEndpointSelector随机选择一个健康端点
一致性哈希HashAlgorithmHashAlgorithmRpcEndpointSelector基于 hashKey 将同一用户/数据路由到同一实例
指定地址AppointAppointRpcEndpointSelector由 IAppointAddressInvoker 显式指定目标地址

一致性哈希路由示例:

// 在服务接口上标注 HashShuntStrategy,指定哈希 key 的来源参数
[ServiceRoute]
public interface IOrderAppService
{
    [HashShuntStrategy("userId")]  // 以 userId 参数作为哈希 key
    Task<OrderDto> GetByUserIdAsync(long userId);
}

这样同一 userId 的请求始终路由到同一服务实例,适合有本地缓存的场景。


客户端过滤器管道(Client Filter Pipeline)

客户端过滤器与服务端过滤器对称,在 RPC 消息发送前后执行:

过滤器类型

类型接口执行时机
客户端身份认证IAsyncClientAuthorizationFilter发送请求前最先执行
客户端 Action 过滤器IAsyncClientFilter包裹 RPC 调用(before + after)
客户端异常过滤器IAsyncClientExceptionFilterRPC 调用抛出异常时
客户端结果过滤器IAsyncClientResultFilter收到响应后
始终运行结果过滤器IAsyncAlwaysRunClientResultFilter无论是否短路都执行

注册方式

// 全局注册客户端过滤器
services.AddClientFilter<MyClientFilter>();

// 或在接口方法上标注(方法级)
[MyClientFilter]
Task<OrderDto> GetByIdAsync(long id);

RPC 附件透传(Attachments)

客户端过滤器的一个重要用途是附件透传。silky 框架内置了附件传播过滤器,在每次 RPC 调用前将 RpcContext 中的当前用户、语言、TraceId 等信息写入 RemoteInvokeMessage.Attachments,服务端收到后再反向填充到 RpcContext,实现用户身份和调用链信息在微服务间的隐式传递。


RemoteInvoker — 客户端执行状态机

RemoteInvoker 采用与 LocalInvoker 相同的状态机模式驱动客户端过滤器管道:

State.ActionBegin
    │
    ├── [Client Auth Filter] OnClientAuthorizationAsync
    │
    ├── [Client Filter 1] OnActionExecutingAsync ─────────┐
    ├── [Client Filter N] OnActionExecutingAsync ─────┐   │
    │                                                 │   │
    │   ITransportClient.SendAndReceiveAsync()        │   │
    │   (发送 RPC 消息,等待响应)                     │   │
    │                                                 │   │
    ├── [Client Filter N] OnActionExecutedAsync ──────┘   │
    ├── [Client Filter 1] OnActionExecutedAsync ──────────┘
    │
    ├── [Client Result Filter] OnResultExecutingAsync
    │   ......
    ├── [Client Result Filter] OnResultExecutedAsync
    │
    ▼
收到 RemoteResultMessage,解析返回值

消息的发送与接收(TransportClient)

实际的 TCP 消息收发由 DefaultTransportClient 负责:

public async Task<RemoteResultMessage> SendAndReceiveAsync(
    TransportMessage message, int timeoutMillSeconds)
{
    // 1. 注册响应等待(以 message.Id 为 key)
    var tcs = new TaskCompletionSource<TransportMessage>();
    m_resultDictionary[message.Id] = tcs;

    // 2. 检查提前到达缓冲区(处理响应先于请求注册的极端情况)
    if (_earlyMessageBuffer.TryRemove(message.Id, out var early))
    {
        tcs.TrySetResult(early.Message);
    }
    else
    {
        // 3. 通过 DotNetty Channel 发送消息
        await MessageSender.SendMessageAsync(message);
    }

    // 4. 带超时等待响应
    using var cts = new CancellationTokenSource(timeoutMillSeconds);
    cts.Token.Register(() => tcs.TrySetCanceled());
    var response = await tcs.Task;

    // 5. 解析为 RemoteResultMessage
    return response.GetContent<RemoteResultMessage>();
}

响应关联机制:

  • 每条 TransportMessage 都有一个全局唯一的 Id(UUID)
  • 客户端发送消息时,将 tcs(TaskCompletionSource)以 Id 为 key 注册到 m_resultDictionary
  • 服务端响应时回传相同的 Id
  • 客户端 Listener 收到响应后从字典中找到对应的 tcs 并 SetResult(),唤醒等待协程
  • 若 1 分钟内未找到等待者的提前到达消息,由清理定时器(每分钟执行)从 _earlyMessageBuffer 中移除

完整 RPC 调用链路图

调用方(消费者端)
    │
    ▼
DefaultRemoteExecutor
    │  构建 RemoteInvokeMessage + 组合 Polly 策略
    │
    ▼
policy.ExecuteAsync()(策略保护层:超时/熔断/Fallback)
    │
    ▼
DefaultRemoteCaller.InvokeAsync()
    │  FindRpcEndpoint → SelectedRpcEndpoint(负载均衡)
    │  GetTransportClient(连接池复用)
    │
    ▼
RemoteInvoker(客户端过滤器管道)
    │  [ClientFilter1] OnExecuting → [ClientFilterN] OnExecuting
    │
    ▼
DefaultTransportClient.SendAndReceiveAsync()
    │  序列化 TransportMessage(JSON)
    │  DotNetty Channel 发送 TCP 数据包
    │
    ▼  ← 网络传输 →
    │
    ▼
服务端 DotNetty Channel 接收
    │  解码 + DefaultServerMessageReceivedHandler
    │  执行本地业务(LocalInvoker)
    │  构建 RemoteResultMessage
    │  编码 + 写回 Channel
    │
    ▼  ← 网络传输 →
    │
    ▼
DefaultTransportClient 收到响应
    │  tcs.SetResult() 唤醒等待协程
    │
    ▼
RemoteInvoker(客户端过滤器管道继续)
    │  [ClientFilterN] OnExecuted → [ClientFilter1] OnExecuted
    │
    ▼
解析返回值,返回给调用方
编辑当前页
Prev
本地执行器与服务端过滤器
Next
RPC 服务端消息处理