Skip to content

CacheRequestBody 过滤器工厂

概述

在某些业务场景中,我们需要读取请求体的内容。由于 HTTP 请求体只能被读取一次,这就需要我们将请求体缓存起来。CacheRequestBody 过滤器正是为了解决这个问题而设计的,它可以在将请求发送到下游服务之前缓存请求体,并通过 exchange 属性获取缓存的内容。

此过滤器仅适用于 HTTP(包括 HTTPS)请求。

业务场景

常见使用场景

  1. API 日志记录 - 需要记录完整的请求体内容用于审计和调试
  2. 请求验证 - 在转发请求前需要验证请求体的格式和内容
  3. 数据转换 - 需要读取请求体进行数据格式转换或增强
  4. 安全检查 - 对请求体进行安全扫描和内容过滤

解决的核心问题

在微服务架构中,网关经常需要对请求进行预处理,但由于 HTTP 请求体的特性(只能读取一次),如果在网关层读取了请求体,下游服务就无法再次读取。CacheRequestBody 过滤器通过缓存机制解决了这个技术难题。

工作原理

配置方式

1. 编程式配置(Kotlin)

kotlin
import org.springframework.cloud.gateway.route.RouteLocatorBuilder
import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import java.net.URI

@Configuration
class GatewayConfiguration {

    @Bean
    fun routes(builder: RouteLocatorBuilder): RouteLocator {
        return builder.routes()
            .route("cache_request_body_route") { r ->
                r.path("/downstream/**")
                    .filters { f ->
                        f.prefixPath("/httpbin")
                            .cacheRequestBody(String::class.java) 
                    }
                    .uri("lb://downstream-service")
            }
            .build()
    }
}
java
@Bean
public RouteLocator routes(RouteLocatorBuilder builder) {
    return builder.routes()
        .route("cache_request_body_route", r -> r.path("/downstream/**")
            .filters(f -> f.prefixPath("/httpbin")
        		.cacheRequestBody(String.class).uri(uri))
        .build();
}

2. 配置文件方式

yaml
spring:
  cloud:
    gateway:
      routes:
        - id: cache_request_body_route
          uri: lb://downstream-service
          predicates:
            - Path=/downstream/**
          filters:
            - name: CacheRequestBody
              args:
                bodyClass: java.lang.String # 指定缓存的请求体类型

参数说明

参数名类型必填说明
bodyClassClass<?>指定请求体要转换成的目标类型,如 java.lang.Stringbyte[]

获取缓存的请求体

在下游的过滤器或处理器中,可以通过以下方式获取缓存的请求体:

kotlin
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils
import org.springframework.web.server.ServerWebExchange

class CustomGatewayFilter : GatewayFilter {

    override fun filter(exchange: ServerWebExchange, chain: GatewayFilterChain): Mono<Void> {
        // 获取缓存的请求体
        val cachedBody = exchange.attributes[ServerWebExchangeUtils.CACHED_REQUEST_BODY_ATTR] as? String

        if (cachedBody != null) {
            // 处理缓存的请求体
            println("缓存的请求体内容: $cachedBody")

            // 可以进行各种处理,如日志记录、验证等
            if (isValidRequest(cachedBody)) {
                return chain.filter(exchange)
            } else {
                // 返回错误响应
                return createErrorResponse(exchange)
            }
        }

        return chain.filter(exchange)
    }

    private fun isValidRequest(body: String): Boolean {
        // 实现请求验证逻辑
        return body.isNotEmpty() && body.contains("required_field")
    }

    private fun createErrorResponse(exchange: ServerWebExchange): Mono<Void> {
        val response = exchange.response
        response.statusCode = HttpStatus.BAD_REQUEST
        return response.setComplete()
    }
}

实际业务应用示例

场景:API 请求日志记录

kotlin
import org.springframework.cloud.gateway.filter.GatewayFilterChain
import org.springframework.cloud.gateway.filter.GlobalFilter
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils
import org.springframework.core.Ordered
import org.springframework.stereotype.Component
import org.springframework.web.server.ServerWebExchange
import reactor.core.publisher.Mono
import java.time.LocalDateTime

@Component
class RequestLoggingFilter : GlobalFilter, Ordered {

    override fun filter(exchange: ServerWebExchange, chain: GatewayFilterChain): Mono<Void> {
        // 获取缓存的请求体
        val cachedBody = exchange.attributes[ServerWebExchangeUtils.CACHED_REQUEST_BODY_ATTR] as? String

        // 记录请求信息
        val request = exchange.request
        val logEntry = RequestLogEntry(
            timestamp = LocalDateTime.now(),
            method = request.method.toString(),
            path = request.path.value(),
            headers = request.headers.toSingleValueMap(),
            body = cachedBody ?: "无请求体"
        )

        // 异步记录日志,不影响请求处理
        logRequestAsync(logEntry)

        return chain.filter(exchange)
    }

    override fun getOrder(): Int = Ordered.LOWEST_PRECEDENCE

    private fun logRequestAsync(logEntry: RequestLogEntry) {
        // 实现异步日志记录逻辑
        // 可以发送到消息队列、存储到数据库等
        println("API请求日志: $logEntry")
    }
}

data class RequestLogEntry(
    val timestamp: LocalDateTime,
    val method: String,
    val path: String,
    val headers: Map<String, String>,
    val body: String
)

场景:请求内容验证

kotlin
@Component
class RequestValidationFilter : GlobalFilter, Ordered {

    override fun filter(exchange: ServerWebExchange, chain: GatewayFilterChain): Mono<Void> {
        val cachedBody = exchange.attributes[ServerWebExchangeUtils.CACHED_REQUEST_BODY_ATTR] as? String

        return if (cachedBody != null && needsValidation(exchange.request.path.value())) {
            validateRequestBody(cachedBody)
                .flatMap { isValid ->
                    if (isValid) {
                        chain.filter(exchange)
                    } else {
                        createValidationErrorResponse(exchange)
                    }
                }
        } else {
            chain.filter(exchange)
        }
    }

    private fun needsValidation(path: String): Boolean {
        // 确定哪些路径需要进行请求体验证
        return path.startsWith("/api/secure/") || path.startsWith("/api/payment/")
    }

    private fun validateRequestBody(body: String): Mono<Boolean> {
        return Mono.fromCallable {
            try {
                // 使用 JSON 库解析请求体
                val jsonNode = ObjectMapper().readTree(body)

                // 执行业务验证逻辑
                jsonNode.has("userId") &&
                jsonNode.has("timestamp") &&
                jsonNode.get("userId").asText().isNotEmpty()
            } catch (e: Exception) {
                false
            }
        }
    }

    private fun createValidationErrorResponse(exchange: ServerWebExchange): Mono<Void> {
        val response = exchange.response
        response.statusCode = HttpStatus.BAD_REQUEST

        val errorMessage = """
            {
                "error": "REQUEST_VALIDATION_FAILED",
                "message": "请求体格式不正确或缺少必要字段"
            }
        """.trimIndent()

        val buffer = response.bufferFactory().wrap(errorMessage.toByteArray())
        return response.writeWith(Mono.just(buffer))
    }

    override fun getOrder(): Int = -1
}

性能考虑

使用 `CacheRequestBody` 过滤器会增加内存使用量,特别是在处理大量请求或大体积请求体时需要特别注意。

最佳实践

  1. 合理选择缓存类型

    kotlin
    // 对于小型 JSON 请求,使用 String
    .cacheRequestBody(String::class.java)
    
    // 对于二进制数据或大型请求,使用 byte[]
    .cacheRequestBody(ByteArray::class.java)
  2. 条件性应用过滤器

    kotlin
    .route("selective_cache_route") { r ->
        r.path("/api/upload/**")
            .and()
            .header("Content-Type", "application/json")
            .filters { f ->
                f.cacheRequestBody(String::class.java)
            }
            .uri("lb://file-service")
    }
  3. 设置请求体大小限制

    yaml
    spring:
      cloud:
        gateway:
          httpclient:
            max-initial-line-length: 4096
            max-header-size: 8192
          # 限制请求体大小
          filter:
            request-size:
              max-in-memory-size: 1MB

错误处理

kotlin
@Component
class CacheRequestBodyErrorHandler : GlobalFilter, Ordered {

    override fun filter(exchange: ServerWebExchange, chain: GatewayFilterChain): Mono<Void> {
        return chain.filter(exchange)
            .doOnError { throwable ->
                if (throwable is DataBufferLimitException) {
                    handleRequestBodyTooLarge(exchange, throwable)
                }
            }
            .onErrorResume { throwable ->
                when (throwable) {
                    is DataBufferLimitException -> createErrorResponse(
                        exchange,
                        HttpStatus.PAYLOAD_TOO_LARGE,
                        "请求体过大"
                    )
                    else -> createErrorResponse(
                        exchange,
                        HttpStatus.INTERNAL_SERVER_ERROR,
                        "处理请求时发生错误"
                    )
                }
            }
    }

    private fun handleRequestBodyTooLarge(exchange: ServerWebExchange, exception: DataBufferLimitException) {
        // 记录错误日志
        val request = exchange.request
        println("请求体过大: ${request.path}, 来源: ${request.remoteAddress}")
    }

    private fun createErrorResponse(
        exchange: ServerWebExchange,
        status: HttpStatus,
        message: String
    ): Mono<Void> {
        val response = exchange.response
        response.statusCode = status

        val errorBody = """
            {
                "timestamp": "${LocalDateTime.now()}",
                "status": ${status.value()},
                "error": "${status.reasonPhrase}",
                "message": "$message",
                "path": "${exchange.request.path.value()}"
            }
        """.trimIndent()

        val buffer = response.bufferFactory().wrap(errorBody.toByteArray())
        response.headers.add("Content-Type", "application/json")
        return response.writeWith(Mono.just(buffer))
    }

    override fun getOrder(): Int = Ordered.HIGHEST_PRECEDENCE
}

总结

TIP

CacheRequestBody 过滤器是 Spring Cloud Gateway 中处理请求体的重要工具,它解决了请求体只能读取一次的限制,为网关层的请求处理提供了更多可能性。

关键要点

  1. 适用场景明确 - 主要用于需要读取请求体的场景,如日志记录、验证、转换等
  2. 性能影响 - 会增加内存使用,需要合理控制使用范围
  3. 类型灵活 - 支持多种数据类型的缓存,根据实际需求选择
  4. 获取便捷 - 通过 ServerWebExchangeUtils.CACHED_REQUEST_BODY_ATTR 键可以轻松获取缓存内容

通过合理使用 CacheRequestBody 过滤器,我们可以在不影响下游服务的前提下,在网关层实现丰富的请求处理逻辑。

Details

扩展阅读