Introduction: Why is real-time synchronization needed?
MySQL excels at transaction processing, while Elasticsearch (ES) focuses on search and analysis. Synchronizing MySQL data to ES in real time lets you exploit the strengths of both, for example to:
- Build high-performance search services
- Support real-time data analysis and large-screen (dashboard) displays
- Improve the efficiency of complex queries
Traditional approaches such as scheduled full synchronization suffer from high latency and wasted resources. This article implements real-time synchronization with millisecond-level delay by monitoring the MySQL Binlog, and provides the core Java code together with an in-depth analysis of how it works.
1. Technology selection and core principles
1.1 Core Components
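MySQL Binlog: MySQL's binary log, which records every data-changing event (inserts, updates, and deletes).
Binlog parsers: Canal and OpenReplicator are common tools for parsing the Binlog; this article uses the lightweight mysql-binlog-connector-java library instead.
Elasticsearch High Level REST Client: the official ES Java client, used here to write documents. It has been deprecated since 7.15 in favor of the Elasticsearch Java API Client, but still works with 7.x clusters.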
1.2 Architecture flow chart
MySQL Server → Binlog → Java Listener → Data Conversion → Elasticsearch
2. Environment preparation and configuration
2.1 Enable Binlog in MySQL
# Edit my.cnf (Linux) or my.ini (Windows)
[mysqld]
server_id=1
log_bin=mysql-bin
binlog_format=ROW  # Must be ROW mode
2.2 Create an ES index
PUT /user
{
  "mappings": {
    "properties": {
      "id": {"type": "integer"},
      "name": {"type": "text"},
      "email": {"type": "keyword"},
      "create_time": {"type": "date"}
    }
  }
}
3. Java code implementation
3.1 Maven dependencies
<dependency>
    <groupId>com.zendesk</groupId>
    <artifactId>mysql-binlog-connector-java</artifactId>
    <version>0.25.4</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.17.3</version>
</dependency>
3.2 Core code (Binlog listening and synchronization)
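import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;
import org.apache.http.HttpHost;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

public class MySQL2ESSyncer {
    private static final String ES_INDEX = "user";
    // Source table to sync; "test" is an assumed database name
    private static final String SOURCE_TABLE = "test.user";
    // Row events only carry a numeric table ID; TABLE_MAP events map it to "db.table"
    private static final Map<Long, String> TABLE_NAMES = new HashMap<>();

    public static void main(String[] args) throws IOException {
        // Initialize the ES client
        RestHighLevelClient esClient = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")));
        // Configure Binlog monitoring
        BinaryLogClient client = new BinaryLogClient("localhost", 3306, "root", "password");
        client.setServerId(1001); // Must be unique among the MySQL server and all its replicas
        client.registerEventListener(event -> {
            EventData data = event.getData();
            try {
                if (data instanceof TableMapEventData) {
                    TableMapEventData tableMap = (TableMapEventData) data;
                    TABLE_NAMES.put(tableMap.getTableId(), tableMap.getDatabase() + "." + tableMap.getTable());
                } else if (data instanceof WriteRowsEventData) {
                    // Handle insert events
                    WriteRowsEventData insert = (WriteRowsEventData) data;
                    if (isSourceTable(insert.getTableId())) {
                        for (Serializable[] row : insert.getRows()) indexRow(row, esClient);
                    }
                } else if (data instanceof UpdateRowsEventData) {
                    // Handle update events: each entry is (before image, after image)
                    UpdateRowsEventData update = (UpdateRowsEventData) data;
                    if (isSourceTable(update.getTableId())) {
                        for (Map.Entry<Serializable[], Serializable[]> change : update.getRows()) {
                            indexRow(change.getValue(), esClient);
                        }
                    }
                } else if (data instanceof DeleteRowsEventData) {
                    // Handle delete events: remove the document whose ID is the primary key
                    DeleteRowsEventData delete = (DeleteRowsEventData) data;
                    if (isSourceTable(delete.getTableId())) {
                        for (Serializable[] row : delete.getRows()) {
                            esClient.delete(new DeleteRequest(ES_INDEX, row[0].toString()), RequestOptions.DEFAULT);
                        }
                    }
                }
            } catch (IOException e) {
                // Minimal handling only; see 4.3 for retries
                e.printStackTrace();
            }
        });
        client.connect(); // Blocks and streams events until disconnected
    }

    private static boolean isSourceTable(long tableId) {
        return SOURCE_TABLE.equals(TABLE_NAMES.get(tableId));
    }

    private static void indexRow(Serializable[] row, RestHighLevelClient esClient) throws IOException {
        // Assume that the table structure is: id, name, email, create_time
        Map<String, Object> doc = new HashMap<>();
        doc.put("id", row[0]);
        doc.put("name", text(row[1]));
        doc.put("email", text(row[2]));
        // DATETIME may arrive as java.util.Date; ES date fields accept epoch millis by default
        doc.put("create_time", row[3] instanceof Date ? ((Date) row[3]).getTime() : row[3]);
        // Passing a Map lets the client serialize (and escape) the JSON
        IndexRequest request = new IndexRequest(ES_INDEX).id(row[0].toString()).source(doc);
        esClient.index(request, RequestOptions.DEFAULT);
    }

    // Depending on version and settings, the connector may return string columns as byte[]
    private static Object text(Serializable value) {
        return value instanceof byte[] ? new String((byte[]) value, StandardCharsets.UTF_8) : value;
    }
}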
4. In-depth code analysis
4.1 Binlog monitoring process
BinaryLogClient: the core class. It connects to MySQL the way a replica does and receives Binlog events as they are written.
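Event type dispatch: WriteRowsEventData, UpdateRowsEventData, and DeleteRowsEventData distinguish inserts, updates, and deletes. Row events identify their table only by a numeric table ID; the preceding TableMapEventData maps that ID to a database and table name, which is how events from other tables are filtered out.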
4.2 Key points of data conversion
Row data parsing: row events carry the changed values positionally (as an array), so each value must be mapped to a field according to the table's column order.
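The positional mapping described above can be sketched with only the JDK. This is a minimal illustration, not the article's code: `RowMapper` and `toDocument` are hypothetical names, and the column list is the `id, name, email, create_time` layout assumed in section 3.2. It also normalizes two value types that mysql-binlog-connector-java may produce depending on version and settings: `byte[]` for string columns (assumed UTF-8) and `java.util.Date` for DATETIME, converted to epoch milliseconds, which the default ES `date` format accepts.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: turns a positional binlog row into an ES document.
final class RowMapper {
    private RowMapper() {}

    static Map<String, Object> toDocument(List<String> columns, Serializable[] row) {
        if (columns.size() != row.length) {
            // A mismatch usually means the table changed (DDL) and the column list is stale
            throw new IllegalArgumentException(
                "expected " + columns.size() + " columns but row has " + row.length);
        }
        Map<String, Object> doc = new LinkedHashMap<>();
        for (int i = 0; i < row.length; i++) {
            doc.put(columns.get(i), normalize(row[i]));
        }
        return doc;
    }

    private static Object normalize(Serializable value) {
        if (value instanceof byte[]) {
            // Assumes the column data is UTF-8 encoded
            return new String((byte[]) value, StandardCharsets.UTF_8);
        }
        if (value instanceof Date) {
            // Epoch millis, accepted by the default ES date format
            return ((Date) value).getTime();
        }
        return value; // numbers, strings, and nulls pass through unchanged
    }
}
```

Failing loudly on a column-count mismatch is a deliberate choice: silently shifting values into the wrong fields after a schema change is much harder to detect than an exception.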
ES document ID: use the MySQL primary key as the document ID so that updates and deletes locate exactly the right document.
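Keying documents by primary key has one consequence worth handling: an UPDATE that changes the primary key must also delete the old document, otherwise a stale copy stays in the index. A minimal sketch of that rule, assuming the primary key is column 0 (as in section 3.2) and that the before image contains the full row (`binlog_row_image=FULL`, the MySQL default). `DocAction` and `UpdatePlanner` are hypothetical names; the input is modeled as the (before, after) `Map.Entry` pair that `UpdateRowsEventData.getRows()` returns.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical description of one ES operation keyed by document ID.
final class DocAction {
    enum Type { INDEX, DELETE }

    final Type type;
    final String id;

    DocAction(Type type, String id) {
        this.type = type;
        this.id = id;
    }

    @Override
    public String toString() {
        return type + ":" + id;
    }
}

final class UpdatePlanner {
    private static final int PK_COLUMN = 0; // assumed position of the primary key

    // Maps one (before image, after image) pair to the ES operations it requires.
    static List<DocAction> plan(Map.Entry<Serializable[], Serializable[]> change) {
        String oldId = String.valueOf(change.getKey()[PK_COLUMN]);
        String newId = String.valueOf(change.getValue()[PK_COLUMN]);
        List<DocAction> actions = new ArrayList<>();
        if (!oldId.equals(newId)) {
            // Primary key changed: the old document would otherwise linger
            actions.add(new DocAction(DocAction.Type.DELETE, oldId));
        }
        // Re-index the full after image (overwrites any existing document with that ID)
        actions.add(new DocAction(DocAction.Type.INDEX, newId));
        return actions;
    }
}
```

For an ordinary update the plan is a single INDEX; for a key change it is DELETE of the old ID followed by INDEX of the new one.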
4.3 Exception handling and optimization
Retry mechanism: when an ES write fails, put it on a retry queue instead of dropping it.
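A minimal retry sketch with exponential backoff, using only the JDK. `RetryingWriter`, the attempt limit, and the delays are illustrative assumptions rather than part of either library. A write that still fails after the last attempt is parked in a dead-letter queue for later replay instead of being lost. In the synchronizer, the `Callable` could wrap a call such as `esClient.index(request, RequestOptions.DEFAULT)`.

```java
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical retry helper; R is whatever identifies the failed write (e.g., a request).
final class RetryingWriter<R> {
    private final int maxAttempts;
    private final long baseDelayMillis;
    // Writes that exhausted their retries; a background job could replay these later
    final Queue<R> deadLetters = new ConcurrentLinkedQueue<>();

    RetryingWriter(int maxAttempts, long baseDelayMillis) {
        this.maxAttempts = maxAttempts;
        this.baseDelayMillis = baseDelayMillis;
    }

    // Returns true if the write eventually succeeded, false if it was dead-lettered.
    boolean write(R request, Callable<?> call) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                call.call();
                return true;
            } catch (Exception e) {
                if (attempt == maxAttempts) {
                    break;
                }
                try {
                    // Exponential backoff: base, 2*base, 4*base, ...
                    Thread.sleep(baseDelayMillis << (attempt - 1));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve the interrupt, stop retrying
                    break;
                }
            }
        }
        deadLetters.add(request);
        return false;
    }
}
```

Retrying in place blocks the Binlog listener thread while ES is down; that keeps events in order, which matters because an out-of-order delete and index for the same ID would leave the wrong final state.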
Batch submission: writing to ES in batches (the Bulk API) improves throughput, at the cost of some extra latency, so the batch size must be weighed against real-time requirements.
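The batching trade-off can be sketched as a buffer that flushes when either a size limit or a time limit is reached, so latency stays bounded. This hypothetical `BulkBuffer` builds the newline-delimited body of the ES `_bulk` API by hand; the thresholds and the `flusher` callback are assumptions, documents are expected as already-serialized JSON, and index names and IDs are assumed to need no JSON escaping (numeric primary keys). The High Level REST Client's `BulkRequest` does the same job with typed requests.

```java
import java.util.function.Consumer;

// Hypothetical buffer that accumulates ES _bulk API lines (NDJSON).
final class BulkBuffer {
    private final int maxActions;
    private final long maxDelayMillis;
    private final Consumer<String> flusher; // receives a complete _bulk request body
    private final StringBuilder body = new StringBuilder();
    private int count = 0;
    private long firstAddedAt = 0;

    BulkBuffer(int maxActions, long maxDelayMillis, Consumer<String> flusher) {
        this.maxActions = maxActions;
        this.maxDelayMillis = maxDelayMillis;
        this.flusher = flusher;
    }

    void index(String index, String id, String sourceJson) {
        append("{\"index\":{\"_index\":\"" + index + "\",\"_id\":\"" + id + "\"}}\n" + sourceJson + "\n");
    }

    void delete(String index, String id) {
        append("{\"delete\":{\"_index\":\"" + index + "\",\"_id\":\"" + id + "\"}}\n");
    }

    // Call periodically (e.g., from a scheduler) so a quiet stream still gets flushed.
    void flushIfDue() {
        if (count > 0 && System.currentTimeMillis() - firstAddedAt >= maxDelayMillis) {
            flush();
        }
    }

    void flush() {
        if (count == 0) {
            return;
        }
        flusher.accept(body.toString());
        body.setLength(0);
        count = 0;
    }

    private void append(String lines) {
        if (count == 0) {
            firstAddedAt = System.currentTimeMillis(); // start the latency clock
        }
        body.append(lines);
        count++;
        if (count >= maxActions) {
            flush(); // size limit reached
        }
    }
}
```

Because actions stay in arrival order inside one bulk body, an insert followed by a delete of the same row is still applied in the right order.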
Consistency: persist the Binlog position (file name and offset) after each successful write, so that a restart resumes where it stopped and no changes are lost.
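A minimal sketch of position persistence with the JDK: the Binlog file name and position are written to a temporary file and then moved over the checkpoint atomically, so a crash never leaves a half-written checkpoint. `PositionStore` and the `file:position` text format are assumptions. The current coordinates can be read from `BinaryLogClient.getBinlogFilename()` and `getBinlogPosition()`, and a saved position applied with `setBinlogFilename(...)` and `setBinlogPosition(...)` before `connect()`. Saving should happen only after the corresponding ES writes succeed; otherwise a restart could skip unsynced changes.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Optional;

// Hypothetical checkpoint store for the Binlog position, e.g. "mysql-bin.000003:4567".
final class PositionStore {
    // Immutable snapshot of a Binlog coordinate
    static final class Position {
        final String filename;
        final long position;

        Position(String filename, long position) {
            this.filename = filename;
            this.position = position;
        }
    }

    private final Path file;

    PositionStore(Path file) {
        this.file = file;
    }

    // Write to a temp file first, then atomically replace the checkpoint.
    void save(String filename, long position) throws IOException {
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        Files.write(tmp, (filename + ":" + position).getBytes(StandardCharsets.UTF_8));
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
    }

    // Empty when no checkpoint exists yet (first run).
    Optional<Position> load() throws IOException {
        if (!Files.exists(file)) {
            return Optional.empty();
        }
        String text = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
        int sep = text.lastIndexOf(':');
        return Optional.of(new Position(text.substring(0, sep), Long.parseLong(text.substring(sep + 1))));
    }
}
```

Resuming from a saved position can replay a few events that were already written; since every write is an index or delete keyed by primary key, replaying them is harmless.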
5. Comparison of approaches
Approach | Real-time | Complexity | Resource consumption |
---|---|---|---|
Scheduled full synchronization | Low (minute level) | Low | High |
Trigger-based | High | High (requires table changes) | Medium |
Binlog monitoring | High | Medium | Low |
6. Summary and Extensions
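This article implemented Binlog-based real-time synchronization from MySQL to ES. The approach has the following advantages:
- Real-time: millisecond-level delay, which meets the needs of most business scenarios. Note that ES makes new documents searchable only after an index refresh (the refresh interval defaults to 1 second).
- Non-intrusive: no need to modify MySQL table structures.
- Extensible: the same change-data-capture approach carries over to other data sources (for example PostgreSQL, which uses logical decoding instead of a Binlog).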
Possible extensions:
- Use Kafka as an intermediate layer to decouple production from consumption.
- Add monitoring and alerting to detect synchronization lag and data inconsistencies.
- Support automatic handling of DDL changes (such as table structure modifications).
This concludes the detailed explanation of how to synchronize MySQL data to Elasticsearch in real time with Java.