Silky微服务框架在线文档Silky微服务框架在线文档
首页
文档
配置
源码解析
博文
github
gitee
首页
文档
配置
源码解析
博文
github
gitee
  • 启动时

    • 主机的构建
    • 服务引擎
    • 模块
    • 服务与服务条目的解析
    • Silky服务主机
    • 依赖注入约定
    • RPC 服务代理
  • 运行时

    • 终结点与路由
    • 执行器与调度
    • 本地执行器与服务端过滤器
    • 远程执行器与 RPC 调用
    • RPC 服务端消息处理
    • 服务治理
    • 缓存拦截器
    • 分布式事务(TCC)
    • HTTP 网关请求管道
    • 过滤器执行管道
    • Polly 弹性策略管道
    • 端点健康监控

概述

silky 框架通过端点健康监控机制在运行时持续追踪每个 RPC 端点(ISilkyEndpoint)的健康状态。一旦端点在通信中连续失败达到阈值,框架会自动将其从可用端点列表中移除,防止持续将请求路由到不健康的实例。

本文说明端点健康状态的运行时追踪机制:从 RPC 调用失败 → Monitor 记录 → 状态变更 → 端点移除 → 选择器感知的完整联动链路。


核心组件

组件类型职责
IRpcEndpointMonitor / DefaultRpcEndpointMonitorSingleton维护所有端点的健康状态(IsEnable + UnHealthTimes),提供状态变更事件
IInvokeMonitor / DefaultInvokeMonitorSingleton(监控启用时)追踪每次客户端调用的成功/失败,联动 IRpcEndpointMonitor
IServerHandleMonitor / DefaultServerHandleMonitorSingleton(监控启用时)追踪每次服务端处理的成功/失败
IRpcEndpointSelector(各实现)Singleton订阅端点变更事件,清理失效端点缓存

端点健康状态数据结构

DefaultRpcEndpointMonitor 内部维护一个并发字典,以端点为键,记录其健康状态:

private ConcurrentDictionary<ISilkyEndpoint, CheckModel> m_checkEndpoints = new();

private class CheckModel
{
    public bool IsEnable { get; set; }      // 端点当前是否可用
    public int UnHealthTimes { get; set; }  // 连续不健康计数
}

Monitor(ISilkyEndpoint) 首次注册端点时将其加入字典(IsEnable = true, UnHealthTimes = 0),并触发 OnAddMonitor 事件。


联动链路:调用失败 → 端点移除

第一步:RPC 调用失败

DefaultRemoteCaller.InvokeAsync() 在 catch 块中调用 InvokeMonitor.ExecFail():

catch (Exception ex)
{
    invokeMonitor?.ExecFail(
        (remoteInvokeMessage.ServiceEntryId, selectedRpcEndpoint),
        sp.Elapsed.TotalMilliseconds,
        clientInvokeInfo);
    throw;
}

第二步:InvokeMonitor 更新调用统计并变更端点状态

DefaultInvokeMonitor.ExecFail() 完成两件事:

  1. 更新实例级统计信息(InstanceInvokeInfo.FaultInvokeCount++,AET 计算)
  2. 调用 IRpcEndpointMonitor.ChangeStatus(endpoint, isEnable: false, unHealthCeilingTimes)
public void ExecFail((string, ISilkyEndpoint) item, ...)
{
    // 更新实例级统计
    lock (_monitorProvider.InstanceInvokeInfo)
    {
        _monitorProvider.InstanceInvokeInfo.ConcurrentCount--;
        _monitorProvider.InstanceInvokeInfo.FaultInvokeCount++;
        ...
    }

    // 更新服务条目级调用信息
    clientInvokeInfo.IsEnable = _rpcEndpointMonitor.IsEnable(item.Item2);
    clientInvokeInfo.FaultInvokeCount++;
    ...

    // 通知 RpcEndpointMonitor 端点不健康
    _rpcEndpointMonitor.ChangeStatus(item.Item2, isEnable: false, unHealthCeilingTimes: ...);
}

第三步:RpcEndpointMonitor 更新状态并触发事件

DefaultRpcEndpointMonitor.ChangeStatus() 核心逻辑:

public void ChangeStatus(ISilkyEndpoint silkyEndpoint, bool isEnable, int unHealthCeilingTimes = 0)
{
    if (m_checkEndpoints.TryGetValue(silkyEndpoint, out var healthCheckModel))
    {
        var newModel = new CheckModel(
            isEnable,
            isEnable ? 0 : healthCheckModel.UnHealthTimes + 1  // 失败则递增
        );
        m_checkEndpoints.TryUpdate(silkyEndpoint, newModel, healthCheckModel);
        healthCheckModel = newModel;
    }

    // 超过阈值:移除端点并触发 OnRemoveRpcEndpoint 事件
    if (!isEnable && healthCheckModel.UnHealthTimes >= unHealthCeilingTimes)
    {
        OnRemoveRpcEndpoint?.Invoke(silkyEndpoint);
        m_checkEndpoints.TryRemove(silkyEndpoint, out _);
    }

    // 状态变化:触发 OnStatusChange 事件
    if (healthCheckModel.IsEnable != isEnable)
        OnStatusChange?.Invoke(silkyEndpoint, isEnable);

    // 不可用:触发 OnDisEnable 事件
    if (!isEnable)
        OnDisEnable?.Invoke(silkyEndpoint);
}

第四步:端点选择器感知移除事件

各 IRpcEndpointSelector 实现在构造函数中订阅端点变更事件,主动清除包含该端点的缓存池:

// PollingRpcEndpointSelector 构造函数
_rpcEndpointMonitor.OnRemoveRpcEndpoint += async rpcEndpoint =>
{
    var removeKeys = addressesPools
        .Where(p => p.Value.Endpoints.Any(q => q.Equals(rpcEndpoint)))
        .Select(p => p.Key);
    foreach (var key in removeKeys)
        addressesPools.TryRemove(key, out _);
};

_rpcEndpointMonitor.OnDisEnable += async rpcEndpoint =>
{
    // 同样清除包含该端点的缓存,下次路由时不会选到此端点
    ...
};

端点被移除或标记不可用后,选择器的缓存被清除,后续调用重新从可用端点列表中选择,不会再路由到不健康的实例。


服务端处理监控

DefaultServerHandleMonitor(服务端)的机制与客户端对称,追踪当前实例的服务端处理情况:

public ServerHandleInfo? Monitor((string serviceEntryId, string clientAddress) item)
{
    lock (_monitorProvider.InstanceHandleInfo)
    {
        _monitorProvider.InstanceHandleInfo.ConcurrentCount++;
        // 更新最大并发计数、首次/末次处理时间等
    }
    var serverHandleInfo = _monitorProvider.GetServerHandleInfo(cacheKey);
    serverHandleInfo.TotalHandleCount++;
    return serverHandleInfo;
}

ExecFail() 时记录:

  • InstanceHandleInfo.FaultHandleCount++
  • InstanceHandleInfo.TotalSeriousErrorCount++(严重错误时)
  • serverHandleInfo.FaultHandleCount++

监控数据模型

ClientInvokeInfo(客户端每服务条目调用统计)

字段说明
ServiceEntryId被调用的服务条目 ID
Address目标端点地址
TotalInvokeCount总调用次数
FaultInvokeCount失败调用次数
AET平均执行时间(毫秒,滑动平均)
IsEnable端点当前是否可用
FirstInvokeTime / FinalInvokeTime首次/末次调用时间

ServerHandleInfo(服务端每服务条目处理统计)

字段说明
ServiceEntryId被处理的服务条目 ID
Address调用方地址
TotalHandleCount总处理次数
FaultHandleCount处理失败次数
SeriousErrorCount严重错误次数
AET平均执行时间(毫秒,滑动平均)

ServerInstanceHandleInfo(实例级服务端统计)

字段说明
ConcurrentCount当前并发处理数
MaxConcurrentCount历史最大并发数
TotalHandleCount累计处理总次数
FaultHandleCount累计失败次数
TotalSeriousErrorCount严重错误累计次数
AllowMaxConcurrentCount允许的最大并发数(来自 GovernanceOptions.MaxConcurrentHandlingCount)

监控启用控制

监控默认是可选功能,通过 RpcOptions.EnableMonitor 控制:

# appsettings.yml
Rpc:
  EnableMonitor: true   # 默认值,可设为 false 关闭监控以减少开销

当 EnableMonitor = false 时,DefaultServerMessageReceivedHandler 和 DefaultRemoteCaller 不会调用任何 Monitor 方法,端点健康追踪也不会发生。

注意:即使 EnableMonitor = false,IRpcEndpointMonitor 仍然运行,因为它是更底层的注册中心-端点同步机制的一部分(服务发现时会调用 Monitor() 注册端点)。被禁用的只是基于调用结果的健康状态更新。


完整联动图

RPC 调用失败(网络异常、超时等)
    │
    ▼
DefaultRemoteCaller.catch → InvokeMonitor.ExecFail()
    │
    ├── 更新 InstanceInvokeInfo 统计
    ├── 更新 ClientInvokeInfo.FaultInvokeCount
    │
    └── RpcEndpointMonitor.ChangeStatus(endpoint, isEnable=false, ceiling)
            │
            ├── CheckModel.UnHealthTimes++
            ├── 触发 OnDisEnable 事件 → EndpointSelector 清除缓存
            │
            └── UnHealthTimes >= unHealthCeilingTimes?
                    YES → 触发 OnRemoveRpcEndpoint 事件
                            │
                            └── EndpointSelector 移除端点池中该端点
                                 后续调用不再路由到此实例

unHealthCeilingTimes 对应 GovernanceOptions.UnHealthAddressTimesAllowedBeforeRemoving(默认值 3)。设置为 0 时,任何一次失败都会立即触发端点移除。

编辑当前页
Prev
Polly 弹性策略管道