Spring Boot Flink CDC MySQL 同步 Elasticsearch (DataStream方式)

晨曦微光 2024-03-13 ⋅ 57 阅读

概述

在现代数据处理应用中,实时数据同步至搜索引擎是一个重要的需求。本文将介绍如何利用Spring Boot和Flink CDC (Change Data Capture)来实现MySQL数据库数据的实时同步至Elasticsearch,采用DataStream方式进行处理。

准备工作

在开始之前,确保以下组件已经安装并配置好:

  1. JDK 1.8 或以上版本
  2. Maven 3.x 或以上版本
  3. MySQL 数据库
  4. Elasticsearch

项目依赖

在pom.xml文件中添加如下依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.10.0</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
    <version>1.10.0</version>
</dependency>

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.21</version>
</dependency>

数据源配置

在项目的 application.properties 文件中,配置MySQL和Elasticsearch的连接信息:

spring.datasource.url=jdbc:mysql://<mysql_host>:<port>/<database_name>
spring.datasource.username=<username>
spring.datasource.password=<password>

spring.elasticsearch.rest.uris=http://<es_host>:<port>

实现同步逻辑

创建一个 Flink 应用程序类,并添加以下代码:

public class FlinkCDCMySQLToElasticsearch {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 创建 MySQL CDC Source
        DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .username("root")
                .password("password")
                .databaseList("db1")
                .tableList("db1.table1,db2.table2")
                .startFromEarliest()
                .build();

        // 创建 Elasticsearch Sink
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("localhost", 9200, "http"));
        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
                httpHosts,
                new ElasticsearchSinkFunction<String>() {
                    public IndexRequest createIndexRequest(String element) {
                        Map<String, String> json = new HashMap<>();
                        json.put("data", element);
                        return Requests.indexRequest()
                                .index("my-index")
                                .source(json, XContentType.JSON);
                    }

                    @Override
                    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                        indexer.add(createIndexRequest(element));
                    }
                }
        );

        esSinkBuilder.setBulkFlushMaxActions(1);

        // 将数据源添加到 Flink 程序
        env
                .addSource(mysqlSource)
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) throws Exception {
                        // 自定义数据转换逻辑
                        return value;
                    }
                })
                .addSink(esSinkBuilder.build());

        env.execute("Flink CDC MySQL to Elasticsearch");
    }
}

启动应用程序

使用命令行工具进入项目根目录,并执行以下命令:

mvn clean package

然后,使用以下命令运行应用程序:

java -jar target/my-project.jar

总结

通过Spring Boot和Flink CDC,我们可以很容易地实现MySQL数据的实时同步至Elasticsearch。使用DataStream方式处理数据,可以实现更高的性能和并行处理能力。希望本文对您有所帮助,欢迎留言交流!

参考链接:


全部评论: 0

    我有话说: