Appearance
🚀 Job-Launching Gateway 可用属性详解
Spring Batch Integration 的消息驱动任务启动网关,让你通过消息触发批处理任务。以下是控制网关行为的核心属性配置指南:
🔑 1. id 属性
作用:标识网关的 Spring Bean
实现类型:
EventDrivenConsumer(输入通道为SubscribableChannel时)PollingConsumer(输入通道为PollableChannel时)
通道类型选择
✅ 实时响应场景 → 用 SubscribableChannel (如 DirectChannel)
✅ 异步轮询场景 → 用 PollableChannel (如 QueueChannel)
⚙️ 2. auto-startup 属性
作用:控制网关是否随应用启动
默认值:true(自动启动)
kotlin
@Bean
fun jobLaunchingGateway(): IntegrationFlow {
return IntegrationFlow.from("requestChannel")
.handle(JobLaunchingGateway(jobLauncher())) {
it.autoStartup(false) // 手动控制启动
}
}📨 3. request-channel 属性
作用:定义接收启动消息的输入通道
kotlin
@Bean
fun jobFlow() = IntegrationFlow.from(MessageChannels.direct("jobRequest"))
.handle(JobLaunchingGateway(jobLauncher()))
.channel("replyChannel")
.get()NOTE
消息格式要求:
- 消息体必须是
JobLaunchRequest对象 - 包含
Job实例和JobParameters
🔙 4. reply-channel 属性
作用:定义返回任务执行结果(JobExecution)的通道
kotlin
@Bean
fun resultFlow() = IntegrationFlow.from("replyChannel")
.transform { jobExecution: JobExecution ->
"任务状态: ${jobExecution.status}"
}
.get()⏱️ 5. reply-timeout 属性
作用:设置等待结果返回的超时时间(毫秒)
默认值:-1(无限等待)
阻塞风险警告
使用有界队列时(如 QueueChannel 容量满),超时设置可防止线程死锁!
kotlin
.handle(JobLaunchingGateway(jobLauncher())) {
it.replyTimeout(5000) // 5秒超时
}🚦 6. job-launcher 属性
作用:自定义任务启动器(覆盖默认 jobLauncher Bean)
kotlin
@Bean
fun customLauncher(): JobLauncher {
val launcher = SimpleJobLauncher()
launcher.setTaskExecutor(SimpleAsyncTaskExecutor()) // 异步执行
return launcher
}
@Bean
fun gateway() = JobLaunchingGateway(customLauncher()) // 注入自定义启动器WARNING
未显式配置且无默认 jobLauncher Bean 时 → 抛出 BeanCreationException
🔢 7. order 属性
作用:当网关订阅 SubscribableChannel 时,指定消费顺序
kotlin
.handle(JobLaunchingGateway(jobLauncher())) {
it.order(3) // 优先级数值越小越先执行
}🔄 消息驱动任务启动流程
通过 Mermaid 时序图理解完整工作流:
💡 最佳实践总结
- 通道选择:kotlin
// 实时处理 MessageChannels.direct("requestChannel") // 异步缓冲 MessageChannels.queue(100, "requestChannel") - 超时设置:生产环境务必配置
reply-timeout避免阻塞 - 错误处理:添加错误通道捕获异常kotlin
.handle(JobLaunchingGateway(jobLauncher())) { it.errorChannel("errorChannel") }
TIP
完整配置示例:
查看 Kotlin DSL 配置
kotlin
@Configuration
class BatchIntegrationConfig {
@Bean
fun jobFlow(job: Job) = IntegrationFlow.from("jobRequests")
.handle(JobLaunchingGateway(jobLauncher())) {
it.replyTimeout(3000)
.autoStartup(true)
}
.channel("jobResults")
.get()
@Bean
fun jobLauncher() = SimpleJobLauncher().apply {
setJobRepository(jobRepository)
setTaskExecutor(SimpleAsyncTaskExecutor())
}
}通过合理配置这些属性,可实现灵活可靠的消息驱动批处理任务调度!⚡️