Appearance
CircuitBreaker
概述
CircuitBreaker 过滤器,用于为 Gateway 路由包装熔断器功能。该组件解决了在微服务架构中,当下游服务出现故障或响应缓慢时,能够快速失败并提供降级处理的问题,避免故障传播导致整个系统崩溃。
Spring Cloud CircuitBreaker 支持多种熔断器实现库,其中 Resilience4J 是开箱即用的默认实现。
核心功能
解决的业务问题
在微服务架构中,服务之间相互依赖,当某个服务出现故障时,可能会引发雪崩效应:
使用熔断器后的处理流程:
配置依赖
要启用 Spring Cloud CircuitBreaker 过滤器,需要在项目中添加以下依赖:
kotlin
dependencies {
// Spring Cloud CircuitBreaker with Resilience4J
implementation("org.springframework.cloud:spring-cloud-starter-circuitbreaker-reactor-resilience4j")
}xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-circuitbreaker-reactor-resilience4j</artifactId>
</dependency>基础配置
简单配置
最基本的熔断器配置只需要指定熔断器名称:
yaml
spring:
cloud:
gateway:
routes:
- id: circuitbreaker_route
uri: https://example.org
filters:
- CircuitBreaker=myCircuitBreaker对应的 Kotlin 代码配置:
kotlin
@Configuration
class GatewayConfig {
@Bean
fun routes(builder: RouteLocatorBuilder): RouteLocator {
return builder.routes()
.route("circuitbreaker_route") { r ->
r.path("/api/**")
.filters { f ->
f.circuitBreaker { c ->
c.name("myCircuitBreaker")
}
}
.uri("https://example.org")
}
.build()
}
}熔断器的具体配置(如失败率阈值、等待时间等)需要参考底层实现库的文档,如 [Resilience4J 文档](https://resilience4j.readme.io/docs)。
降级处理配置
基本降级配置
当熔断器触发时,可以配置降级 URI 来处理请求:
yaml
spring:
cloud:
gateway:
routes:
- id: circuitbreaker_route
uri: lb://backing-service:8088
predicates:
- Path=/consumingServiceEndpoint
filters:
- name: CircuitBreaker
args:
name: myCircuitBreaker
fallbackUri: forward:/inCaseOfFailureUseThis
- RewritePath=/consumingServiceEndpoint, /backingServiceEndpoint对应的 Kotlin 配置:
kotlin
@Bean
fun routes(builder: RouteLocatorBuilder): RouteLocator {
return builder.routes()
.route("circuitbreaker_route") { r ->
r.path("/consumingServiceEndpoint")
.filters { f ->
f.circuitBreaker { c ->
c.name("myCircuitBreaker")
.fallbackUri("forward:/inCaseOfFailureUseThis")
}
.rewritePath("/consumingServiceEndpoint", "/backingServiceEndpoint")
}
.uri("lb://backing-service:8088")
}
.build()
}降级处理器实现
创建一个降级处理控制器:
kotlin
@RestController
class FallbackController {
/**
* 处理服务熔断时的降级逻辑
*/
@RequestMapping("/inCaseOfFailureUseThis")
fun fallback(exchange: ServerWebExchange): Mono<ResponseEntity<Map<String, Any>>> {
// 获取熔断异常信息
val exception = exchange.getAttribute<Throwable>(
ServerWebExchangeUtils.CIRCUITBREAKER_EXECUTION_EXCEPTION_ATTR
)
val response = mapOf(
"status" to "fallback",
"message" to "服务暂时不可用,请稍后重试",
"timestamp" to Instant.now().toString(),
"error" to (exception?.message ?: "未知错误")
)
return Mono.just(ResponseEntity.ok(response))
}
}高级配置特性
URI 变量支持
熔断器支持在 fallbackUri 中使用 URI 变量,实现更复杂的路由选项:
yaml
spring:
cloud:
gateway:
routes:
- id: circuitbreaker_route
uri: lb://backing-service:8088
predicates:
- Path=/consumingServiceEndpoint/{*segments}
filters:
- name: CircuitBreaker
args:
name: myCircuitBreaker
fallbackUri: forward:/inCaseOfFailureUseThis/{segments}这样配置后,请求 consumingServiceEndpoint/users/1 在熔断时会被重定向到 inCaseOfFailureUseThis/users/1。
外部降级服务配置
可以将降级请求路由到外部应用:
yaml
spring:
cloud:
gateway:
routes:
# 主要服务路由
- id: ingredients
uri: lb://ingredients
predicates:
- Path=/ingredients/**
filters:
- name: CircuitBreaker
args:
name: fetchIngredients
fallbackUri: forward:/fallback
# 降级服务路由
- id: ingredients-fallback
uri: http://localhost:9994 # 外部降级服务
predicates:
- Path=/fallback对应的 Kotlin 配置:
kotlin
@Bean
fun routes(builder: RouteLocatorBuilder): RouteLocator {
return builder.routes()
// 主要服务路由
.route("ingredients") { r ->
r.path("/ingredients/**")
.filters { f ->
f.circuitBreaker { c ->
c.name("fetchIngredients")
.fallbackUri("forward:/fallback")
}
}
.uri("lb://ingredients")
}
// 降级服务路由
.route("ingredients-fallback") { r ->
r.path("/fallback")
.uri("http://localhost:9994")
}
.build()
}基于状态码的熔断
配置触发熔断的状态码
可以基于特定的 HTTP 状态码来触发熔断器:
yaml
spring:
cloud:
gateway:
routes:
- id: circuitbreaker_route
uri: lb://backing-service:8088
predicates:
- Path=/consumingServiceEndpoint
filters:
- name: CircuitBreaker
args:
name: myCircuitBreaker
fallbackUri: forward:/inCaseOfFailureUseThis
statusCodes:
- 500 # HTTP 状态码数字
- "NOT_FOUND" # HttpStatus 枚举字符串Kotlin 代码配置:
kotlin
@Bean
fun routes(builder: RouteLocatorBuilder): RouteLocator {
return builder.routes()
.route("circuitbreaker_route") { r ->
r.path("/consumingServiceEndpoint")
.filters { f ->
f.circuitBreaker { c ->
c.name("myCircuitBreaker")
.fallbackUri("forward:/inCaseOfFailureUseThis")
.addStatusCode("INTERNAL_SERVER_ERROR")
.addStatusCode("NOT_FOUND")
}
.rewritePath("/consumingServiceEndpoint", "/backingServiceEndpoint")
}
.uri("lb://backing-service:8088")
}
.build()
}状态码处理示例
kotlin
@RestController
class ApiController {
/**
* 业务服务接口
*/
@GetMapping("/consumingServiceEndpoint")
fun businessService(): Mono<ResponseEntity<String>> {
return webClient.get()
.uri("/backingServiceEndpoint")
.retrieve()
.bodyToMono(String::class.java)
.map { ResponseEntity.ok(it) }
.onErrorReturn {
// 当后端服务返回 500 或 404 时,会触发熔断器
ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body("后端服务异常")
}
}
}Resilience4J 集成配置
熔断器参数配置
yaml
resilience4j:
circuitbreaker:
configs:
default:
# 熔断器配置
sliding-window-size: 10 # 滑动窗口大小
minimum-number-of-calls: 5 # 最小调用次数
failure-rate-threshold: 50 # 失败率阈值(百分比)
wait-duration-in-open-state: 30s # 熔断器开启状态等待时间
permitted-number-of-calls-in-half-open-state: 3 # 半开状态允许的调用次数
instances:
myCircuitBreaker:
base-config: default
# 可以为特定实例覆盖配置
failure-rate-threshold: 60对应的 Kotlin 配置类:
kotlin
@Configuration
class CircuitBreakerConfig {
/**
* 自定义熔断器配置
*/
@Bean
@Primary
fun circuitBreakerCustomizer(): Customizer<ReactiveResilience4JCircuitBreakerFactory> {
return Customizer { factory ->
factory.configureDefault { id ->
CircuitBreakerConfig.custom()
.slidingWindowSize(10)
.minimumNumberOfCalls(5)
.failureRateThreshold(50.0f)
.waitDurationInOpenState(Duration.ofSeconds(30))
.permittedNumberOfCallsInHalfOpenState(3)
.build()
}
}
}
}实际业务场景示例
电商订单处理场景
假设有一个电商系统,订单服务需要调用库存服务、支付服务和物流服务:
kotlin
@Configuration
class OrderGatewayConfig {
@Bean
fun orderRoutes(builder: RouteLocatorBuilder): RouteLocator {
return builder.routes()
// 库存服务路由 - 高可用要求
.route("inventory_service") { r ->
r.path("/api/inventory/**")
.filters { f ->
f.circuitBreaker { c ->
c.name("inventoryCircuitBreaker")
.fallbackUri("forward:/fallback/inventory")
}
}
.uri("lb://inventory-service")
}
// 支付服务路由 - 关键服务
.route("payment_service") { r ->
r.path("/api/payment/**")
.filters { f ->
f.circuitBreaker { c ->
c.name("paymentCircuitBreaker")
.fallbackUri("forward:/fallback/payment")
}
}
.uri("lb://payment-service")
}
// 物流服务路由 - 可降级
.route("logistics_service") { r ->
r.path("/api/logistics/**")
.filters { f ->
f.circuitBreaker { c ->
c.name("logisticsCircuitBreaker")
.fallbackUri("forward:/fallback/logistics")
}
}
.uri("lb://logistics-service")
}
.build()
}
}降级策略控制器
kotlin
@RestController
@RequestMapping("/fallback")
class OrderFallbackController {
private val logger = LoggerFactory.getLogger(OrderFallbackController::class.java)
/**
* 库存服务降级 - 返回缓存数据或默认库存
*/
@RequestMapping("/inventory/**")
fun inventoryFallback(request: ServerHttpRequest): Mono<ResponseEntity<Map<String, Any>>> {
logger.warn("库存服务熔断,使用降级策略")
val fallbackResponse = mapOf(
"status" to "degraded",
"message" to "库存服务暂时不可用,显示默认库存信息",
"inventory" to 0, // 默认无库存,避免超卖
"available" to false
)
return Mono.just(ResponseEntity.ok(fallbackResponse))
}
/**
* 支付服务降级 - 记录支付请求,稍后处理
*/
@RequestMapping("/payment/**")
fun paymentFallback(request: ServerHttpRequest, exchange: ServerWebExchange): Mono<ResponseEntity<Map<String, Any>>> {
logger.error("支付服务熔断,这是关键服务异常")
// 获取异常信息
val exception = exchange.getAttribute<Throwable>(
ServerWebExchangeUtils.CIRCUITBREAKER_EXECUTION_EXCEPTION_ATTR
)
val fallbackResponse = mapOf(
"status" to "error",
"message" to "支付服务暂时不可用,请稍后重试",
"retryAfter" to 300, // 5分钟后重试
"error" to (exception?.message ?: "支付服务异常")
)
return Mono.just(ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).body(fallbackResponse))
}
/**
* 物流服务降级 - 使用默认物流方案
*/
@RequestMapping("/logistics/**")
fun logisticsFallback(): Mono<ResponseEntity<Map<String, Any>>> {
logger.info("物流服务熔断,使用默认物流方案")
val fallbackResponse = mapOf(
"status" to "degraded",
"message" to "使用默认物流服务",
"estimatedDelivery" to "7-15个工作日",
"method" to "标准快递"
)
return Mono.just(ResponseEntity.ok(fallbackResponse))
}
}熔断器监控配置
kotlin
@Component
class CircuitBreakerEventListener {
private val logger = LoggerFactory.getLogger(CircuitBreakerEventListener::class.java)
@EventListener
fun onCircuitBreakerEvent(event: CircuitBreakerEvent) {
when (event.eventType) {
CircuitBreakerEvent.Type.STATE_TRANSITION -> {
logger.warn("熔断器 [{}] 状态变更: {} -> {}",
event.circuitBreakerName,
event.stateTransition?.fromState,
event.stateTransition?.toState
)
}
CircuitBreakerEvent.Type.ERROR -> {
logger.error("熔断器 [{}] 发生错误: {}",
event.circuitBreakerName,
event.throwable?.message
)
}
CircuitBreakerEvent.Type.SUCCESS -> {
logger.debug("熔断器 [{}] 调用成功", event.circuitBreakerName)
}
}
}
}最佳实践
1. 熔断器命名规范
kotlin
// 建议的命名规范
class CircuitBreakerNames {
companion object {
const val USER_SERVICE = "userServiceCircuitBreaker"
const val ORDER_SERVICE = "orderServiceCircuitBreaker"
const val PAYMENT_SERVICE = "paymentServiceCircuitBreaker"
const val INVENTORY_SERVICE = "inventoryServiceCircuitBreaker"
}
}2. 分层降级策略
3. 监控和告警
kotlin
@Component
class CircuitBreakerMetrics {
private val meterRegistry: MeterRegistry = Metrics.globalRegistry
@EventListener
fun onStateTransition(event: CircuitBreakerOnStateTransitionEvent) {
// 记录熔断器状态变更指标
meterRegistry.counter("circuitbreaker.state.transition",
"name", event.circuitBreakerName,
"from", event.stateTransition.fromState.name,
"to", event.stateTransition.toState.name
).increment()
// 发送告警通知
if (event.stateTransition.toState == CircuitBreaker.State.OPEN) {
sendAlert("熔断器 ${event.circuitBreakerName} 已开启")
}
}
private fun sendAlert(message: String) {
// 实现告警逻辑(如发送邮件、短信、钉钉等)
logger.error("熔断器告警: $message")
}
}总结
CircuitBreaker GatewayFilter Factory 是 Spring Cloud Gateway 中的重要组件,它提供了:
- 故障隔离:防止单点故障影响整个系统
- 快速失败:避免长时间等待不可用的服务
- 优雅降级:在服务不可用时提供备选方案
- 自动恢复:当服务恢复时自动恢复正常调用
在生产环境中使用熔断器时,需要根据业务特性合理配置熔断参数,并建立完善的监控和告警机制。
通过合理配置和使用熔断器,可以显著提高微服务系统的稳定性和可用性,为用户提供更好的服务体验。