SpringBoot入门实战:SpringBoot整合Spring Batch

冰山美人 2024-05-16 ⋅ 35 阅读

介绍

在实际开发中,数据的批量处理是非常常见的需求,例如数据的导入、导出、转换等。Spring Batch 是一个强大的批处理框架,可以轻松处理这些任务。本文将介绍如何使用 Spring Boot 整合 Spring Batch,实现数据批量处理的功能。

准备工作

在开始之前,需要确保你已经安装了以下软件:

  • JDK(版本 1.8 或以上)
  • Maven
  • IDE(推荐使用 IntelliJ IDEA 或 Eclipse)

步骤一:创建 Spring Boot 项目

首先,我们需要创建一个 Spring Boot 项目。打开你的 IDE,选择创建一个新的 Spring Boot 项目。在创建项目的过程中,你需要选择 Spring Batch 作为一个依赖。

步骤二:配置 Spring Batch

在创建完项目后,我们需要配置 Spring Batch 的相关内容。首先,在配置文件(application.properties 或 application.yml)中添加以下配置:

# 数据源配置
spring.datasource.url=jdbc:mysql://localhost:3306/batch_demo?useSSL=false
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.jdbc.Driver

# Spring Batch 配置
spring.batch.initializer.enabled=true
spring.batch.job.enabled=true
spring.batch.job.names=CsvToDatabaseJob

此外,我们需要创建一个数据库表来存储 Spring Batch 的元数据。你可以使用以下 SQL 在数据库中创建表:

CREATE TABLE BATCH_JOB_INSTANCE  (
    JOB_INSTANCE_ID BIGINT  NOT NULL PRIMARY KEY ,
    VERSION BIGINT ,
    JOB_NAME VARCHAR(100) NOT NULL,
    JOB_KEY VARCHAR(32) NOT NULL,
    constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ENGINE=InnoDB;

CREATE TABLE BATCH_JOB_EXECUTION  (
    JOB_EXECUTION_ID BIGINT  NOT NULL PRIMARY KEY ,
    VERSION BIGINT  ,
    JOB_INSTANCE_ID BIGINT NOT NULL,
    CREATE_TIME DATETIME NOT NULL,
    START_TIME DATETIME DEFAULT NULL ,
    END_TIME DATETIME DEFAULT NULL ,
    STATUS VARCHAR(10),
    EXIT_CODE VARCHAR(2500) ,
    EXIT_MESSAGE VARCHAR(2500) ,
    LAST_UPDATED DATETIME,
    constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
    references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ENGINE=InnoDB;

CREATE TABLE BATCH_STEP_EXECUTION (
    STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
    VERSION BIGINT NOT NULL,
    STEP_NAME VARCHAR(100) NOT NULL,
    JOB_EXECUTION_ID BIGINT NOT NULL,
    START_TIME DATETIME NOT NULL ,
    END_TIME DATETIME DEFAULT NULL ,
    STATUS VARCHAR(10),
    COMMIT_COUNT INT ,
    READ_COUNT INT ,
    FILTER_COUNT INT ,
    WRITE_COUNT INT ,
    READ_SKIP_COUNT INT ,
    WRITE_SKIP_COUNT INT ,
    PROCESS_SKIP_COUNT INT ,
    ROLLBACK_COUNT INT,
    EXIT_CODE VARCHAR(2500) ,
    EXIT_MESSAGE VARCHAR(2500) ,
    LAST_UPDATED DATETIME,
    FOREIGN KEY (JOB_EXECUTION_ID)
    REFERENCES BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;

CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT  (
    JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
    SHORT_CONTEXT VARCHAR(2500) NOT NULL,
    SERIALIZED_CONTEXT TEXT ,
    constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
    references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;

CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT  (
    STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
    SHORT_CONTEXT VARCHAR(2500) NOT NULL,
    SERIALIZED_CONTEXT TEXT ,
    constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
    references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ENGINE=InnoDB;

CREATE TABLE BATCH_JOB_EXECUTION_PARAMS  (
    JOB_EXECUTION_ID BIGINT NOT NULL ,
    TYPE_CD VARCHAR(6) NOT NULL ,
    KEY_NAME VARCHAR(100) NOT NULL ,
    STRING_VAL VARCHAR(250) ,
    DATE_VAL DATETIME DEFAULT NULL ,
    LONG_VAL BIGINT ,
    DOUBLE_VAL DOUBLE PRECISION ,
    constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
    references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;

CREATE TABLE BATCH_STEP_EXECUTION_SEQ (
    UNIQUE_KEY CHAR(1) NOT NULL ,
    ID BIGINT NOT NULL ,
    constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;

INSERT INTO BATCH_STEP_EXECUTION_SEQ values ('X', 0);
INSERT INTO BATCH_STEP_EXECUTION_SEQ values ('A', 0);
INSERT INTO BATCH_STEP_EXECUTION_SEQ values ('B', 0);
INSERT INTO BATCH_STEP_EXECUTION_SEQ values ('C', 0);
INSERT INTO BATCH_STEP_EXECUTION_SEQ values ('D', 0);
INSERT INTO BATCH_STEP_EXECUTION_SEQ values ('E', 0);

步骤三:编写 Spring Batch 作业

现在,我们可以开始编写我们的第一个 Spring Batch 作业了。首先,创建一个类,并添加 @Configuration@EnableBatchProcessing 注解。然后,按照以下步骤编写作业:

  • 创建一个 ItemReader,用于从输入源读取数据。你可以选择从数据库、文件等源读取数据。
  • 创建一个 ItemProcessor,用于对读取的数据进行处理或转换。你可以根据需求自定义处理逻辑。
  • 创建一个 ItemWriter,用于将处理后的数据写入目标数据库、文件等。
  • 创建一个 Job,将 ItemReader、ItemProcessor 和 ItemWriter 组合在一起,并定义作业的执行流程。

以下是一个简单的作业示例:

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
    
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    
    @Bean
    public ItemReader<Person> reader() {
        // 创建一个 ItemReader
        // 从数据库或文件中读取数据
    }
    
    @Bean
    public ItemProcessor<Person, Person> processor() {
        // 创建一个 ItemProcessor
        // 对读取的数据进行处理或转换
    }
    
    @Bean
    public ItemWriter<Person> writer() {
        // 创建一个 ItemWriter
        // 将处理后的数据写入目标数据库或文件
    }
    
    @Bean
    public Step step1(ItemReader<Person> reader, ItemWriter<Person> writer, ItemProcessor<Person, Person> processor) {
        return stepBuilderFactory.get("step1")
                .<Person, Person>chunk(10)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    @Bean
    public Job importUserJob(JobCompletionNotificationListener listener, Step step) {
        return jobBuilderFactory.get("importUserJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .flow(step)
                .end()
                .build();
    }
}

在上面的示例中,我们定义了一个由三个步骤组成的作业。Step 1 是整个作业的第一个步骤,它包含了自定义的 ItemReader、ItemProcessor 和 ItemWriter。

步骤四:运行 Spring Batch 作业

完成了作业的编写之后,我们可以运行它了。你可以创建一个 RESTful 接口或命令行等方式来启动作业。

如果你选择使用命令行来运行作业,你需要在主类中添加以下代码:

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BatchApplication implements CommandLineRunner {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job importUserJob;

    public static void main(String[] args) {
        SpringApplication.run(BatchApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
                .addDate("date", new Date())
                .toJobParameters();
        jobLauncher.run(importUserJob, jobParameters);
    }
}

以上代码中,我们创建了一个 CommandLineRunner 的实现类,并注入了 JobLauncher 和 Job 实例。在 run 方法中,我们创建了一个 JobParameters 对象,并以当前的日期作为参数运行作业。

总结

通过本文的实例,我们了解了如何使用 Spring Boot 整合 Spring Batch。Spring Batch 提供了强大的批处理功能,可以轻松处理数据的批量处理任务。希望本文能帮助你快速入门 Spring Boot 和 Spring Batch,并在实际开发中使用它们。


全部评论: 0

    我有话说: