介绍
在实际开发中,数据的批量处理是非常常见的需求,例如数据的导入、导出、转换等。Spring Batch 是一个强大的批处理框架,可以轻松处理这些任务。本文将介绍如何使用 Spring Boot 整合 Spring Batch,实现数据批量处理的功能。
准备工作
在开始之前,需要确保你已经安装了以下软件:
- JDK(版本 1.8 或以上)
- Maven
- IDE(推荐使用 IntelliJ IDEA 或 Eclipse)
步骤一:创建 Spring Boot 项目
首先,我们需要创建一个 Spring Boot 项目。打开你的 IDE,选择创建一个新的 Spring Boot 项目。在创建项目的过程中,你需要选择 Spring Batch 作为一个依赖。
步骤二:配置 Spring Batch
在创建完项目后,我们需要配置 Spring Batch 的相关内容。首先,在配置文件(application.properties 或 application.yml)中添加以下配置:
# 数据源配置
spring.datasource.url=jdbc:mysql://localhost:3306/batch_demo?useSSL=false
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
# Spring Batch 配置
spring.batch.initializer.enabled=true
spring.batch.job.enabled=true
spring.batch.job.names=CsvToDatabaseJob
此外,我们需要创建一个数据库表来存储 Spring Batch 的元数据。你可以使用以下 SQL 在数据库中创建表:
CREATE TABLE BATCH_JOB_INSTANCE (
JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_NAME VARCHAR(100) NOT NULL,
JOB_KEY VARCHAR(32) NOT NULL,
constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ENGINE=InnoDB;
CREATE TABLE BATCH_JOB_EXECUTION (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_INSTANCE_ID BIGINT NOT NULL,
CREATE_TIME DATETIME NOT NULL,
START_TIME DATETIME DEFAULT NULL ,
END_TIME DATETIME DEFAULT NULL ,
STATUS VARCHAR(10),
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED DATETIME,
constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_STEP_EXECUTION (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT NOT NULL,
STEP_NAME VARCHAR(100) NOT NULL,
JOB_EXECUTION_ID BIGINT NOT NULL,
START_TIME DATETIME NOT NULL ,
END_TIME DATETIME DEFAULT NULL ,
STATUS VARCHAR(10),
COMMIT_COUNT INT ,
READ_COUNT INT ,
FILTER_COUNT INT ,
WRITE_COUNT INT ,
READ_SKIP_COUNT INT ,
WRITE_SKIP_COUNT INT ,
PROCESS_SKIP_COUNT INT ,
ROLLBACK_COUNT INT,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED DATETIME,
FOREIGN KEY (JOB_EXECUTION_ID)
REFERENCES BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS (
JOB_EXECUTION_ID BIGINT NOT NULL ,
TYPE_CD VARCHAR(6) NOT NULL ,
KEY_NAME VARCHAR(100) NOT NULL ,
STRING_VAL VARCHAR(250) ,
DATE_VAL DATETIME DEFAULT NULL ,
LONG_VAL BIGINT ,
DOUBLE_VAL DOUBLE PRECISION ,
constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_STEP_EXECUTION_SEQ (
UNIQUE_KEY CHAR(1) NOT NULL ,
ID BIGINT NOT NULL ,
constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
INSERT INTO BATCH_STEP_EXECUTION_SEQ values ('X', 0);
INSERT INTO BATCH_STEP_EXECUTION_SEQ values ('A', 0);
INSERT INTO BATCH_STEP_EXECUTION_SEQ values ('B', 0);
INSERT INTO BATCH_STEP_EXECUTION_SEQ values ('C', 0);
INSERT INTO BATCH_STEP_EXECUTION_SEQ values ('D', 0);
INSERT INTO BATCH_STEP_EXECUTION_SEQ values ('E', 0);
步骤三:编写 Spring Batch 作业
现在,我们可以开始编写我们的第一个 Spring Batch 作业了。首先,创建一个类,并添加 @Configuration
和 @EnableBatchProcessing
注解。然后,按照以下步骤编写作业:
- 创建一个 ItemReader,用于从输入源读取数据。你可以选择从数据库、文件等源读取数据。
- 创建一个 ItemProcessor,用于对读取的数据进行处理或转换。你可以根据需求自定义处理逻辑。
- 创建一个 ItemWriter,用于将处理后的数据写入目标数据库、文件等。
- 创建一个 Job,将 ItemReader、ItemProcessor 和 ItemWriter 组合在一起,并定义作业的执行流程。
以下是一个简单的作业示例:
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public ItemReader<Person> reader() {
// 创建一个 ItemReader
// 从数据库或文件中读取数据
}
@Bean
public ItemProcessor<Person, Person> processor() {
// 创建一个 ItemProcessor
// 对读取的数据进行处理或转换
}
@Bean
public ItemWriter<Person> writer() {
// 创建一个 ItemWriter
// 将处理后的数据写入目标数据库或文件
}
@Bean
public Step step1(ItemReader<Person> reader, ItemWriter<Person> writer, ItemProcessor<Person, Person> processor) {
return stepBuilderFactory.get("step1")
.<Person, Person>chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
@Bean
public Job importUserJob(JobCompletionNotificationListener listener, Step step) {
return jobBuilderFactory.get("importUserJob")
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(step)
.end()
.build();
}
}
在上面的示例中,我们定义了一个由三个步骤组成的作业。Step 1 是整个作业的第一个步骤,它包含了自定义的 ItemReader、ItemProcessor 和 ItemWriter。
步骤四:运行 Spring Batch 作业
完成了作业的编写之后,我们可以运行它了。你可以创建一个 RESTful 接口或命令行等方式来启动作业。
如果你选择使用命令行来运行作业,你需要在主类中添加以下代码:
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class BatchApplication implements CommandLineRunner {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job importUserJob;
public static void main(String[] args) {
SpringApplication.run(BatchApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addDate("date", new Date())
.toJobParameters();
jobLauncher.run(importUserJob, jobParameters);
}
}
以上代码中,我们创建了一个 CommandLineRunner 的实现类,并注入了 JobLauncher 和 Job 实例。在 run 方法中,我们创建了一个 JobParameters 对象,并以当前的日期作为参数运行作业。
总结
通过本文的实例,我们了解了如何使用 Spring Boot 整合 Spring Batch。Spring Batch 提供了强大的批处理功能,可以轻松处理数据的批量处理任务。希望本文能帮助你快速入门 Spring Boot 和 Spring Batch,并在实际开发中使用它们。
本文来自极简博客,作者:冰山美人,转载请注明原文链接:SpringBoot入门实战:SpringBoot整合Spring Batch