SoFunction
Updated on 2025-05-16

Real-Time Synchronization of MySQL Data to Elasticsearch in Java: A Detailed Explanation

Introduction: Why is real-time synchronization needed?

MySQL excels at transaction processing, while Elasticsearch (ES) focuses on search and analytics. Synchronizing MySQL data to ES in real time combines the strengths of both, for example:

  • Build high-performance search services
  • Real-time data analysis and large-screen display
  • Improve complex query efficiency

Traditional solutions, such as scheduled full synchronization, suffer from high latency and wasted resources. This article implements millisecond-level real-time synchronization by listening to the MySQL Binlog, and provides Java code along with an in-depth analysis of the source.

1. Technology selection and core principles

1.1 Core Components

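MySQL Binlog: MySQL's binary log, which records all data change events (inserts, updates, and deletes).

Binlog parser: tools such as Canal or OpenReplicator parse the Binlog; this article uses the lightweight mysql-binlog-connector-java library.

Elasticsearch High Level REST Client: the official Java client for ES, used here to write data. (Elastic deprecated this client in 7.15 in favor of the newer Java API Client; it remains usable with the 7.17 version used in this article.)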

1.2 Architecture flow chart

MySQL Server → Binlog → Java Listener → Data Conversion → Elasticsearch

2. Environment preparation and configuration

2.1 Enable the Binlog in MySQL

# Edit the MySQL configuration file (typically my.cnf on Linux or my.ini on Windows)
[mysqld]
server_id=1
log_bin=mysql-bin
binlog_format=ROW  # Must be in ROW mode

2.2 Create an ES index

PUT /user
{
  "mappings": {
    "properties": {
      "id": {"type": "integer"},
      "name": {"type": "text"},
      "email": {"type": "keyword"},
      "create_time": {"type": "date"}
    }
  }
}

3. Java code implementation

3.1 Maven dependencies

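<!-- The 0.25.x releases are published under the com.zendesk group ID;
     the Java packages are still com.github.shyiko.mysql.binlog -->
<dependency>
    <groupId>com.zendesk</groupId>
    <artifactId>mysql-binlog-connector-java</artifactId>
    <version>0.25.4</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.17.3</version>
</dependency>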

3.2 Core code (Binlog listening and synchronization)

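import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;
import org.apache.http.HttpHost;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class MySQL2ESSyncer {

    private static final String ES_INDEX = "user";

    public static void main(String[] args) throws Exception {
        // Initialize the ES client (assumes a local node at localhost:9200)
        RestHighLevelClient esClient = new RestHighLevelClient(
            RestClient.builder(new HttpHost("localhost", 9200, "http")));

        // Configure Binlog monitoring
        BinaryLogClient client = new BinaryLogClient("localhost", 3306, "root", "password");
        client.setServerId(1001); // Unique ID; must differ from the MySQL server_id and other replicas

        // Note: row events from every table arrive here; this example assumes only the user table changes
        client.registerEventListener(event -> {
            EventData data = event.getData();
            try {
                if (data instanceof WriteRowsEventData) {
                    handleWriteEvent((WriteRowsEventData) data, esClient);   // Handle insert events
                } else if (data instanceof UpdateRowsEventData) {
                    handleUpdateEvent((UpdateRowsEventData) data, esClient); // Handle update events
                } else if (data instanceof DeleteRowsEventData) {
                    handleDeleteEvent((DeleteRowsEventData) data, esClient); // Handle delete events
                }
            } catch (IOException e) {
                // A production system should retry or queue the failed event rather than only log it
                e.printStackTrace();
            }
        });

        client.connect(); // Start monitoring (blocks the current thread)
    }

    // Assumes the column order of the table is: id, name, email, create_time
    private static Map<String, Object> toDocument(Serializable[] row) {
        Map<String, Object> doc = new HashMap<>();
        doc.put("id", row[0]);
        doc.put("name", row[1]);
        doc.put("email", row[2]);
        doc.put("create_time", row[3]);
        return doc;
    }

    private static void indexRow(Serializable[] row, RestHighLevelClient esClient) throws IOException {
        // A Map source lets the client escape string values, which String.format would not do
        IndexRequest request = new IndexRequest(ES_INDEX)
            .id(String.valueOf(row[0]))
            .source(toDocument(row));
        esClient.index(request, RequestOptions.DEFAULT);
    }

    private static void handleWriteEvent(WriteRowsEventData eventData, RestHighLevelClient esClient) throws IOException {
        for (Serializable[] row : eventData.getRows()) {
            indexRow(row, esClient);
        }
    }

    // Update: re-index the "after" image of each row (entry key = before, value = after)
    private static void handleUpdateEvent(UpdateRowsEventData eventData, RestHighLevelClient esClient) throws IOException {
        for (Map.Entry<Serializable[], Serializable[]> row : eventData.getRows()) {
            indexRow(row.getValue(), esClient);
        }
    }

    // Delete: remove the document whose ID is the row's primary key
    private static void handleDeleteEvent(DeleteRowsEventData eventData, RestHighLevelClient esClient) throws IOException {
        for (Serializable[] row : eventData.getRows()) {
            esClient.delete(new DeleteRequest(ES_INDEX, String.valueOf(row[0])), RequestOptions.DEFAULT);
        }
    }
}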
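One detail the listener in section 3.2 does not address: row events arrive for every table on the server, and each row event carries only a numeric table ID. The connector delivers a TABLE_MAP event (TableMapEventData, with getTableId(), getDatabase(), and getTable()) that maps the ID to a name before the row events that use it. The following is a minimal, JDK-only sketch of that bookkeeping. The class name TableFilter and the database name "test" are hypothetical and not part of the original code.

```java
import java.util.HashMap;
import java.util.Map;

/** Tracks table IDs announced by TABLE_MAP events and filters row events by table name. */
final class TableFilter {
    private final String wantedTable; // e.g. "test.user" (hypothetical database name)
    private final Map<Long, String> tableNames = new HashMap<>();

    TableFilter(String wantedTable) {
        this.wantedTable = wantedTable;
    }

    /** Call for each TABLE_MAP event with the values read from TableMapEventData. */
    void onTableMap(long tableId, String database, String table) {
        tableNames.put(tableId, database + "." + table);
    }

    /** Call before handling a row event: true only if the ID maps to the wanted table. */
    boolean accepts(long tableId) {
        return wantedTable.equals(tableNames.get(tableId));
    }
}
```

In the listener, this would mean calling onTableMap(...) when the event data is a TableMapEventData, and checking accepts(rowEventData.getTableId()) before invoking any of the handlers.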

4. In-depth source code analysis

4.1 Binlog monitoring process

BinaryLogClient: the core class. It connects to MySQL in the same way a replica does and receives Binlog events.

Event type dispatch: WriteRowsEventData, UpdateRowsEventData, and DeleteRowsEventData distinguish insert, update, and delete operations.

4.2 Key points of data conversion

Row data parsing: extract the values of the changed rows from each event. The values arrive as a positional array, so they must be read in the same order as the table's columns.
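Because row values are positional, a column added or removed in MySQL silently shifts every field after it. As a hedged illustration, the sketch below maps a row onto named fields and fails fast when the column count no longer matches. RowMapper is a hypothetical helper, not part of the connector library.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Maps a positional Binlog row onto named document fields. */
final class RowMapper {
    private final String[] columns;

    /** Column names in table order, e.g. "id", "name", "email", "create_time". */
    RowMapper(String... columns) {
        this.columns = columns.clone();
    }

    Map<String, Object> toDocument(Object[] row) {
        // A mismatch usually means the table structure changed since this mapper was configured
        if (row.length != columns.length) {
            throw new IllegalArgumentException(
                "Expected " + columns.length + " columns but row has " + row.length);
        }
        Map<String, Object> doc = new LinkedHashMap<>();
        for (int i = 0; i < columns.length; i++) {
            doc.put(columns[i], row[i]);
        }
        return doc;
    }
}
```

The resulting map can be passed directly to IndexRequest.source(Map), which leaves JSON serialization to the client.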

ES document ID: use the MySQL primary key as the document ID, so that update and delete operations can locate the corresponding document exactly.
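The article's table has a single-column primary key, so the ID is simply the first column. For a table with a composite primary key, one possible approach (an assumption, not something the original code covers) is to join the key columns into one string. DocumentIds is a hypothetical helper name.

```java
import java.util.StringJoiner;

/** Builds an Elasticsearch document ID from one or more primary key columns. */
final class DocumentIds {
    private DocumentIds() {}

    static String fromPrimaryKey(Object... keyParts) {
        if (keyParts.length == 0) {
            throw new IllegalArgumentException("At least one key column is required");
        }
        // Limitation: key values that themselves contain ':' could produce colliding IDs
        StringJoiner joiner = new StringJoiner(":");
        for (Object part : keyParts) {
            if (part == null) {
                throw new IllegalArgumentException("Primary key columns cannot be null");
            }
            joiner.add(part.toString());
        }
        return joiner.toString();
    }
}
```

Because the same key always yields the same ID, replaying an insert or update overwrites the existing document instead of creating a duplicate.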

4.3 Exception handling and optimization

Retry mechanism: when a write to ES fails, add the failed request to a retry queue instead of dropping it.
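As a simpler variant of the retry idea, the sketch below retries in place with exponential backoff rather than using a separate queue. Retrying in place keeps Binlog events in order, at the cost of blocking the listener while ES is unavailable. The Retry class and its parameters are hypothetical names chosen for illustration.

```java
import java.util.concurrent.Callable;

/** Retries a failing action with exponential backoff. */
final class Retry {
    private Retry() {}

    static <T> T withBackoff(Callable<T> action, int maxAttempts, long initialDelayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = initialDelayMillis;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delay); // wait before the next attempt
                    delay *= 2;          // double the wait each time
                }
            }
        }
        throw last; // every attempt failed; surface the last error
    }
}
```

A write could then be wrapped as Retry.withBackoff(() -> esClient.index(request, RequestOptions.DEFAULT), 5, 200), where the attempt count and delay are placeholder values to tune.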

Batch submission: writing to ES in batches improves throughput at the cost of some added latency, so the batch size must be weighed against the real-time requirement.
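The trade-off between throughput and latency can be expressed with two limits: a maximum batch size and a maximum age for the oldest buffered item. The following JDK-only sketch shows the buffering logic under those assumptions. BatchBuffer is a hypothetical class; with the High Level REST Client, the sink would typically build a BulkRequest and send it with esClient.bulk(...).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Buffers items and hands them to a sink in batches. Not thread-safe. */
final class BatchBuffer<T> {
    private final int maxSize;
    private final long maxAgeMillis;
    private final Consumer<List<T>> sink;
    private final List<T> items = new ArrayList<>();
    private long oldestMillis;

    BatchBuffer(int maxSize, long maxAgeMillis, Consumer<List<T>> sink) {
        if (maxSize < 1) {
            throw new IllegalArgumentException("maxSize must be >= 1");
        }
        this.maxSize = maxSize;
        this.maxAgeMillis = maxAgeMillis;
        this.sink = sink;
    }

    /** Adds an item; flushes when the batch is full or its oldest item is too old. */
    void add(T item, long nowMillis) {
        if (items.isEmpty()) {
            oldestMillis = nowMillis;
        }
        items.add(item);
        if (items.size() >= maxSize || nowMillis - oldestMillis >= maxAgeMillis) {
            flush();
        }
    }

    /** Sends any buffered items to the sink; does nothing if the buffer is empty. */
    void flush() {
        if (items.isEmpty()) {
            return;
        }
        List<T> batch = new ArrayList<>(items);
        items.clear();
        sink.accept(batch);
    }
}
```

The age check here only runs when a new item arrives, so a real deployment would also call flush() from a timer to bound latency during quiet periods.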

Transaction consistency: persist the Binlog position (file name and offset) so that synchronization can resume after a restart without losing data.
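One way to persist the position is a small checkpoint file, sketched below with JDK classes only. BinlogCheckpoint and the property names are hypothetical. BinaryLogClient exposes getBinlogFilename() and getBinlogPosition() for reading the current coordinates, and setBinlogFilename(...) and setBinlogPosition(...) for restoring them before connect().

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Properties;

/** Stores the last processed Binlog coordinates in a small properties file. */
final class BinlogCheckpoint {
    final String filename;
    final long position;

    BinlogCheckpoint(String filename, long position) {
        this.filename = filename;
        this.position = position;
    }

    /** Writes to a temporary file first, then replaces the checkpoint file. */
    void save(Path file) throws IOException {
        Properties props = new Properties();
        props.setProperty("filename", filename);
        props.setProperty("position", Long.toString(position));
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        try (OutputStream out = Files.newOutputStream(tmp)) {
            props.store(out, "binlog checkpoint");
        }
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING);
    }

    /** Returns the saved checkpoint, or null if none has been written yet. */
    static BinlogCheckpoint load(Path file) throws IOException {
        if (!Files.exists(file)) {
            return null;
        }
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(file)) {
            props.load(in);
        }
        return new BinlogCheckpoint(
            props.getProperty("filename"),
            Long.parseLong(props.getProperty("position")));
    }
}
```

Saving the checkpoint only after the ES write succeeds means a crash can replay some events but should not skip any. Since documents are keyed by primary key, replayed writes overwrite the same documents.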

5. Comparison of the advantages and disadvantages of each approach

Approach                     Real-time performance   Complexity                           Resource consumption
Timed full synchronization   Low (minute-level)      Low                                  High
Trigger-based                High                    High (requires table modification)   Medium
Binlog monitoring            High                    Medium                               Low

6. Summary and Extension

This article implemented real-time synchronization from MySQL to ES based on the Binlog. The approach has the following advantages:

  • Real-time: delay can be as low as milliseconds, which meets the needs of most business scenarios.
  • Non-intrusive: no changes to the MySQL table structure are required.
  • Extensible: the same change-capture pattern can be adapted to other data sources (such as PostgreSQL), although the component that reads the change log differs.

Directions for extension:

  • Use Kafka as an intermediate layer to decouple producing change events from consuming them.
  • Add monitoring and alerting to detect synchronization lag and data inconsistencies.
  • Support automatic synchronization of DDL changes (such as table structure modifications).
