概述
silky 框架使用 Polly 为每个服务条目(ServiceEntry)动态构建弹性策略链,在客户端(调用方)和服务端(提供方)分别维护独立的策略管道。策略链按 serviceEntryId 缓存,避免重复创建。
本文说明策略管道的构建逻辑、各策略的触发条件,以及降级(Fallback)的执行路径。
客户端策略管道
DefaultInvokePolicyBuilder
DefaultInvokePolicyBuilder 负责组装客户端策略链,内部维护一个 ConcurrentDictionary<string, IAsyncPolicy<object?>> 缓存,以 serviceEntryId 为键:
public IAsyncPolicy<object?> Build(string serviceEntryId)
{
return _policyCaches.GetOrAdd(serviceEntryId, id =>
{
IAsyncPolicy<object?> policy = Policy.NoOpAsync<object?>();
// 1. PolicyWithResultProvider(带返回值策略,如溢出重试)
foreach (var provider in _policyWithResultProviders)
policy = policy.WrapAsync(provider.Create(id));
// 2. PolicyProvider(通用策略,如超时)
foreach (var provider in _policyProviders)
policy = policy.WrapAsync(provider.Create(id));
// 3. CircuitBreakerPolicyProvider(熔断器)
foreach (var provider in _circuitBreakerPolicyProviders)
policy = policy.WrapAsync(provider.Create(id));
return policy;
});
}
// 带 Fallback 的版本(每次调用重新组装,因为 Fallback 依赖 parameters)
public IAsyncPolicy<object?> Build(string serviceEntryId, object[] parameters)
{
var policy = Build(serviceEntryId); // 取缓存的基础策略
foreach (var provider in _invokeFallbackPolicyProviders)
policy = policy.WrapAsync(provider.Create(serviceEntryId, parameters));
return policy;
}
注意:Fallback 策略依赖
parameters(需要传入原始参数供降级方法使用),因此不缓存,每次调用重新组装并包裹在基础策略外层。
客户端各策略说明
超时策略(DefaultTimeoutInvokePolicyProvider)
从 GovernanceOptions.TimeoutMillSeconds 读取超时值,使用 Polly Optimistic 超时:
return Policy.TimeoutAsync(
TimeSpan.FromMilliseconds(serviceEntryDescriptor.GovernanceOptions.TimeoutMillSeconds),
TimeoutStrategy.Optimistic);
Optimistic 模式:依赖 CancellationToken,不强制终止 Task,而是在超时时取消令牌,符合 .NET 异步编程规范。当 TimeoutMillSeconds <= 0 时不创建超时策略。
溢出重试策略(OverflowServerHandleFailoverPolicyProvider)
当服务端返回 OverflowMaxServerHandleException(并发超过 MaxConcurrentHandlingCount)时自动重试:
Policy<object>
.Handle<OverflowMaxServerHandleException>()
.WaitAndRetryAsync(
retryCount: serviceEntryDescriptor.GovernanceOptions.RetryTimes,
sleepDurationProvider: _ =>
TimeSpan.FromMilliseconds(serviceEntryDescriptor.GovernanceOptions.RetryIntervalMillSeconds),
onRetry: (outcome, timeSpan, retryNumber, context) => OnRetry(...)
);
RetryTimes = 0 时不创建重试策略。
熔断器策略(DefaultInvokeCircuitBreakerPolicyProvider)
基于 连续异常次数 触发熔断,排除框架内部异常(友好异常、NotFound、服务端业务异常):
Policy
.Handle<Exception>(ex =>
!ex.IsFriendlyException() &&
!ex.IsNotFindServiceError() &&
!ex.IsFrameworkException() &&
!ex.IsNotImplemented() &&
!ex.IsServerException()
)
.CircuitBreakerAsync(
exceptionsAllowedBeforeBreaking: governanceOptions.ExceptionsAllowedBeforeBreaking,
durationOfBreak: TimeSpan.FromSeconds(governanceOptions.BreakerSeconds),
onBreak: (ex, timespan) => { /* 触发 OnBreak 事件 */ },
onReset: () => { /* 触发 OnReset 事件 */ }
);
EnableCircuitBreaker = false 时不创建熔断策略。
客户端降级策略(DefaultInvokeFallbackPolicyProvider)
当 ServiceEntry 标注了 [Fallback(typeof(FallbackImpl))] 且降级实现类已注册到 DI 时生效:
Policy<object>
.Handle<Exception>(ex => !(ex is INotNeedFallback))
.FallbackAsync(
async (ctx, t) => await _fallbackInvoker.Invoke(serviceEntry, parameters),
async (dr, context) => { /* 触发 OnInvokeFallback 事件 */ }
);
降级调用路径:DefaultFallbackInvoker.Invoke() → 从 DI 容器解析降级实现类 → 调用 FallbackMethodExecutor.ExecuteMethodWithAuditingAsync()。
服务端策略管道
DefaultHandlePolicyBuilder
DefaultHandlePolicyBuilder 为每个 RemoteInvokeMessage 构建服务端策略链,基础策略按 serviceEntryId 缓存,Fallback 策略每次重新组装:
public IAsyncPolicy<RemoteResultMessage> Build(RemoteInvokeMessage message)
{
// 1. 取缓存基础策略(HandlePolicyProvider + 熔断器)
var normPolicy = BuildNormPolicy(message.ServiceEntryId);
// 2. 组装 FallbackPolicy(不缓存,依赖 message)
foreach (var provider in _serverHandleFallbackPolicyProviders)
normPolicy = normPolicy.WrapAsync(provider.Create(message));
return normPolicy;
}
服务端各策略说明
服务端 Fallback(DefaultServerHandleFallbackPolicyProvider)
捕获所有非 INotNeedFallback 异常,委托给 DefaultServerFallbackHandler:
Policy<RemoteResultMessage>
.Handle<Exception>(ex => !(ex is INotNeedFallback))
.FallbackAsync(
async (ctx, t) => await _serverFallbackHandler.Handle(message, ctx, t),
async (dr, context) => { /* 触发 OnHandleFallback 事件 */ }
);
DefaultServerFallbackHandler 的处理逻辑:
- 读取
PollyContext中的原始异常 - 上报诊断追踪(
TracingError) - 若异常为
RpcAuthenticationException或NotFindLocalServiceEntryException,直接将异常信息包装到RemoteResultMessage返回(不走降级方法) - 若
ServiceEntry有[Fallback]标注且实现类存在,调用降级方法并返回其结果 - 否则返回原始异常状态码和消息
完整客户端调用链
DefaultRemoteExecutor.Execute()
│
├── 构建 RemoteInvokeMessage(序列化参数)
├── 解析 hashKey(HashAlgorithm 策略时)
│
▼
IInvokePolicyBuilder.Build(serviceEntryId, parameters)
│ 组装策略链:Fallback → CircuitBreaker → Timeout → OverflowRetry
│
▼
policy.ExecuteAsync(() =>
IRemoteCaller.InvokeAsync(message, shuntStrategy, hashKey))
│
├── 正常:返回 object 结果
├── 溢出异常:OverflowRetry 策略重试
├── 连续异常达阈值:CircuitBreaker 开启,后续请求直接抛 BrokenCircuitException
├── 超时:Timeout 策略抛 TimeoutRejectedException
└── 有 Fallback 配置:调用降级方法返回兜底结果
策略执行顺序总结
Polly 的 WrapAsync 是洋葱模型(外层先执行),最终的执行顺序为:
Fallback(最外层,捕获所有异常的最后兜底)
└── CircuitBreaker(熔断,开路时直接报错)
└── Timeout(超时,取消内层 CancellationToken)
└── OverflowRetry(溢出重试,内层)
└── 实际 RPC 调用
策略链按
serviceEntryId缓存后复用,因此各服务条目的治理参数(超时时间、重试次数等)相互独立,运行时修改GovernanceOptions需清除缓存才能生效。
配置与策略关系速查
GovernanceOptions 配置项 | 对应策略 | 触发条件 |
|---|---|---|
TimeoutMillSeconds > 0 | 超时策略 | 执行时间超过阈值时取消 CancellationToken |
RetryTimes > 0 | 溢出重试策略 | 服务端抛出 OverflowMaxServerHandleException |
EnableCircuitBreaker = true | 熔断器策略 | 连续 ExceptionsAllowedBeforeBreaking 次非豁免异常 |
BreakerSeconds | 熔断器持续时间 | 熔断后保持 Open 状态的秒数 |
[Fallback(typeof(T))] 特性 | 降级策略 | 任何非 INotNeedFallback 异常 |
