浅谈Spring Batch在大型企业中的最佳实践
|
在大型企业中,由于业务复杂、数据量大、数据格式不同、数据交互格式繁杂,并非所有的操作都能通过交互界面进行处理。而有一些操作需要定期读取大批量的数据,然后进行一系列的后续处理。这样的过程就是“批处理”。 批处理应用通常有以下特点:
什么是Spring batch Spring batch是一个轻量级的全面的批处理框架,它专为大型企业而设计,帮助开发健壮的批处理应用。Spring batch为处理大批量数据提供了很多必要的可重用的功能,比如日志追踪、事务管理、job执行统计、重启job和资源管理等。同时它也提供了优化和分片技术用于实现高性能的批处理任务。 它的核心功能包括:
笔者所在的部门属于国外某大型金融公司的CRM部门,在日常工作中我们经常需要开发一些批处理应用,对Spring Batch有着丰富的使用经验。近段时间笔者特意总结了这些经验。 使用Spring Batch 3.0以及Spring Boot 在使用Spring Batch时推荐使用最新的Spring Batch 3.0版本。相比Spring Batch2.2,它做了以下方面的提升:
支持Spring4和Java8是一个重大的提升。这样就可以使用Spring4引入的Spring boot组件,从而开发效率方面有了一个质的飞跃。引入Spring-batch框架只需要在build.gradle中加入一行代码即可:
compile("org.springframework.boot:spring-boot-starter-batch")
而增强Spring Batch Integration的功能后,我们就可以很方便的和Spring家族的其他组件集成,还可以以多种方式来调用job,也支持远程分区操作以及远程块处理。 而支持JobScope后我们可以随时为对象注入当前Job实例的上下文信息。只要我们制定Bean的scope为job scope,那么就可以随时使用jobParameters和jobExecutionContext等信息。
<bean id="..." class="..." scope="job">
<property name="name" value="#{jobParameters[input]}" />
</bean>
<bean id="..." class="..." scope="job">
<property name="name" value="#{jobExecutionContext['input.name']}.txt" />
</bean>
使用Java Config而不是xml的配置方式 之前我们在配置job和step的时候都习惯用xml的配置方式,但是随着时间的推移发现问题颇多。
我们渐渐发现使用纯Java类的配置方式更灵活,它是类型安全的,而且IDE的支持更好。在构建job或step时采用的流式语法相比xml更加简洁易懂。
@Bean
public Step step(){
return stepBuilders.get("step")
.<Partner,Partner>chunk(1)
.reader(reader())
.processor(processor())
.writer(writer())
.listener(logProcessListener())
.faultTolerant()
.skipLimit(10)
.skip(UnknownGenderException.class)
.listener(logSkipListener())
.build();
}
在这个例子中可以很清楚的看到该step的配置,比如reader/processor/writer组件,以及配置了哪些listener等。 本地集成测试中使用内存数据库 Spring batch在运行时需要数据库支持,因为它需要在数据库中建立一套schema来存储job和step运行的统计信息。而在本地集成测试中我们可以借助Spring batch提供的内存Repository来存储Spring batch的任务执行信息,这样即避免了在本地配置一个数据库,又可以加快job的执行。 <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"> <property name="transactionManager" ref="transactionManager"/> </bean> 我们在build.gradle中加入对hsqldb的依赖: runtime(‘org.hsqldb:hsqldb:2.3.2') 然后在测试类中添加对DataSource的配置。
@EnableAutoConfiguration
@EnableBatchProcessing
@DataJpaTest
@Import({DataSourceAutoConfiguration.class,BatchAutoConfiguration.class})
public class TestConfiguration {
}
并且在applicaton.properties配置中添加初始化Database的配置: spring.batch.initializer.enable=true 合理的使用Chunk机制 Spring batch在配置Step时采用的是基于Chunk的机制。即每次读取一条数据,再处理一条数据,累积到一定数量后再一次性交给writer进行写入操作。这样可以最大化的优化写入效率,整个事务也是基于Chunk来进行。 当我们在需要将数据写入到文件、数据库中之类的操作时可以适当设置Chunk的值以满足写入效率最大化。但有些场景下我们的写入操作其实是调用一个web service或者将消息发送到某个消息队列中,那么这些场景下我们就需要设置Chunk的值为1,这样既可以及时的处理写入,也不会由于整个Chunk中发生异常后,在重试时出现重复调用服务或者重复发送消息的情况。 使用Listener来监视job执行情况并及时做相应的处理 Spring batch提供了大量的Listener来对job的各个执行环节进行全面的监控。 在job层面Spring batch提供了JobExecutionListener接口,其支持在Job开始或结束时进行一些额外处理。在step层面Spring batch提供了StepExecutionListener,ChunkListener,ItemReadListener,ItemProcessListener,ItemWriteListener,SkipListener等接口,同时对Retry和Skip操作也提供了RetryListener及SkipListener。 通常我们会为每个job都实现一个JobExecutionListener,在afterJob操作中我们输出job的执行信息,包括执行时间、job参数、退出代码、执行的step以及每个step的详细信息。这样无论是开发、测试还是运维人员对整个job的执行情况了如指掌。 如果某个step会发生skip的操作,我们也会为其实现一个SkipListener,并在其中记录skip的数据条目,用于下一步的处理。 实现Listener有两种方式,一种是继承自相应的接口,比如继承JobExecutionListener接口,另一种是使用annoation(注解)的方式。经过实践我们认为使用注解的方式更好一些,因为使用接口你需要实现接口的所有方法,而使用注解则只需要对相应的方法添加annoation即可。 下面的这个类采用了继承接口的方式,我们看到其实我们只用到了第一个方法,第二个和第三个都没有用到。但是我们必须提供一个空的实现。
public class CustomSkipListener implements SkipListener<String,String> {
@Override
public void onSkipInRead(Throwable t) {
// business logic
}
@Override
public void onSkipInWrite(String item,Throwable t) {
// no need
}
@Override
public void onSkipInProcess(String item,Throwable t) {
// no need
}
}
而使用annoation的方式可以简写为:
public class CustomSkipListener {
@OnSkipInRead
public void onSkipInRead(Throwable t) {
// business logic
}
}
使用Retry和Skip增强批处理工作的健壮性 在处理百万级的数据过程过程中难免会出现异常。如果一旦出现异常而导致整个批处理工作终止的话那么会导致后续的数据无法被处理。Spring Batch内置了Retry(重试)和Skip(跳过)机制帮助我们轻松处理各种异常。适合Retry的异常的特点是这些异常可能会随着时间推移而消失,比如数据库目前有锁无法写入、web服务当前不可用、web服务满载等。所以对这些异常我们可以配置Retry机制。而有些异常则不应该配置Retry,比如解析文件出现异常等,因为这些异常即使Retry也会始终失败。 即使Retry多次仍然失败也无需让整个step失败,可以对指定的异常设置Skip选项从而保证后续的数据能够被继续处理。我们也可以配置SkipLimit选项保证当Skip的数据条目达到一定数量后及时终止整个Job。 有时候我们需要在每次Retry中间隔做一些操作,比如延长Retry时间,恢复操作现场等,Spring Batch提供了BackOffPolicy来达到目的。下面是一个配置了Retry机制、Skip机制以及BackOffPolicy的step示例。
@Bean
public Step step(){
return stepBuilders.get("step")
.<Partner,Partner>chunk(1)
.reader(reader())
.processor(processor())
.writer(writer())
.listener(logProcessListener())
.faultTolerant()
.skipLimit(10)
.skip(UnknownGenderException.class)
.retryLimit(5)
.retry(ServiceUnavailableException.class)
.backOffPolicy(backoffPolicy)
.listener(logSkipListener())
.build();
}
使用自定义的Decider来实现Job flow (编辑:安卓应用网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
- Spring Boot利用@Async异步调用:ThreadPoolTaskScheduler线
- 在任务栏图标上使用java点击左键打开弹出窗口(菜单)
- java – Windows批处理文件多次运行jar文件
- Springboot 使用 JSR 303 对 Controller 控制层校验及 Serv
- java – Swing – MaskFormatter – 从文本字段的右侧输入数
- Spring自定义配置Schema可扩展(一)
- 在java中单独的逻辑和GUI
- Spring Boot中使用 Spring Security 构建权限系统的示例代码
- java – 如何按插入顺序迭代Multimap?
- java – 如何在Outlook中创建电子邮件并使其可见的用户
