一篇文章全面解析大数据批处理框架Spring Batch

  • 来源:网络大数据

实现作业的健壮性与扩展性

批处理要求Job必须有较强的健壮性,通常Job是批量处理数据、无人值守的,这要求在Job执行期间能够应对各种发生的异常、错误,并对Job执行进行有效的跟踪。

一个健壮的Job通常需要具备如下的几个特性:

1. 容错性

在Job执行期间非致命的异常,Job执行框架应能够进行有效的容错处理,而不是让整个Job执行失败;通常只有致命的、导致业务不正确的异常才可以终止Job的执行。

2. 可追踪性

Job执行期间任何发生错误的地方都需要进行有效的记录,方便后期对错误点进行有效的处理。例如在Job执行期间任何被忽略处理的记录行需要被有效的记录下来,应用程序维护人员可以针对被忽略的记录后续做有效的处理。

3. 可重启性

Job执行期间如果因为异常导致失败,应该能够在失败的点重新启动Job;而不是从头开始重新执行Job。

框架提供了支持上面所有能力的特性,包括Skip(跳过记录处理)、Retry(重试给定的操作)、Restart(从错误点开始重新启动失败的Job):

  • Skip,在对数据处理期间,如果数据的某几条的格式不能满足要求,可以通过Skip跳过该行记录的处理,让Processor能够顺利的处理其余的记录行。
  • Retry,将给定的操作进行多次重试,在某些情况下操作因为短暂的异常导致执行失败,如网络连接异常、并发处理异常等,可以通过重试的方式避免单次的失败,下次执行操作时候网络恢复正常,不再有并发的异常,这样通过重试的能力可以有效的避免这类短暂的异常。
  • Restart,在Job执行失败后,可以通过重启功能来继续完成Job的执行。在重启时候,批处理框架允许在上次执行失败的点重新启动Job,而不是从头开始执行,这样可以大幅提高Job执行的效率。

对于扩展性,框架提供的扩展能力包括如下的四种模式 :

  • Multithreaded Step 多线程执行一个Step;
  • Parallel Step 通过多线程并行执行多个Step;
  • Remote Chunking 在远端节点上执行分布式Chunk操作;
  • Partitioning Step 对数据进行分区,并分开执行;

我们先来看第一种的实现Multithreaded Step:

批处理框架在Job执行时默认使用单个线程完成任务的执行,同时框架提供了线程池的支持(Multithreaded Step模式),可以在Step执行时候进行并行处理,这里的并行是指同一个Step使用线程池进行执行,同一个Step被并行的执行。使用tasklet的属性task-executor可以非常容易的将普通的Step变成多线程Step。

Multithreaded Step的实现示例:

需要注意的是Spring Batch框架提供的大部分的ItemReader、ItemWriter等操作都是线程不安全的。

可以通过扩展的方式显现线程安全的Step。

下面为大家展示一个扩展的实现:

需求:针对数据表的批量处理,实现线程安全的Step,并且支持重启能力,即在执行失败点可以记录批处理的状态。

对于示例中的数据库读取组件JdbcCursorItemReader,在设计数据库表时,在表中增加一个字段Flag,用于标识当前的记录是否已经读取并处理成功,如果处理成功则标识Flag=true,等下次重新读取的时候,对于已经成功读取且处理成功的记录直接跳过处理。

Multithreaded Step(多线程步)提供了多个线程执行一个Step的能力,但这种场景在实际的业务中使用的并不是非常多。

更多的业务场景是Job中不同的Step没有明确的先后顺序,可以在执行期并行的执行。

Parallel Step:提供单个节点横向扩展的能力

使用场景:Step A、Step B两个作业步由不同的线程执行,两者均执行完毕后,Step C才会被执行。

框架提供了并行Step的能力。可以通过Split元素来定义并行的作业流,并制定使用的线程池。

Parallel Step模式的执行效果如下:

每个作业步并行处理不同的记录,示例中三个作业步,处理同一张表中的不同数据。

并行Step提供了在一个节点上横向处理,但随着作业处理量的增加,有可能一台节点无法满足Job的处理,此时我们可以采用远程Step的方式将多个机器节点组合起来完成一个Job的处理。

Remote Chunking:远程Step技术本质上是将对Item读、写的处理逻辑进行分离;通常情况下读的逻辑放在一个节点进行操作,将写操作分发到另外的节点执行。

远程分块是一个把step进行技术分割的工作,不需要对处理数据的结构有明确了解。

任何输入源能够使用单进程读取并在动态分割后作为"块"发送给远程的工作进程。

远程进程实现了监听者模式,反馈请求、处理数据最终将处理结果异步返回。请求和返回之间的传输会被确保在发送者和单个消费者之间。

在Master节点,作业步负责读取数据,并将读取的数据通过远程技术发送到指定的远端节点上,进行处理,处理完毕后Master负责回收Remote端执行的情况。

在Spring Batch框架中通过两个核心的接口来完成远程Step的任务,分别是ChunkProvider与ChunkProcessor。

ChunkProvider:根据给定的ItemReader操作产生批量的Chunk操作;

ChunkProcessor:负责获取ChunkProvider产生的Chunk操作,执行具体的写逻辑;

Spring Batch中对远程Step没有默认的实现,但我们可以借助SI或者AMQP实现来实现远程通讯能力。

基于SI实现Remote Chunking模式的示例:

Step本地节点负责读取数据,并通过MessagingGateway将请求发送到远程Step上;远程Step提供了队列的监听器,当请求队列中有消息时候获取请求信息并交给ChunkHander负责处理。

接下来我们看下最后一种分区模式;Partitioning Step:分区模式需要对数据的结构有一定的了解,如主键的范围、待处理的文件的名字等。

这种模式的优点在于分区中每一个元素的处理器都能够像一个普通Spring Batch任务的单步一样运行,也不必去实现任何特殊的或是新的模式,来让他们能够更容易配置与测试。

通过分区可以实现以下的优点:

  • 分区实现了更细粒度的扩展;
  • 基于分区可以实现高性能的数据切分;
  • 分区比远程通常具有更高的扩展性;
  • 分区后的处理逻辑,支持本地与远程两种模式;
  • 分区作业典型的可以分成两个处理阶段,数据分区、分区处理;

数据分区:根据特殊的规则(例如:根据文件名称,数据的唯一性标识,或者哈希算法)将数据进行合理的数据切片,为不同的切片生成数据执行上下文Execution Context、作业步执行器Step Execution。可以通过接口Partitioner生成自定义的分区逻辑,Spring Batch批处理框架默认实现了对多文件的实现org.springframework.batch.core.partition.support.MultiResourcePartitioner;也可以自行扩展接口Partitioner来实现自定义的分区逻辑。

分区处理:通过数据分区后,不同的数据已经被分配到不同的作业步执行器中,接下来需要交给分区处理器进行作业,分区处理器可以本地执行也可以远程执行被划分的作业。接口PartitionHandler定义了分区处理的逻辑,Spring Batch批处理框架默认实现了本地多线程的分区处理org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;也可以自行扩展接口PartitionHandler来实现自定义的分区处理逻辑。

Spring Batch框架提供了对文件分区的支持,实现类org.springframework.batch.core.partition.support.MultiResourcePartitioner提供了对文件分区的默认支持,根据文件名将不同的文件处理进行分区,提升处理的速度和效率,适合有大量小文件需要处理的场景。

示例展示了将不同文件分配到不同的作业步中,使用MultiResourcePartitioner进行分区,意味着每个文件会被分配到一个不同的分区中。如果有其它的分区规则,可以通过实现接口Partitioner来进行自定义的扩展。有兴趣的TX,可以自己实现基于数据库的分区能力哦。

总结一下,批处理框架在扩展性上提供了4中不同能力,每种都是各自的使用场景,我们可以根据实际的业务需要进行选择。

批处理框架的不足与增强

Spring Batch批处理框架虽然提供了4种不同的监控方式,但从目前的使用情况来看,都不是非常的友好。

  • 通过DB直接查看,对于管理人员来讲,真的不忍直视;

  • 通过API实现自定义的查询,这是程序员的天堂,确实运维人员的地狱;

  • 提供了Web控制台,进行Job的监控和操作,目前提供的功能太裸露,无法直接用于生产;

  • 提供JMX查询方式,对于非开发人员太不友好;

但在企业级应用中面对批量数据处理,仅仅提供批处理框架仅能满足批处理作业的快速开发、执行能力。

企业需要统一的批处理平台来处理复杂的企业批处理应用,批处理平台需要解决作业的统一调度、批处理作业的集中管理和管控、批处理作业的统一监控等能力。

那完美的解决方案是什么呢?

企业级批处理平台需要在Spring Batch批处理框架的基础上,集成调度框架,通过调度框架可以将任务按照企业的需求进行任务的定期执行;

丰富目前Spring Batch Admin(Spring Batch的管理监控平台,目前能力比较薄弱)框架,提供对Job的统一管理功能,增强Job作业的监控、预警等能力;

通过与企业的组织机构、权限管理、认证系统进行合理的集成,增强平台对Job作业的权限控制、安全管理能力。

由于时间关系,今天的分享就到这里,很多内容未能展开讨论。欢迎大家在实际业务中使用Spring Batch框架。

作者:刘相