Skip to content

ModifyRequestBody GatewayFilter Factory

概述

ModifyRequestBody 用于在网关将请求转发到下游服务之前修改请求体内容。这个过滤器在微服务架构中扮演着重要的数据转换和预处理角色。

业务场景和解决的问题

常见应用场景

  1. 数据格式转换:将客户端发送的数据格式转换为下游服务期望的格式
  2. 数据清洗和验证:在数据到达业务服务前进行数据清洗和初步验证
  3. 敏感信息脱敏:在转发前对敏感数据进行脱敏处理
  4. 数据增强:为请求体添加额外的业务信息或元数据
  5. 协议适配:适配不同版本 API 之间的数据结构差异

解决的核心问题

  • 解耦数据转换逻辑:避免在每个微服务中重复实现相同的数据转换逻辑
  • 统一数据处理入口:在网关层统一处理数据转换,简化下游服务的复杂度
  • 提高系统灵活性:通过配置化的方式动态调整数据转换规则

工作原理

重要特性

IMPORTANT

ModifyRequestBody 过滤器只能通过 Java DSL 配置,不支持通过配置文件进行配置。这是因为过滤器需要自定义的转换逻辑,无法通过简单的配置参数实现。

当请求没有请求体时,过滤器会接收到 `null` 值。如果需要为空请求分配请求体,应该返回 `Mono.empty()`。

配置方式

基本配置结构

kotlin
@Configuration
class GatewayConfig {

    @Bean
    fun routes(builder: RouteLocatorBuilder): RouteLocator {
        return builder.routes()
            .route("modify-request-body") { r ->
                r.host("*.example.com")
                    .filters { f ->
                        f.prefixPath("/api")
                            .modifyRequestBody(
                                String::class.java,           // 输入类型
                                UserRequest::class.java,      // 输出类型
                                MediaType.APPLICATION_JSON_VALUE // 内容类型
                            ) { exchange, originalBody ->
                                // 转换逻辑
                                processRequestBody(originalBody)
                            }
                    }
                    .uri("http://localhost:8080")
            }
            .build()
    }

    /**
     * 处理请求体转换逻辑
     * @param originalBody 原始请求体
     * @return 转换后的请求体
     */
    private fun processRequestBody(originalBody: String?): Mono<UserRequest> {
        return if (originalBody != null) {
            try {
                // 解析原始JSON并转换为目标格式
                val jsonNode = ObjectMapper().readTree(originalBody)
                val userRequest = UserRequest(
                    name = jsonNode.get("userName")?.asText()?.uppercase() ?: "",
                    email = jsonNode.get("userEmail")?.asText() ?: "",
                    timestamp = System.currentTimeMillis()
                )
                Mono.just(userRequest)
            } catch (e: Exception) {
                // 处理转换异常
                Mono.error(e)
            }
        } else {
            // 处理空请求体的情况
            Mono.empty()
        }
    }
}

/**
 * 用户请求数据类
 */
data class UserRequest(
    val name: String,
    val email: String,
    val timestamp: Long
)
java
@Bean
public RouteLocator routes(RouteLocatorBuilder builder) {
    return builder.routes()
        .route("rewrite_request_obj", r -> r.host("*.rewriterequestobj.org")
            .filters(f -> f.prefixPath("/httpbin")
                .modifyRequestBody(String.class, Hello.class, MediaType.APPLICATION_JSON_VALUE,
                    (exchange, s) -> Mono.just(new Hello(s.toUpperCase()))))
            .uri(uri))
        .build();
}

static class Hello {
    String message;

    public Hello() { }

    public Hello(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}

实际业务场景示例

场景 1:用户注册信息标准化

假设我们有一个用户注册服务,需要将前端发送的用户信息进行标准化处理:

kotlin
@Configuration
class UserRegistrationGatewayConfig {

    @Bean
    fun userRegistrationRoute(builder: RouteLocatorBuilder): RouteLocator {
        return builder.routes()
            .route("user-registration") { r ->
                r.path("/register")
                    .filters { f ->
                        f.modifyRequestBody(
                            String::class.java,
                            StandardUserRequest::class.java,
                            MediaType.APPLICATION_JSON_VALUE
                        ) { exchange, rawBody ->
                            standardizeUserRegistration(rawBody)
                        }
                    }
                    .uri("http://user-service:8080")
            }
            .build()
    }

    /**
     * 标准化用户注册信息
     * - 邮箱转小写
     * - 电话号码格式化
     * - 添加注册时间戳
     * - 生成用户ID
     */
    private fun standardizeUserRegistration(rawBody: String?): Mono<StandardUserRequest> {
        return if (rawBody != null) {
            try {
                val mapper = ObjectMapper()
                val rawRequest = mapper.readTree(rawBody)

                val standardRequest = StandardUserRequest(
                    userId = UUID.randomUUID().toString(), // 生成唯一用户ID
                    username = rawRequest.get("username")?.asText()?.trim() ?: "",
                    email = rawRequest.get("email")?.asText()?.lowercase()?.trim() ?: "", // 邮箱标准化
                    phone = formatPhoneNumber(rawRequest.get("phone")?.asText()), // 电话格式化
                    registrationTime = Instant.now().epochSecond, // 添加注册时间
                    source = "web", // 标记来源
                    verified = false // 初始未验证状态
                )

                Mono.just(standardRequest)
            } catch (e: Exception) {
                log.error("用户注册信息标准化失败", e)
                Mono.error(IllegalArgumentException("无效的用户注册信息"))
            }
        } else {
            Mono.error(IllegalArgumentException("注册信息不能为空"))
        }
    }

    /**
     * 格式化电话号码
     */
    private fun formatPhoneNumber(phone: String?): String {
        return phone?.replace(Regex("[^0-9]"), "") // 移除非数字字符
            ?.let { cleanPhone ->
                when {
                    cleanPhone.startsWith("86") && cleanPhone.length == 13 -> cleanPhone
                    cleanPhone.length == 11 -> "86$cleanPhone"
                    else -> cleanPhone
                }
            } ?: ""
    }

    companion object {
        private val log = LoggerFactory.getLogger(UserRegistrationGatewayConfig::class.java)
    }
}

/**
 * 标准化用户请求数据类
 */
data class StandardUserRequest(
    val userId: String,           // 系统生成的用户ID
    val username: String,         // 用户名
    val email: String,           // 标准化后的邮箱
    val phone: String,           // 格式化后的电话
    val registrationTime: Long,   // 注册时间戳
    val source: String,          // 注册来源
    val verified: Boolean        // 验证状态
)

场景 2:API 版本兼容性处理

处理不同版本 API 之间的数据结构差异:

kotlin
@Configuration
class ApiVersionCompatibilityConfig {

    @Bean
    fun apiVersionRoute(builder: RouteLocatorBuilder): RouteLocator {
        return builder.routes()
            .route("api-v1-to-v2") { r ->
                r.path("/api/v1/orders/**")
                    .filters { f ->
                        f.modifyRequestBody(
                            String::class.java,
                            String::class.java,
                            MediaType.APPLICATION_JSON_VALUE
                        ) { exchange, v1Body ->
                            convertV1ToV2Format(v1Body)
                        }
                        .rewritePath("/api/v1/orders/(?<segment>.*)", "/api/v2/orders/\${segment}")
                    }
                    .uri("http://order-service-v2:8080")
            }
            .build()
    }

    /**
     * 将V1 API格式转换为V2 API格式
     * V1: { "customer_id": "123", "items": [...] }
     * V2: { "customerId": "123", "orderItems": [...], "apiVersion": "v2" }
     */
    private fun convertV1ToV2Format(v1Body: String?): Mono<String> {
        return if (v1Body != null) {
            try {
                val mapper = ObjectMapper()
                val v1Data = mapper.readTree(v1Body)

                // 构建V2格式的数据
                val v2Data = mapper.createObjectNode().apply {
                    // 字段名转换:snake_case -> camelCase
                    put("customerId", v1Data.get("customer_id")?.asText())
                    put("customerName", v1Data.get("customer_name")?.asText())

                    // 数组字段重命名
                    set<JsonNode>("orderItems", v1Data.get("items"))

                    // 添加V2特有字段
                    put("apiVersion", "v2")
                    put("convertedFromV1", true)
                    put("conversionTime", System.currentTimeMillis())

                    // 保留其他兼容字段
                    v1Data.fieldNames().forEach { fieldName ->
                        if (!fieldName.startsWith("customer_") && fieldName != "items") {
                            set<JsonNode>(fieldName, v1Data.get(fieldName))
                        }
                    }
                }

                Mono.just(mapper.writeValueAsString(v2Data))
            } catch (e: Exception) {
                log.error("API版本转换失败", e)
                Mono.error(IllegalArgumentException("无法转换API版本"))
            }
        } else {
            Mono.empty()
        }
    }

    companion object {
        private val log = LoggerFactory.getLogger(ApiVersionCompatibilityConfig::class.java)
    }
}

场景 3:敏感信息脱敏处理

在转发用户数据前进行敏感信息脱敏:

kotlin
@Configuration
class DataMaskingConfig {

    @Bean
    fun dataMaskingRoute(builder: RouteLocatorBuilder): RouteLocator {
        return builder.routes()
            .route("sensitive-data-masking") { r ->
                r.path("/api/user/profile")
                    .filters { f ->
                        f.modifyRequestBody(
                            String::class.java,
                            String::class.java,
                            MediaType.APPLICATION_JSON_VALUE
                        ) { exchange, originalBody ->
                            maskSensitiveData(originalBody)
                        }
                    }
                    .uri("http://profile-service:8080")
            }
            .build()
    }

    /**
     * 敏感数据脱敏处理
     */
    private fun maskSensitiveData(originalBody: String?): Mono<String> {
        return if (originalBody != null) {
            try {
                val mapper = ObjectMapper()
                val data = mapper.readTree(originalBody) as ObjectNode

                // 手机号脱敏:保留前3位和后4位
                data.get("phone")?.asText()?.let { phone ->
                    data.put("phone", maskPhone(phone))
                }

                // 身份证号脱敏:保留前6位和后4位
                data.get("idCard")?.asText()?.let { idCard ->
                    data.put("idCard", maskIdCard(idCard))
                }

                // 邮箱脱敏:保留@前的前2位和@后的域名
                data.get("email")?.asText()?.let { email ->
                    data.put("email", maskEmail(email))
                }

                // 银行卡号脱敏:只显示后4位
                data.get("bankCard")?.asText()?.let { bankCard ->
                    data.put("bankCard", maskBankCard(bankCard))
                }

                // 添加脱敏标记
                data.put("_masked", true)
                data.put("_maskingTime", System.currentTimeMillis())

                Mono.just(mapper.writeValueAsString(data))
            } catch (e: Exception) {
                log.error("敏感数据脱敏失败", e)
                Mono.error(e)
            }
        } else {
            Mono.empty()
        }
    }

    /**
     * 手机号脱敏:138****1234
     */
    private fun maskPhone(phone: String): String {
        return if (phone.length >= 7) {
            phone.substring(0, 3) + "****" + phone.substring(phone.length - 4)
        } else phone
    }

    /**
     * 身份证号脱敏:123456********1234
     */
    private fun maskIdCard(idCard: String): String {
        return if (idCard.length >= 10) {
            idCard.substring(0, 6) + "********" + idCard.substring(idCard.length - 4)
        } else idCard
    }

    /**
     * 邮箱脱敏:ab***@example.com
     */
    private fun maskEmail(email: String): String {
        val atIndex = email.indexOf("@")
        return if (atIndex > 2) {
            email.substring(0, 2) + "***" + email.substring(atIndex)
        } else email
    }

    /**
     * 银行卡号脱敏:**** **** **** 1234
     */
    private fun maskBankCard(bankCard: String): String {
        return if (bankCard.length >= 4) {
            "**** **** **** " + bankCard.substring(bankCard.length - 4)
        } else bankCard
    }

    companion object {
        private val log = LoggerFactory.getLogger(DataMaskingConfig::class.java)
    }
}

错误处理和最佳实践

异常处理策略

kotlin
@Configuration
class RobustRequestModificationConfig {

    @Bean
    fun robustRoute(builder: RouteLocatorBuilder): RouteLocator {
        return builder.routes()
            .route("robust-modification") { r ->
                r.path("/api/data")
                    .filters { f ->
                        f.modifyRequestBody(
                            String::class.java,
                            String::class.java,
                            MediaType.APPLICATION_JSON_VALUE
                        ) { exchange, body ->
                            safelyModifyRequestBody(exchange, body)
                        }
                    }
                    .uri("http://downstream-service:8080")
            }
            .build()
    }

    /**
     * 安全的请求体修改处理
     */
    private fun safelyModifyRequestBody(
        exchange: ServerWebExchange,
        body: String?
    ): Mono<String> {
        return try {
            when {
                body.isNullOrBlank() -> handleEmptyBody()
                !isValidJson(body) -> handleInvalidJson(body)
                else -> processValidBody(body)
            }
        } catch (e: Exception) {
            handleProcessingError(e, exchange)
        }
    }

    /**
     * 处理空请求体
     */
    private fun handleEmptyBody(): Mono<String> {
        log.warn("收到空请求体,使用默认值")
        return Mono.just("""{"status": "empty", "timestamp": ${System.currentTimeMillis()}}""")
    }

    /**
     * 处理无效JSON
     */
    private fun handleInvalidJson(body: String): Mono<String> {
        log.warn("收到无效JSON格式的请求体: {}", body.take(100))
        // 尝试包装成有效JSON
        return Mono.just("""{"originalData": "$body", "wrapped": true}""")
    }

    /**
     * 处理有效的请求体
     */
    private fun processValidBody(body: String): Mono<String> {
        return Mono.fromCallable {
            val mapper = ObjectMapper()
            val data = mapper.readTree(body) as ObjectNode

            // 添加处理时间戳
            data.put("processedAt", System.currentTimeMillis())
            data.put("processedBy", "gateway")

            mapper.writeValueAsString(data)
        }
        .subscribeOn(Schedulers.boundedElastic()) // 使用非阻塞线程池
        .timeout(Duration.ofSeconds(5)) // 设置超时时间
        .onErrorResume { error ->
            log.error("处理请求体时发生错误", error)
            Mono.just(body) // 发生错误时返回原始请求体
        }
    }

    /**
     * 处理处理过程中的错误
     */
    private fun handleProcessingError(
        error: Exception,
        exchange: ServerWebExchange
    ): Mono<String> {
        log.error("请求体修改过程中发生异常", error)

        // 记录请求信息用于调试
        val requestPath = exchange.request.path.value()
        val requestId = exchange.request.headers.getFirst("X-Request-ID") ?: "unknown"

        log.error("请求路径: {}, 请求ID: {}", requestPath, requestId)

        // 根据错误类型决定处理策略
        return when (error) {
            is JsonProcessingException -> {
                // JSON处理错误,返回错误响应
                Mono.error(IllegalArgumentException("JSON格式错误: ${error.message}"))
            }
            is TimeoutException -> {
                // 超时错误
                Mono.error(IllegalStateException("请求处理超时"))
            }
            else -> {
                // 其他错误,返回通用错误
                Mono.error(RuntimeException("请求处理失败: ${error.message}"))
            }
        }
    }

    /**
     * 验证JSON格式
     */
    private fun isValidJson(json: String): Boolean {
        return try {
            ObjectMapper().readTree(json)
            true
        } catch (e: JsonProcessingException) {
            false
        }
    }

    companion object {
        private val log = LoggerFactory.getLogger(RobustRequestModificationConfig::class.java)
    }
}

性能优化建议

以下是一些性能优化的最佳实践:

1. 异步处理

kotlin
/**
 * 使用异步方式处理大量数据转换
 */
private fun asyncProcessLargeData(data: String): Mono<String> {
    return Mono.fromCallable {
        // 复杂的数据转换逻辑
        performComplexTransformation(data)
    }
    .subscribeOn(Schedulers.boundedElastic()) // 使用专门的线程池
    .timeout(Duration.ofSeconds(10)) // 设置合理的超时时间
}

2. 缓存机制

kotlin
@Component
class RequestTransformationCache {

    private val cache = Caffeine.newBuilder()
        .maximumSize(1000)
        .expireAfterWrite(Duration.ofMinutes(10))
        .build<String, String>()

    fun getCachedTransformation(key: String, transformer: () -> String): String {
        return cache.get(key) { transformer() }
    }
}

3. 批量处理

kotlin
/**
 * 批量处理多个请求项
 */
private fun processBatchItems(items: List<Any>): Mono<List<Any>> {
    return Flux.fromIterable(items)
        .buffer(10) // 每批处理10个项目
        .flatMap { batch ->
            Mono.fromCallable { processBatch(batch) }
                .subscribeOn(Schedulers.parallel())
        }
        .collectList()
}

监控和日志

请求处理监控

kotlin
@Component
class RequestModificationMetrics {

    private val processingTimeTimer = Timer.builder("gateway.request.modification.time")
        .description("请求体修改处理时间")
        .register(Metrics.globalRegistry)

    private val processingCounter = Counter.builder("gateway.request.modification.count")
        .description("请求体修改次数")
        .register(Metrics.globalRegistry)

    fun recordProcessingTime(duration: Duration) {
        processingTimeTimer.record(duration)
    }

    fun incrementProcessingCount(status: String) {
        processingCounter.increment(Tags.of("status", status))
    }
}

详细日志记录

kotlin
/**
 * 带有详细日志的请求体修改
 */
private fun modifyWithLogging(exchange: ServerWebExchange, body: String?): Mono<String> {
    val requestId = exchange.request.headers.getFirst("X-Request-ID") ?: UUID.randomUUID().toString()
    val startTime = System.currentTimeMillis()

    log.info("开始处理请求体修改, RequestID: {}, ContentLength: {}",
        requestId, body?.length ?: 0)

    return processRequestBody(body)
        .doOnSuccess { result ->
            val duration = System.currentTimeMillis() - startTime
            log.info("请求体修改完成, RequestID: {}, Duration: {}ms, ResultLength: {}",
                requestId, duration, result?.length ?: 0)
        }
        .doOnError { error ->
            val duration = System.currentTimeMillis() - startTime
            log.error("请求体修改失败, RequestID: {}, Duration: {}ms, Error: {}",
                requestId, duration, error.message)
        }
}

注意事项

使用 `ModifyRequestBody` 过滤器时需要注意以下几点:

  1. 内存消耗:过滤器需要将整个请求体加载到内存中,对于大文件上传场景需要特别注意内存使用
  2. 性能影响:请求体的序列化和反序列化会增加延迟,需要合理评估性能影响
  3. 错误处理:必须妥善处理转换过程中可能出现的异常,避免导致整个请求失败
  4. 类型安全:确保输入和输出类型的正确性,避免运行时类型转换异常

在生产环境中使用时,建议:

  • 设置合适的超时时间
  • 实施熔断机制
  • 监控内存使用情况
  • 记录详细的操作日志

总结

ModifyRequestBody 过滤器是 Spring Cloud Gateway 中一个功能强大的工具,它能够在网关层统一处理请求数据的转换和预处理。通过合理使用这个过滤器,我们可以:

  • 简化微服务架构:将数据转换逻辑集中在网关层
  • 提高系统灵活性:支持动态的数据格式转换
  • 增强数据安全性:在数据进入内部服务前进行脱敏和验证
  • 改善用户体验:提供更好的 API 兼容性和数据标准化

在实际使用中,需要根据具体的业务场景选择合适的转换策略,并注意性能和错误处理方面的最佳实践。

Details

相关参考资料