概述
silky 框架通过端点健康监控机制在运行时持续追踪每个 RPC 端点(ISilkyEndpoint)的健康状态。一旦端点在通信中连续失败达到阈值,框架会自动将其从可用端点列表中移除,防止持续将请求路由到不健康的实例。
本文说明端点健康状态的运行时追踪机制:从 RPC 调用失败 → Monitor 记录 → 状态变更 → 端点移除 → 选择器感知的完整联动链路。
核心组件
| 组件 | 类型 | 职责 |
|---|---|---|
IRpcEndpointMonitor / DefaultRpcEndpointMonitor | Singleton | 维护所有端点的健康状态(IsEnable + UnHealthTimes),提供状态变更事件 |
IInvokeMonitor / DefaultInvokeMonitor | Singleton(监控启用时) | 追踪每次客户端调用的成功/失败,联动 IRpcEndpointMonitor |
IServerHandleMonitor / DefaultServerHandleMonitor | Singleton(监控启用时) | 追踪每次服务端处理的成功/失败 |
IRpcEndpointSelector(各实现) | Singleton | 订阅端点变更事件,清理失效端点缓存 |
端点健康状态数据结构
DefaultRpcEndpointMonitor 内部维护一个并发字典,以端点为键,记录其健康状态:
private ConcurrentDictionary<ISilkyEndpoint, CheckModel> m_checkEndpoints = new();
private class CheckModel
{
public bool IsEnable { get; set; } // 端点当前是否可用
public int UnHealthTimes { get; set; } // 连续不健康计数
}
Monitor(ISilkyEndpoint) 首次注册端点时将其加入字典(IsEnable = true, UnHealthTimes = 0),并触发 OnAddMonitor 事件。
联动链路:调用失败 → 端点移除
第一步:RPC 调用失败
DefaultRemoteCaller.InvokeAsync() 在 catch 块中调用 InvokeMonitor.ExecFail():
catch (Exception ex)
{
invokeMonitor?.ExecFail(
(remoteInvokeMessage.ServiceEntryId, selectedRpcEndpoint),
sp.Elapsed.TotalMilliseconds,
clientInvokeInfo);
throw;
}
第二步:InvokeMonitor 更新调用统计并变更端点状态
DefaultInvokeMonitor.ExecFail() 完成两件事:
- 更新实例级统计信息(
InstanceInvokeInfo.FaultInvokeCount++,AET 计算) - 调用
IRpcEndpointMonitor.ChangeStatus(endpoint, isEnable: false, unHealthCeilingTimes)
public void ExecFail((string, ISilkyEndpoint) item, ...)
{
// 更新实例级统计
lock (_monitorProvider.InstanceInvokeInfo)
{
_monitorProvider.InstanceInvokeInfo.ConcurrentCount--;
_monitorProvider.InstanceInvokeInfo.FaultInvokeCount++;
...
}
// 更新服务条目级调用信息
clientInvokeInfo.IsEnable = _rpcEndpointMonitor.IsEnable(item.Item2);
clientInvokeInfo.FaultInvokeCount++;
...
// 通知 RpcEndpointMonitor 端点不健康
_rpcEndpointMonitor.ChangeStatus(item.Item2, isEnable: false, unHealthCeilingTimes: ...);
}
第三步:RpcEndpointMonitor 更新状态并触发事件
DefaultRpcEndpointMonitor.ChangeStatus() 核心逻辑:
public void ChangeStatus(ISilkyEndpoint silkyEndpoint, bool isEnable, int unHealthCeilingTimes = 0)
{
if (m_checkEndpoints.TryGetValue(silkyEndpoint, out var healthCheckModel))
{
var newModel = new CheckModel(
isEnable,
isEnable ? 0 : healthCheckModel.UnHealthTimes + 1 // 失败则递增
);
m_checkEndpoints.TryUpdate(silkyEndpoint, newModel, healthCheckModel);
healthCheckModel = newModel;
}
// 超过阈值:移除端点并触发 OnRemoveRpcEndpoint 事件
if (!isEnable && healthCheckModel.UnHealthTimes >= unHealthCeilingTimes)
{
OnRemoveRpcEndpoint?.Invoke(silkyEndpoint);
m_checkEndpoints.TryRemove(silkyEndpoint, out _);
}
// 状态变化:触发 OnStatusChange 事件
if (healthCheckModel.IsEnable != isEnable)
OnStatusChange?.Invoke(silkyEndpoint, isEnable);
// 不可用:触发 OnDisEnable 事件
if (!isEnable)
OnDisEnable?.Invoke(silkyEndpoint);
}
第四步:端点选择器感知移除事件
各 IRpcEndpointSelector 实现在构造函数中订阅端点变更事件,主动清除包含该端点的缓存池:
// PollingRpcEndpointSelector 构造函数
_rpcEndpointMonitor.OnRemoveRpcEndpoint += async rpcEndpoint =>
{
var removeKeys = addressesPools
.Where(p => p.Value.Endpoints.Any(q => q.Equals(rpcEndpoint)))
.Select(p => p.Key);
foreach (var key in removeKeys)
addressesPools.TryRemove(key, out _);
};
_rpcEndpointMonitor.OnDisEnable += async rpcEndpoint =>
{
// 同样清除包含该端点的缓存,下次路由时不会选到此端点
...
};
端点被移除或标记不可用后,选择器的缓存被清除,后续调用重新从可用端点列表中选择,不会再路由到不健康的实例。
服务端处理监控
DefaultServerHandleMonitor(服务端)的机制与客户端对称,追踪当前实例的服务端处理情况:
public ServerHandleInfo? Monitor((string serviceEntryId, string clientAddress) item)
{
lock (_monitorProvider.InstanceHandleInfo)
{
_monitorProvider.InstanceHandleInfo.ConcurrentCount++;
// 更新最大并发计数、首次/末次处理时间等
}
var serverHandleInfo = _monitorProvider.GetServerHandleInfo(cacheKey);
serverHandleInfo.TotalHandleCount++;
return serverHandleInfo;
}
ExecFail() 时记录:
InstanceHandleInfo.FaultHandleCount++InstanceHandleInfo.TotalSeriousErrorCount++(严重错误时)serverHandleInfo.FaultHandleCount++
监控数据模型
ClientInvokeInfo(客户端每服务条目调用统计)
| 字段 | 说明 |
|---|---|
ServiceEntryId | 被调用的服务条目 ID |
Address | 目标端点地址 |
TotalInvokeCount | 总调用次数 |
FaultInvokeCount | 失败调用次数 |
AET | 平均执行时间(毫秒,滑动平均) |
IsEnable | 端点当前是否可用 |
FirstInvokeTime / FinalInvokeTime | 首次/末次调用时间 |
ServerHandleInfo(服务端每服务条目处理统计)
| 字段 | 说明 |
|---|---|
ServiceEntryId | 被处理的服务条目 ID |
Address | 调用方地址 |
TotalHandleCount | 总处理次数 |
FaultHandleCount | 处理失败次数 |
SeriousErrorCount | 严重错误次数 |
AET | 平均执行时间(毫秒,滑动平均) |
ServerInstanceHandleInfo(实例级服务端统计)
| 字段 | 说明 |
|---|---|
ConcurrentCount | 当前并发处理数 |
MaxConcurrentCount | 历史最大并发数 |
TotalHandleCount | 累计处理总次数 |
FaultHandleCount | 累计失败次数 |
TotalSeriousErrorCount | 严重错误累计次数 |
AllowMaxConcurrentCount | 允许的最大并发数(来自 GovernanceOptions.MaxConcurrentHandlingCount) |
监控启用控制
监控默认是可选功能,通过 RpcOptions.EnableMonitor 控制:
# appsettings.yml
Rpc:
EnableMonitor: true # 默认值,可设为 false 关闭监控以减少开销
当 EnableMonitor = false 时,DefaultServerMessageReceivedHandler 和 DefaultRemoteCaller 不会调用任何 Monitor 方法,端点健康追踪也不会发生。
注意:即使
EnableMonitor = false,IRpcEndpointMonitor仍然运行,因为它是更底层的注册中心-端点同步机制的一部分(服务发现时会调用Monitor()注册端点)。被禁用的只是基于调用结果的健康状态更新。
完整联动图
RPC 调用失败(网络异常、超时等)
│
▼
DefaultRemoteCaller.catch → InvokeMonitor.ExecFail()
│
├── 更新 InstanceInvokeInfo 统计
├── 更新 ClientInvokeInfo.FaultInvokeCount
│
└── RpcEndpointMonitor.ChangeStatus(endpoint, isEnable=false, ceiling)
│
├── CheckModel.UnHealthTimes++
├── 触发 OnDisEnable 事件 → EndpointSelector 清除缓存
│
└── UnHealthTimes >= unHealthCeilingTimes?
YES → 触发 OnRemoveRpcEndpoint 事件
│
└── EndpointSelector 移除端点池中该端点
后续调用不再路由到此实例
unHealthCeilingTimes 对应 GovernanceOptions.UnHealthAddressTimesAllowedBeforeRemoving(默认值 3)。设置为 0 时,任何一次失败都会立即触发端点移除。
