Skip to content

CircuitBreaker

概述

CircuitBreaker 过滤器,用于为 Gateway 路由包装熔断器功能。该组件解决了在微服务架构中,当下游服务出现故障或响应缓慢时,能够快速失败并提供降级处理的问题,避免故障传播导致整个系统崩溃。

Spring Cloud CircuitBreaker 支持多种熔断器实现库,其中 Resilience4J 是开箱即用的默认实现。

核心功能

解决的业务问题

在微服务架构中,服务之间相互依赖,当某个服务出现故障时,可能会引发雪崩效应:

使用熔断器后的处理流程:

配置依赖

要启用 Spring Cloud CircuitBreaker 过滤器,需要在项目中添加以下依赖:

kotlin
dependencies {
    // Spring Cloud CircuitBreaker with Resilience4J
    implementation("org.springframework.cloud:spring-cloud-starter-circuitbreaker-reactor-resilience4j")
}
xml
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-circuitbreaker-reactor-resilience4j</artifactId>
</dependency>

基础配置

简单配置

最基本的熔断器配置只需要指定熔断器名称:

yaml
spring:
  cloud:
    gateway:
      routes:
        - id: circuitbreaker_route
          uri: https://example.org
          filters:
            - CircuitBreaker=myCircuitBreaker

对应的 Kotlin 代码配置:

kotlin
@Configuration
class GatewayConfig {

    @Bean
    fun routes(builder: RouteLocatorBuilder): RouteLocator {
        return builder.routes()
            .route("circuitbreaker_route") { r ->
                r.path("/api/**")
                    .filters { f ->
                        f.circuitBreaker { c ->
                            c.name("myCircuitBreaker")
                        }
                    }
                    .uri("https://example.org")
            }
            .build()
    }
}

熔断器的具体配置(如失败率阈值、等待时间等)需要参考底层实现库的文档,如 [Resilience4J 文档](https://resilience4j.readme.io/docs)。

降级处理配置

基本降级配置

当熔断器触发时,可以配置降级 URI 来处理请求:

yaml
spring:
  cloud:
    gateway:
      routes:
        - id: circuitbreaker_route
          uri: lb://backing-service:8088
          predicates:
            - Path=/consumingServiceEndpoint
          filters:
            - name: CircuitBreaker
              args:
                name: myCircuitBreaker
                fallbackUri: forward:/inCaseOfFailureUseThis
            - RewritePath=/consumingServiceEndpoint, /backingServiceEndpoint

对应的 Kotlin 配置:

kotlin
@Bean
fun routes(builder: RouteLocatorBuilder): RouteLocator {
    return builder.routes()
        .route("circuitbreaker_route") { r ->
            r.path("/consumingServiceEndpoint")
                .filters { f ->
                    f.circuitBreaker { c ->
                        c.name("myCircuitBreaker")
                            .fallbackUri("forward:/inCaseOfFailureUseThis")
                    }
                        .rewritePath("/consumingServiceEndpoint", "/backingServiceEndpoint")
                }
                .uri("lb://backing-service:8088")
        }
        .build()
}

降级处理器实现

创建一个降级处理控制器:

kotlin
@RestController
class FallbackController {

    /**
     * 处理服务熔断时的降级逻辑
     */
    @RequestMapping("/inCaseOfFailureUseThis")
    fun fallback(exchange: ServerWebExchange): Mono<ResponseEntity<Map<String, Any>>> {
        // 获取熔断异常信息
        val exception = exchange.getAttribute<Throwable>(
            ServerWebExchangeUtils.CIRCUITBREAKER_EXECUTION_EXCEPTION_ATTR
        )

        val response = mapOf(
            "status" to "fallback",
            "message" to "服务暂时不可用,请稍后重试",
            "timestamp" to Instant.now().toString(),
            "error" to (exception?.message ?: "未知错误")
        )

        return Mono.just(ResponseEntity.ok(response))
    }
}

高级配置特性

URI 变量支持

熔断器支持在 fallbackUri 中使用 URI 变量,实现更复杂的路由选项:

yaml
spring:
  cloud:
    gateway:
      routes:
        - id: circuitbreaker_route
          uri: lb://backing-service:8088
          predicates:
            - Path=/consumingServiceEndpoint/{*segments}
          filters:
            - name: CircuitBreaker
              args:
                name: myCircuitBreaker
                fallbackUri: forward:/inCaseOfFailureUseThis/{segments}

这样配置后,请求 consumingServiceEndpoint/users/1 在熔断时会被重定向到 inCaseOfFailureUseThis/users/1

外部降级服务配置

可以将降级请求路由到外部应用:

yaml
spring:
  cloud:
    gateway:
      routes:
        # 主要服务路由
        - id: ingredients
          uri: lb://ingredients
          predicates:
            - Path=/ingredients/**
          filters:
            - name: CircuitBreaker
              args:
                name: fetchIngredients
                fallbackUri: forward:/fallback

        # 降级服务路由
        - id: ingredients-fallback
          uri: http://localhost:9994 # 外部降级服务
          predicates:
            - Path=/fallback

对应的 Kotlin 配置:

kotlin
@Bean
fun routes(builder: RouteLocatorBuilder): RouteLocator {
    return builder.routes()
        // 主要服务路由
        .route("ingredients") { r ->
            r.path("/ingredients/**")
                .filters { f ->
                    f.circuitBreaker { c ->
                        c.name("fetchIngredients")
                            .fallbackUri("forward:/fallback")
                    }
                }
                .uri("lb://ingredients")
        }
        // 降级服务路由
        .route("ingredients-fallback") { r ->
            r.path("/fallback")
                .uri("http://localhost:9994")
        }
        .build()
}

基于状态码的熔断

配置触发熔断的状态码

可以基于特定的 HTTP 状态码来触发熔断器:

yaml
spring:
  cloud:
    gateway:
      routes:
        - id: circuitbreaker_route
          uri: lb://backing-service:8088
          predicates:
            - Path=/consumingServiceEndpoint
          filters:
            - name: CircuitBreaker
              args:
                name: myCircuitBreaker
                fallbackUri: forward:/inCaseOfFailureUseThis
                statusCodes:
                  - 500 # HTTP 状态码数字
                  - "NOT_FOUND" # HttpStatus 枚举字符串

Kotlin 代码配置:

kotlin
@Bean
fun routes(builder: RouteLocatorBuilder): RouteLocator {
    return builder.routes()
        .route("circuitbreaker_route") { r ->
            r.path("/consumingServiceEndpoint")
                .filters { f ->
                    f.circuitBreaker { c ->
                        c.name("myCircuitBreaker")
                            .fallbackUri("forward:/inCaseOfFailureUseThis")
                            .addStatusCode("INTERNAL_SERVER_ERROR")
                            .addStatusCode("NOT_FOUND")
                    }
                        .rewritePath("/consumingServiceEndpoint", "/backingServiceEndpoint")
                }
                .uri("lb://backing-service:8088")
        }
        .build()
    }

状态码处理示例

kotlin
@RestController
class ApiController {

    /**
     * 业务服务接口
     */
    @GetMapping("/consumingServiceEndpoint")
    fun businessService(): Mono<ResponseEntity<String>> {
        return webClient.get()
            .uri("/backingServiceEndpoint")
            .retrieve()
            .bodyToMono(String::class.java)
            .map { ResponseEntity.ok(it) }
            .onErrorReturn {
                // 当后端服务返回 500 或 404 时,会触发熔断器
                ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                    .body("后端服务异常")
            }
    }
}

Resilience4J 集成配置

熔断器参数配置

yaml
resilience4j:
  circuitbreaker:
    configs:
      default:
        # 熔断器配置
        sliding-window-size: 10 # 滑动窗口大小
        minimum-number-of-calls: 5 # 最小调用次数
        failure-rate-threshold: 50 # 失败率阈值(百分比)
        wait-duration-in-open-state: 30s # 熔断器开启状态等待时间
        permitted-number-of-calls-in-half-open-state: 3 # 半开状态允许的调用次数
    instances:
      myCircuitBreaker:
        base-config: default
        # 可以为特定实例覆盖配置
        failure-rate-threshold: 60

对应的 Kotlin 配置类:

kotlin
@Configuration
class CircuitBreakerConfig {

    /**
     * 自定义熔断器配置
     */
    @Bean
    @Primary
    fun circuitBreakerCustomizer(): Customizer<ReactiveResilience4JCircuitBreakerFactory> {
        return Customizer { factory ->
            factory.configureDefault { id ->
                CircuitBreakerConfig.custom()
                    .slidingWindowSize(10)
                    .minimumNumberOfCalls(5)
                    .failureRateThreshold(50.0f)
                    .waitDurationInOpenState(Duration.ofSeconds(30))
                    .permittedNumberOfCallsInHalfOpenState(3)
                    .build()
            }
        }
    }
}

实际业务场景示例

电商订单处理场景

假设有一个电商系统,订单服务需要调用库存服务、支付服务和物流服务:

kotlin
@Configuration
class OrderGatewayConfig {

    @Bean
    fun orderRoutes(builder: RouteLocatorBuilder): RouteLocator {
        return builder.routes()
            // 库存服务路由 - 高可用要求
            .route("inventory_service") { r ->
                r.path("/api/inventory/**")
                    .filters { f ->
                        f.circuitBreaker { c ->
                            c.name("inventoryCircuitBreaker")
                                .fallbackUri("forward:/fallback/inventory")
                        }
                    }
                    .uri("lb://inventory-service")
            }
            // 支付服务路由 - 关键服务
            .route("payment_service") { r ->
                r.path("/api/payment/**")
                    .filters { f ->
                        f.circuitBreaker { c ->
                            c.name("paymentCircuitBreaker")
                                .fallbackUri("forward:/fallback/payment")
                        }
                    }
                    .uri("lb://payment-service")
            }
            // 物流服务路由 - 可降级
            .route("logistics_service") { r ->
                r.path("/api/logistics/**")
                    .filters { f ->
                        f.circuitBreaker { c ->
                            c.name("logisticsCircuitBreaker")
                                .fallbackUri("forward:/fallback/logistics")
                        }
                    }
                    .uri("lb://logistics-service")
            }
            .build()
    }
}

降级策略控制器

kotlin
@RestController
@RequestMapping("/fallback")
class OrderFallbackController {

    private val logger = LoggerFactory.getLogger(OrderFallbackController::class.java)

    /**
     * 库存服务降级 - 返回缓存数据或默认库存
     */
    @RequestMapping("/inventory/**")
    fun inventoryFallback(request: ServerHttpRequest): Mono<ResponseEntity<Map<String, Any>>> {
        logger.warn("库存服务熔断,使用降级策略")

        val fallbackResponse = mapOf(
            "status" to "degraded",
            "message" to "库存服务暂时不可用,显示默认库存信息",
            "inventory" to 0,  // 默认无库存,避免超卖
            "available" to false
        )

        return Mono.just(ResponseEntity.ok(fallbackResponse))
    }

    /**
     * 支付服务降级 - 记录支付请求,稍后处理
     */
    @RequestMapping("/payment/**")
    fun paymentFallback(request: ServerHttpRequest, exchange: ServerWebExchange): Mono<ResponseEntity<Map<String, Any>>> {
        logger.error("支付服务熔断,这是关键服务异常")

        // 获取异常信息
        val exception = exchange.getAttribute<Throwable>(
            ServerWebExchangeUtils.CIRCUITBREAKER_EXECUTION_EXCEPTION_ATTR
        )

        val fallbackResponse = mapOf(
            "status" to "error",
            "message" to "支付服务暂时不可用,请稍后重试",
            "retryAfter" to 300,  // 5分钟后重试
            "error" to (exception?.message ?: "支付服务异常")
        )

        return Mono.just(ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).body(fallbackResponse))
    }

    /**
     * 物流服务降级 - 使用默认物流方案
     */
    @RequestMapping("/logistics/**")
    fun logisticsFallback(): Mono<ResponseEntity<Map<String, Any>>> {
        logger.info("物流服务熔断,使用默认物流方案")

        val fallbackResponse = mapOf(
            "status" to "degraded",
            "message" to "使用默认物流服务",
            "estimatedDelivery" to "7-15个工作日",
            "method" to "标准快递"
        )

        return Mono.just(ResponseEntity.ok(fallbackResponse))
    }
}

熔断器监控配置

kotlin
@Component
class CircuitBreakerEventListener {

    private val logger = LoggerFactory.getLogger(CircuitBreakerEventListener::class.java)

    @EventListener
    fun onCircuitBreakerEvent(event: CircuitBreakerEvent) {
        when (event.eventType) {
            CircuitBreakerEvent.Type.STATE_TRANSITION -> {
                logger.warn("熔断器 [{}] 状态变更: {} -> {}",
                    event.circuitBreakerName,
                    event.stateTransition?.fromState,
                    event.stateTransition?.toState
                )
            }
            CircuitBreakerEvent.Type.ERROR -> {
                logger.error("熔断器 [{}] 发生错误: {}",
                    event.circuitBreakerName,
                    event.throwable?.message
                )
            }
            CircuitBreakerEvent.Type.SUCCESS -> {
                logger.debug("熔断器 [{}] 调用成功", event.circuitBreakerName)
            }
        }
    }
}

最佳实践

1. 熔断器命名规范

kotlin
// 建议的命名规范
class CircuitBreakerNames {
    companion object {
        const val USER_SERVICE = "userServiceCircuitBreaker"
        const val ORDER_SERVICE = "orderServiceCircuitBreaker"
        const val PAYMENT_SERVICE = "paymentServiceCircuitBreaker"
        const val INVENTORY_SERVICE = "inventoryServiceCircuitBreaker"
    }
}

2. 分层降级策略

3. 监控和告警

kotlin
@Component
class CircuitBreakerMetrics {

    private val meterRegistry: MeterRegistry = Metrics.globalRegistry

    @EventListener
    fun onStateTransition(event: CircuitBreakerOnStateTransitionEvent) {
        // 记录熔断器状态变更指标
        meterRegistry.counter("circuitbreaker.state.transition",
            "name", event.circuitBreakerName,
            "from", event.stateTransition.fromState.name,
            "to", event.stateTransition.toState.name
        ).increment()

        // 发送告警通知
        if (event.stateTransition.toState == CircuitBreaker.State.OPEN) {
            sendAlert("熔断器 ${event.circuitBreakerName} 已开启")
        }
    }

    private fun sendAlert(message: String) {
        // 实现告警逻辑(如发送邮件、短信、钉钉等)
        logger.error("熔断器告警: $message")
    }
}

总结

CircuitBreaker GatewayFilter Factory 是 Spring Cloud Gateway 中的重要组件,它提供了:

  • 故障隔离:防止单点故障影响整个系统
  • 快速失败:避免长时间等待不可用的服务
  • 优雅降级:在服务不可用时提供备选方案
  • 自动恢复:当服务恢复时自动恢复正常调用

在生产环境中使用熔断器时,需要根据业务特性合理配置熔断参数,并建立完善的监控和告警机制。

通过合理配置和使用熔断器,可以显著提高微服务系统的稳定性和可用性,为用户提供更好的服务体验。