概述
当一个服务条目被判定为远程执行(IsLocal = false),或通过 IInvokeTemplate 直接发起跨服务调用时,框架通过 远程执行器(IRemoteExecutor) 完成整个 RPC 调用链。
远程执行的核心流程如下:
构建 RemoteInvokeMessage
│
▼
组合 Polly 治理策略(超时 + 熔断 + Fallback)
│
▼
在策略保护下执行 IRemoteCaller.InvokeAsync()
│
├── 查找目标服务的所有健康端点
├── 按负载均衡策略选定一个端点
├── 获取(或复用)DotNetty TransportClient
├── 运行客户端过滤器管道
└── 发送 RPC 消息,等待响应
│
▼
解析响应,返回结果
DefaultRemoteExecutor — 远程执行入口
internal class DefaultRemoteExecutor : IRemoteExecutor
{
private readonly IRemoteCaller _remoteCaller;
private readonly IInvokePolicyBuilder _invokePolicyBuilder;
public async Task<object> Execute(ServiceEntry serviceEntry, object[] parameters, string serviceKey = null)
{
// 1. 构建 RPC 调用消息
var remoteInvokeMessage = new RemoteInvokeMessage()
{
ServiceEntryId = serviceEntry.ServiceEntryDescriptor.Id,
ServiceId = serviceEntry.ServiceId,
Parameters = _fileParameterConverter.Convert(parameters),
ParameterType = ParameterType.Rpc,
};
// 2. 一致性哈希路由时确定 hashKey
string hashKey = null;
if (serviceEntry.GovernanceOptions.ShuntStrategy == ShuntStrategy.HashAlgorithm)
{
hashKey = GetHashKeyValue();
}
// 3. 构建 Polly 治理策略
var policy = _invokePolicyBuilder.Build(serviceEntry.Id, parameters);
// 4. 在策略保护下执行远程调用
var result = await policy.ExecuteAsync(async () =>
await _remoteCaller.InvokeAsync(remoteInvokeMessage,
serviceEntry.GovernanceOptions.ShuntStrategy, hashKey));
return result;
}
}
RemoteInvokeMessage — RPC 调用消息结构
每次远程调用会构建一个 RemoteInvokeMessage,这是在网络上传输的核心载体:
| 字段 | 类型 | 说明 |
|---|---|---|
ServiceEntryId | string | 服务条目 Id(服务端据此定位方法) |
ServiceId | string | 服务 Id |
Parameters | object[] | 位置参数数组(Rpc 参数类型时使用) |
DictParameters | IDictionary<string, object> | 字典参数(字典参数类型时使用) |
HttpParameters | IDictionary<ParameterFrom, object> | HTTP 参数(来自 HTTP 请求时使用) |
ParameterType | ParameterType | 参数来源类型(Rpc / Dict / Http) |
Attachments | IDictionary<string, string> | 附件(用户Id、语言、TraceId 等上下文信息) |
TransAttachments | IDictionary<string, string> | 透传附件(跨服务调用链路透传) |
消息通过 TransportMessage 外层封装后由 DotNetty Codec 编解码(JSON 格式)传输。
IInvokePolicyBuilder — Polly 策略组合
远程调用在执行前,会先通过 IInvokePolicyBuilder.Build() 组合出一个 Polly 异步策略,该策略包含多层:
IAsyncPolicy<object?> policy = Policy.NoOpAsync<object?>();
// Layer 1: 结果策略(如超时)
foreach (var p in _policyWithResultProviders)
policy = policy.WrapAsync(p.Create(serviceEntryId));
// Layer 2: 普通策略(如重试)
foreach (var p in _policyProviders)
policy = policy.WrapAsync(p.Create(serviceEntryId));
// Layer 3: 熔断策略
foreach (var p in _circuitBreakerPolicyProviders)
policy = policy.WrapAsync(p.Create(serviceEntryId));
// Layer 4(运行时注入): Fallback 策略(含业务 Fallback 方法)
foreach (var p in _invokeFallbackPolicyProviders)
policy = policy.WrapAsync(p.Create(serviceEntryId, parameters));
策略缓存以 serviceEntryId 为 key(除 Fallback 策略外),避免重复构建开销。
各策略层职责:
| 层 | 策略类型 | 配置来源 | 说明 |
|---|---|---|---|
| 超时 | TimeoutPolicy | GovernanceOptions.TimeoutMillSeconds | 限制单次 RPC 调用总时长 |
| 重试 | RetryPolicy | GovernanceOptions.MaxReTry | 发生非业务异常时自动重试 |
| 熔断 | CircuitBreakerPolicy | GovernanceOptions.ExceptionsAllowedBeforeBreaking | N 次失败后打开熔断器 |
| Fallback | FallbackPolicy | [Fallback] 特性 | 熔断或最终失败后调用降级方法 |
详见 服务治理。
DefaultRemoteCaller — RPC 执行核心
DefaultRemoteCaller 是 RPC 调用的实际执行者,承担端点选择、监控埋点、DotNetty 客户端管理等职责:
public async Task<object?> InvokeAsync(RemoteInvokeMessage remoteInvokeMessage,
ShuntStrategy shuntStrategy, string? hashKey = null)
{
// 1. 从 ServerManager 中查找目标服务所有健康的 RPC 端点
var rpcEndpoints = FindRpcEndpoint(remoteInvokeMessage);
// 2. 按负载均衡策略选定一个端点
var selectedRpcEndpoint = SelectedRpcEndpoint(
rpcEndpoints, shuntStrategy, remoteInvokeMessage.ServiceEntryId, hashKey,
out var confirmedShuntStrategy);
// 3. 将选定端点信息写入 RpcContext(用于链路追踪和日志)
RpcContext.Context.SetRcpInvokeAddressInfo(selectedRpcEndpoint.Descriptor);
// 4. 监控埋点(如果启用了 Monitor)
tracingTimestamp = _clientInvokeDiagnosticListener.TracingBefore(remoteInvokeMessage, messageId);
clientInvokeInfo = invokeMonitor?.Monitor((remoteInvokeMessage.ServiceEntryId, selectedRpcEndpoint));
// 5. 获取(或复用)DotNetty TransportClient
var client = await _transportClientFactory.GetClient(selectedRpcEndpoint);
// 6. 构建 RemoteInvoker(含客户端过滤器链)并执行
remoteInvoker = _clientRemoteInvokerFactory.CreateInvoker(
new ClientInvokeContext(remoteInvokeMessage, confirmedShuntStrategy, hashKey),
client, messageId);
await remoteInvoker.InvokeAsync();
}
步骤详解
1. 查找健康端点(FindRpcEndpoint)
private IReadOnlyCollection<ISilkyEndpoint> FindRpcEndpoint(RemoteInvokeMessage message)
{
// 从 ServerManager 获取目标服务的所有在线实例端点
var serviceDescriptor = _serverManager.GetServiceDescriptor(message.ServiceId);
var rpcEndpoints = _serverManager.GetRpcEndpoints(serviceDescriptor,
ServiceProtocol.Rpc);
if (!rpcEndpoints.Any())
throw new NotFindServiceEndPointException($"No available endpoint for {message.ServiceEntryId}");
return rpcEndpoints;
}
ServerManager 订阅了注册中心的变更事件,始终维护最新的服务实例列表;端点健康状态(熔断、心跳超时)也在此过滤。
2. 负载均衡端点选择(SelectedRpcEndpoint)
private ISilkyEndpoint SelectedRpcEndpoint(
IReadOnlyCollection<ISilkyEndpoint> endpoints,
ShuntStrategy shuntStrategy,
string serviceEntryId,
string? hashKey,
out ShuntStrategy confirmedShuntStrategy)
{
var selector = _endpointSelectors[shuntStrategy]; // 按分流策略获取选择器
return selector.Select(new RpcEndpointSelectContext(endpoints, serviceEntryId, hashKey),
out confirmedShuntStrategy);
}
3. 获取 TransportClient(连接复用)
// ITransportClientFactory 维护按端点地址的连接池
var client = await _transportClientFactory.GetClient(selectedRpcEndpoint);
DotNetty 连接池由 TransportClientPoolNumber(默认 50)控制,连接复用减少 TCP 握手开销。详见 RPC 配置。
负载均衡策略(ShuntStrategy)
ShuntStrategy 枚举定义了四种端点选择策略:
| 策略 | 枚举值 | 实现类 | 说明 |
|---|---|---|---|
| 轮询 | Polling | PollingRpcEndpointSelector | 按顺序依次选择,默认策略 |
| 随机 | Random | RandomRpcEndpointSelector | 随机选择一个健康端点 |
| 一致性哈希 | HashAlgorithm | HashAlgorithmRpcEndpointSelector | 基于 hashKey 将同一用户/数据路由到同一实例 |
| 指定地址 | Appoint | AppointRpcEndpointSelector | 由 IAppointAddressInvoker 显式指定目标地址 |
一致性哈希路由示例:
// 在服务接口上标注 HashShuntStrategy,指定哈希 key 的来源参数
[ServiceRoute]
public interface IOrderAppService
{
[HashShuntStrategy("userId")] // 以 userId 参数作为哈希 key
Task<OrderDto> GetByUserIdAsync(long userId);
}
这样同一 userId 的请求始终路由到同一服务实例,适合有本地缓存的场景。
客户端过滤器管道(Client Filter Pipeline)
客户端过滤器与服务端过滤器对称,在 RPC 消息发送前后执行:
过滤器类型
| 类型 | 接口 | 执行时机 |
|---|---|---|
| 客户端身份认证 | IAsyncClientAuthorizationFilter | 发送请求前最先执行 |
| 客户端 Action 过滤器 | IAsyncClientFilter | 包裹 RPC 调用(before + after) |
| 客户端异常过滤器 | IAsyncClientExceptionFilter | RPC 调用抛出异常时 |
| 客户端结果过滤器 | IAsyncClientResultFilter | 收到响应后 |
| 始终运行结果过滤器 | IAsyncAlwaysRunClientResultFilter | 无论是否短路都执行 |
注册方式
// 全局注册客户端过滤器
services.AddClientFilter<MyClientFilter>();
// 或在接口方法上标注(方法级)
[MyClientFilter]
Task<OrderDto> GetByIdAsync(long id);
RPC 附件透传(Attachments)
客户端过滤器的一个重要用途是附件透传。silky 框架内置了附件传播过滤器,在每次 RPC 调用前将 RpcContext 中的当前用户、语言、TraceId 等信息写入 RemoteInvokeMessage.Attachments,服务端收到后再反向填充到 RpcContext,实现用户身份和调用链信息在微服务间的隐式传递。
RemoteInvoker — 客户端执行状态机
RemoteInvoker 采用与 LocalInvoker 相同的状态机模式驱动客户端过滤器管道:
State.ActionBegin
│
├── [Client Auth Filter] OnClientAuthorizationAsync
│
├── [Client Filter 1] OnActionExecutingAsync ─────────┐
├── [Client Filter N] OnActionExecutingAsync ─────┐ │
│ │ │
│ ITransportClient.SendAndReceiveAsync() │ │
│ (发送 RPC 消息,等待响应) │ │
│ │ │
├── [Client Filter N] OnActionExecutedAsync ──────┘ │
├── [Client Filter 1] OnActionExecutedAsync ──────────┘
│
├── [Client Result Filter] OnResultExecutingAsync
│ ......
├── [Client Result Filter] OnResultExecutedAsync
│
▼
收到 RemoteResultMessage,解析返回值
消息的发送与接收(TransportClient)
实际的 TCP 消息收发由 DefaultTransportClient 负责:
public async Task<RemoteResultMessage> SendAndReceiveAsync(
TransportMessage message, int timeoutMillSeconds)
{
// 1. 注册响应等待(以 message.Id 为 key)
var tcs = new TaskCompletionSource<TransportMessage>();
m_resultDictionary[message.Id] = tcs;
// 2. 检查提前到达缓冲区(处理响应先于请求注册的极端情况)
if (_earlyMessageBuffer.TryRemove(message.Id, out var early))
{
tcs.TrySetResult(early.Message);
}
else
{
// 3. 通过 DotNetty Channel 发送消息
await MessageSender.SendMessageAsync(message);
}
// 4. 带超时等待响应
using var cts = new CancellationTokenSource(timeoutMillSeconds);
cts.Token.Register(() => tcs.TrySetCanceled());
var response = await tcs.Task;
// 5. 解析为 RemoteResultMessage
return response.GetContent<RemoteResultMessage>();
}
响应关联机制:
- 每条
TransportMessage都有一个全局唯一的Id(UUID) - 客户端发送消息时,将
tcs(TaskCompletionSource)以Id为 key 注册到m_resultDictionary - 服务端响应时回传相同的
Id - 客户端 Listener 收到响应后从字典中找到对应的
tcs并SetResult(),唤醒等待协程 - 若 1 分钟内未找到等待者的提前到达消息,由清理定时器(每分钟执行)从
_earlyMessageBuffer中移除
完整 RPC 调用链路图
调用方(消费者端)
│
▼
DefaultRemoteExecutor
│ 构建 RemoteInvokeMessage + 组合 Polly 策略
│
▼
policy.ExecuteAsync()(策略保护层:超时/熔断/Fallback)
│
▼
DefaultRemoteCaller.InvokeAsync()
│ FindRpcEndpoint → SelectedRpcEndpoint(负载均衡)
│ GetTransportClient(连接池复用)
│
▼
RemoteInvoker(客户端过滤器管道)
│ [ClientFilter1] OnExecuting → [ClientFilterN] OnExecuting
│
▼
DefaultTransportClient.SendAndReceiveAsync()
│ 序列化 TransportMessage(JSON)
│ DotNetty Channel 发送 TCP 数据包
│
▼ ← 网络传输 →
│
▼
服务端 DotNetty Channel 接收
│ 解码 + DefaultServerMessageReceivedHandler
│ 执行本地业务(LocalInvoker)
│ 构建 RemoteResultMessage
│ 编码 + 写回 Channel
│
▼ ← 网络传输 →
│
▼
DefaultTransportClient 收到响应
│ tcs.SetResult() 唤醒等待协程
│
▼
RemoteInvoker(客户端过滤器管道继续)
│ [ClientFilterN] OnExecuted → [ClientFilter1] OnExecuted
│
▼
解析返回值,返回给调用方
