Appearance
CacheRequestBody 过滤器工厂
概述
在某些业务场景中,我们需要读取请求体的内容。由于 HTTP 请求体只能被读取一次,这就需要我们将请求体缓存起来。CacheRequestBody 过滤器正是为了解决这个问题而设计的,它可以在将请求发送到下游服务之前缓存请求体,并通过 exchange 属性获取缓存的内容。
此过滤器仅适用于 HTTP(包括 HTTPS)请求。
业务场景
常见使用场景
- API 日志记录 - 需要记录完整的请求体内容用于审计和调试
- 请求验证 - 在转发请求前需要验证请求体的格式和内容
- 数据转换 - 需要读取请求体进行数据格式转换或增强
- 安全检查 - 对请求体进行安全扫描和内容过滤
解决的核心问题
在微服务架构中,网关经常需要对请求进行预处理,但由于 HTTP 请求体的特性(只能读取一次),如果在网关层读取了请求体,下游服务就无法再次读取。CacheRequestBody 过滤器通过缓存机制解决了这个技术难题。
工作原理
配置方式
1. 编程式配置(Kotlin)
kotlin
import org.springframework.cloud.gateway.route.RouteLocatorBuilder
import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import java.net.URI
@Configuration
class GatewayConfiguration {
@Bean
fun routes(builder: RouteLocatorBuilder): RouteLocator {
return builder.routes()
.route("cache_request_body_route") { r ->
r.path("/downstream/**")
.filters { f ->
f.prefixPath("/httpbin")
.cacheRequestBody(String::class.java)
}
.uri("lb://downstream-service")
}
.build()
}
}java
@Bean
public RouteLocator routes(RouteLocatorBuilder builder) {
return builder.routes()
.route("cache_request_body_route", r -> r.path("/downstream/**")
.filters(f -> f.prefixPath("/httpbin")
.cacheRequestBody(String.class).uri(uri))
.build();
}2. 配置文件方式
yaml
spring:
cloud:
gateway:
routes:
- id: cache_request_body_route
uri: lb://downstream-service
predicates:
- Path=/downstream/**
filters:
- name: CacheRequestBody
args:
bodyClass: java.lang.String # 指定缓存的请求体类型参数说明
| 参数名 | 类型 | 必填 | 说明 |
|---|---|---|---|
bodyClass | Class<?> | 是 | 指定请求体要转换成的目标类型,如 java.lang.String、byte[] 等 |
获取缓存的请求体
在下游的过滤器或处理器中,可以通过以下方式获取缓存的请求体:
kotlin
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils
import org.springframework.web.server.ServerWebExchange
class CustomGatewayFilter : GatewayFilter {
override fun filter(exchange: ServerWebExchange, chain: GatewayFilterChain): Mono<Void> {
// 获取缓存的请求体
val cachedBody = exchange.attributes[ServerWebExchangeUtils.CACHED_REQUEST_BODY_ATTR] as? String
if (cachedBody != null) {
// 处理缓存的请求体
println("缓存的请求体内容: $cachedBody")
// 可以进行各种处理,如日志记录、验证等
if (isValidRequest(cachedBody)) {
return chain.filter(exchange)
} else {
// 返回错误响应
return createErrorResponse(exchange)
}
}
return chain.filter(exchange)
}
private fun isValidRequest(body: String): Boolean {
// 实现请求验证逻辑
return body.isNotEmpty() && body.contains("required_field")
}
private fun createErrorResponse(exchange: ServerWebExchange): Mono<Void> {
val response = exchange.response
response.statusCode = HttpStatus.BAD_REQUEST
return response.setComplete()
}
}实际业务应用示例
场景:API 请求日志记录
kotlin
import org.springframework.cloud.gateway.filter.GatewayFilterChain
import org.springframework.cloud.gateway.filter.GlobalFilter
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils
import org.springframework.core.Ordered
import org.springframework.stereotype.Component
import org.springframework.web.server.ServerWebExchange
import reactor.core.publisher.Mono
import java.time.LocalDateTime
@Component
class RequestLoggingFilter : GlobalFilter, Ordered {
override fun filter(exchange: ServerWebExchange, chain: GatewayFilterChain): Mono<Void> {
// 获取缓存的请求体
val cachedBody = exchange.attributes[ServerWebExchangeUtils.CACHED_REQUEST_BODY_ATTR] as? String
// 记录请求信息
val request = exchange.request
val logEntry = RequestLogEntry(
timestamp = LocalDateTime.now(),
method = request.method.toString(),
path = request.path.value(),
headers = request.headers.toSingleValueMap(),
body = cachedBody ?: "无请求体"
)
// 异步记录日志,不影响请求处理
logRequestAsync(logEntry)
return chain.filter(exchange)
}
override fun getOrder(): Int = Ordered.LOWEST_PRECEDENCE
private fun logRequestAsync(logEntry: RequestLogEntry) {
// 实现异步日志记录逻辑
// 可以发送到消息队列、存储到数据库等
println("API请求日志: $logEntry")
}
}
data class RequestLogEntry(
val timestamp: LocalDateTime,
val method: String,
val path: String,
val headers: Map<String, String>,
val body: String
)场景:请求内容验证
kotlin
@Component
class RequestValidationFilter : GlobalFilter, Ordered {
override fun filter(exchange: ServerWebExchange, chain: GatewayFilterChain): Mono<Void> {
val cachedBody = exchange.attributes[ServerWebExchangeUtils.CACHED_REQUEST_BODY_ATTR] as? String
return if (cachedBody != null && needsValidation(exchange.request.path.value())) {
validateRequestBody(cachedBody)
.flatMap { isValid ->
if (isValid) {
chain.filter(exchange)
} else {
createValidationErrorResponse(exchange)
}
}
} else {
chain.filter(exchange)
}
}
private fun needsValidation(path: String): Boolean {
// 确定哪些路径需要进行请求体验证
return path.startsWith("/api/secure/") || path.startsWith("/api/payment/")
}
private fun validateRequestBody(body: String): Mono<Boolean> {
return Mono.fromCallable {
try {
// 使用 JSON 库解析请求体
val jsonNode = ObjectMapper().readTree(body)
// 执行业务验证逻辑
jsonNode.has("userId") &&
jsonNode.has("timestamp") &&
jsonNode.get("userId").asText().isNotEmpty()
} catch (e: Exception) {
false
}
}
}
private fun createValidationErrorResponse(exchange: ServerWebExchange): Mono<Void> {
val response = exchange.response
response.statusCode = HttpStatus.BAD_REQUEST
val errorMessage = """
{
"error": "REQUEST_VALIDATION_FAILED",
"message": "请求体格式不正确或缺少必要字段"
}
""".trimIndent()
val buffer = response.bufferFactory().wrap(errorMessage.toByteArray())
return response.writeWith(Mono.just(buffer))
}
override fun getOrder(): Int = -1
}性能考虑
使用 `CacheRequestBody` 过滤器会增加内存使用量,特别是在处理大量请求或大体积请求体时需要特别注意。
最佳实践
合理选择缓存类型
kotlin// 对于小型 JSON 请求,使用 String .cacheRequestBody(String::class.java) // 对于二进制数据或大型请求,使用 byte[] .cacheRequestBody(ByteArray::class.java)条件性应用过滤器
kotlin.route("selective_cache_route") { r -> r.path("/api/upload/**") .and() .header("Content-Type", "application/json") .filters { f -> f.cacheRequestBody(String::class.java) } .uri("lb://file-service") }设置请求体大小限制
yamlspring: cloud: gateway: httpclient: max-initial-line-length: 4096 max-header-size: 8192 # 限制请求体大小 filter: request-size: max-in-memory-size: 1MB
错误处理
kotlin
@Component
class CacheRequestBodyErrorHandler : GlobalFilter, Ordered {
override fun filter(exchange: ServerWebExchange, chain: GatewayFilterChain): Mono<Void> {
return chain.filter(exchange)
.doOnError { throwable ->
if (throwable is DataBufferLimitException) {
handleRequestBodyTooLarge(exchange, throwable)
}
}
.onErrorResume { throwable ->
when (throwable) {
is DataBufferLimitException -> createErrorResponse(
exchange,
HttpStatus.PAYLOAD_TOO_LARGE,
"请求体过大"
)
else -> createErrorResponse(
exchange,
HttpStatus.INTERNAL_SERVER_ERROR,
"处理请求时发生错误"
)
}
}
}
private fun handleRequestBodyTooLarge(exchange: ServerWebExchange, exception: DataBufferLimitException) {
// 记录错误日志
val request = exchange.request
println("请求体过大: ${request.path}, 来源: ${request.remoteAddress}")
}
private fun createErrorResponse(
exchange: ServerWebExchange,
status: HttpStatus,
message: String
): Mono<Void> {
val response = exchange.response
response.statusCode = status
val errorBody = """
{
"timestamp": "${LocalDateTime.now()}",
"status": ${status.value()},
"error": "${status.reasonPhrase}",
"message": "$message",
"path": "${exchange.request.path.value()}"
}
""".trimIndent()
val buffer = response.bufferFactory().wrap(errorBody.toByteArray())
response.headers.add("Content-Type", "application/json")
return response.writeWith(Mono.just(buffer))
}
override fun getOrder(): Int = Ordered.HIGHEST_PRECEDENCE
}总结
TIP
CacheRequestBody 过滤器是 Spring Cloud Gateway 中处理请求体的重要工具,它解决了请求体只能读取一次的限制,为网关层的请求处理提供了更多可能性。
关键要点
- 适用场景明确 - 主要用于需要读取请求体的场景,如日志记录、验证、转换等
- 性能影响 - 会增加内存使用,需要合理控制使用范围
- 类型灵活 - 支持多种数据类型的缓存,根据实际需求选择
- 获取便捷 - 通过
ServerWebExchangeUtils.CACHED_REQUEST_BODY_ATTR键可以轻松获取缓存内容
通过合理使用 CacheRequestBody 过滤器,我们可以在不影响下游服务的前提下,在网关层实现丰富的请求处理逻辑。