概述
在现代数据处理应用中,实时数据同步至搜索引擎是一个重要的需求。本文将介绍如何利用Spring Boot和Flink CDC (Change Data Capture)来实现MySQL数据库数据的实时同步至Elasticsearch,采用DataStream方式进行处理。
准备工作
在开始之前,确保以下组件已经安装并配置好:
- JDK 1.8 或以上版本
- Maven 3.x 或以上版本
- MySQL 数据库
- Elasticsearch
项目依赖
在pom.xml文件中添加如下依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.21</version>
</dependency>
数据源配置
在项目的 application.properties 文件中,配置MySQL和Elasticsearch的连接信息:
spring.datasource.url=jdbc:mysql://<mysql_host>:<port>/<database_name>
spring.datasource.username=<username>
spring.datasource.password=<password>
spring.elasticsearch.rest.uris=http://<es_host>:<port>
实现同步逻辑
创建一个 Flink 应用程序类,并添加以下代码:
public class FlinkCDCMySQLToElasticsearch {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 创建 MySQL CDC Source
DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
.hostname("localhost")
.port(3306)
.username("root")
.password("password")
.databaseList("db1")
.tableList("db1.table1,db2.table2")
.startFromEarliest()
.build();
// 创建 Elasticsearch Sink
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("localhost", 9200, "http"));
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
httpHosts,
new ElasticsearchSinkFunction<String>() {
public IndexRequest createIndexRequest(String element) {
Map<String, String> json = new HashMap<>();
json.put("data", element);
return Requests.indexRequest()
.index("my-index")
.source(json, XContentType.JSON);
}
@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
}
}
);
esSinkBuilder.setBulkFlushMaxActions(1);
// 将数据源添加到 Flink 程序
env
.addSource(mysqlSource)
.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
// 自定义数据转换逻辑
return value;
}
})
.addSink(esSinkBuilder.build());
env.execute("Flink CDC MySQL to Elasticsearch");
}
}
启动应用程序
使用命令行工具进入项目根目录,并执行以下命令:
mvn clean package
然后,使用以下命令运行应用程序:
java -jar target/my-project.jar
总结
通过Spring Boot和Flink CDC,我们可以很容易地实现MySQL数据的实时同步至Elasticsearch。使用DataStream方式处理数据,可以实现更高的性能和并行处理能力。希望本文对您有所帮助,欢迎留言交流!
参考链接:
本文来自极简博客,作者:晨曦微光,转载请注明原文链接:Spring Boot Flink CDC MySQL 同步 Elasticsearch (DataStream方式)