SoFunction
Updated on 2024-11-15

A simple tutorial to synchronize mysql data to ElasticSearch using python.

An earlier post synchronized MySQL data to Elasticsearch with logstash-input-jdbc, but its minimum synchronization interval of one minute could not meet our online business requirements. Time was short, so I wrote a simple replacement myself.

Thoughts:

Many approaches found online rely on the MySQL binlog, but my knowledge of MySQL internals is limited, so I took the blunt route: poll MySQL for the data and insert it into Elasticsearch. The data volume is small and a sync runs every 10 seconds, so the efficiency is acceptable. To guard against clock differences between servers and against rows updated while a sync is in progress, each query's update-time condition is compared against the start time of the previous sync (the interval plus the previous run's duration), so no matter how much data there is or how long an update takes, no rows are missed — the guiding principle is to never lose data. A row may be picked up twice across overlapping windows, but because the MySQL id is used as the Elasticsearch _id, re-importing a row overwrites it rather than creating a duplicate.

Use:

Just write the configuration file and the SQL files as shown below, then run the program directly. The configuration format follows that of logstash-input-jdbc.

MsToEs

|----esconfig.py (configuration file)

|----mysql_to_es.py (synchronization program)

|----sql_manage.py (database management)

|----aa.sql (required sql file)

|----bb.sql (required sql file)

sql_manage.py:

# -*-coding:utf-8 -*-
__author__ = "ZJL"
# NOTE(review): the extraction dropped the module paths from these imports;
# restored from the names used below (QueuePool lives in sqlalchemy.pool,
# sessionmaker/scoped_session in sqlalchemy.orm).
from sqlalchemy.pool import QueuePool
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
import traceback
import esconfig
# For operations that do not require rollback and commit
# Decorator for read-only queries: no rollback/commit is needed, but the
# scoped session must be closed after every call, success or failure.
def find(func):
    def wrapper(self, *args, **kwargs):
        """Run *func*; on any error print the traceback and return the
        traceback text instead of data; always close self.session."""
        try:
            return func(self, *args, **kwargs)
        except Exception as e:
            print(traceback.format_exc())
            print(str(e))
            # Callers receive the traceback string instead of a result list.
            return traceback.format_exc()
        finally:
            # Restored: the original garbled line closed the DB session here,
            # matching the self.session attribute set by MysqlManager.__init__.
            self.session.close()
    return wrapper
class MysqlManager(object):
    """Thin wrapper around a pooled SQLAlchemy MySQL connection.

    Reads the connection string from esconfig and exposes one read-only
    query helper used by the synchronization script.
    """

    def __init__(self):
        # Connection string from esconfig, e.g. "user:password@host:3306/db".
        mysql_connection_string = esconfig.mysql.get("mysql_connection_string")
        self.engine = create_engine(
            'mysql+pymysql://' + mysql_connection_string + '?charset=utf8',
            poolclass=QueuePool,
            # Recycle pooled connections hourly to avoid MySQL's idle timeout.
            pool_recycle=3600)
        self.DB_Session = sessionmaker(bind=self.engine, autocommit=False,
                                       autoflush=True, expire_on_commit=False)
        # scoped_session gives a thread-local session registry; we keep one
        # session for the lifetime of this manager.
        self.scoped = scoped_session(self.DB_Session)
        self.session = self.scoped()

    @find
    def select_all_dict(self, sql, keys):
        """Run *sql* and zip each result row positionally with *keys*.

        Returns a list of dicts (field name -> value), or False when a
        row's width does not match len(keys) — i.e. the configured field
        list and the SQL select list are out of sync.
        """
        result = self.session.execute(sql)
        rows = result.fetchall()
        lists = []
        for row in rows:
            if len(keys) != len(row):
                return False
            lists.append(dict(zip(keys, row)))
        return lists

    def close(self):
        """Release the session back to the pool."""
        self.session.close()

-- aa.sql: incremental export for es type "aa".
-- ::datetime_now is replaced by the sync program with the lower bound of
-- the update window; the T separator keeps the string es-parseable.
-- Fixed: the original had a trailing comma after the last select column,
-- which is a MySQL syntax error.
select
 CONVERT(c.`id`,CHAR)    as id,
 c.`code`   as code,
 c.`project_name` as project_name,
 c.`name`   as name,
 date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')  as update_time
from `cc` c
where date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')>='::datetime_now';

bb.sql:

-- bb.sql: incremental export for es type "bb".
-- ::datetime_now is replaced by the sync program with the lower bound of
-- the update window.
-- Fixed: the original had a trailing comma after the last select column,
-- which is a MySQL syntax error.
select
 CONVERT(c.`id`,CHAR)    as id,
 CONVERT(c.`age`,CHAR)    as age,
 c.`code`   as code,
 c.`name`   as name,
 c.`project_name` as project_name,
 date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s') as update_time
from `bb` c
where date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')>='::datetime_now';

esconfig.py:

# -*- coding: utf-8 -*-
#__author__="ZJL"

# Each sql file is named after the es type it feeds (aa.sql -> type "aa").
mysql = {
    # mysql connection info: user:password@host:port/database
    # NOTE(review): the credentials below were mangled by extraction into an
    # email-like placeholder — substitute your real connection string.
    "mysql_connection_string": "root:[email protected]:3306/xxx",
    # One entry per sql file, with the es index/type the rows go to.
    "statement_filespath": [
        {
            "index": "a1",
            # Restored: filename was lost in extraction; by convention it
            # matches the type name.
            "sqlfile": "aa.sql",
            "type": "aa"
        },
        {
            "index": "a1",
            "sqlfile": "bb.sql",
            "type": "bb"
        },
    ],
}

# es ip and port
elasticsearch = {
    "hosts": "127.0.0.1:9200",
}

# Field names per es type, in the SAME order as the columns selected in the
# matching sql file — rows are zipped positionally with these keys.
# Fixed: the original tuples disagreed with the sql files' column order,
# which would have paired values with the wrong field names.
db_field = {
    "aa": (
        "id",
        "code",
        "project_name",
        "name",
        "update_time",
    ),
    "bb": (
        "id",
        "age",
        "code",
        "name",
        "project_name",
        "update_time",
    ),
}

es_config = {
    # Seconds between two synchronization passes.
    "sleep_time": 10,
    # Slack (seconds) to absorb clock drift between servers.
    "time_difference": 3,
    # When True, every document is printed as json before bulk insert.
    "show_json": False,
}

mysql_to_es.py (the synchronization program):

# -*- coding: utf-8 -*-
#__author__="ZJL"
from sql_manage import MysqlManager
from esconfig import mysql,elasticsearch,db_field,es_config
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import traceback
import time
class TongBu(object):
    """Periodically copies rows from MySQL into Elasticsearch.

    Each pass runs every configured sql file, turns the rows into bulk
    actions and inserts them. The MySQL row id doubles as the es _id, so
    re-importing overlapping windows overwrites instead of duplicating.
    """

    def __init__(self):
        try:
            # Whether to print each json document to the console.
            self.show_json = es_config.get("show_json")
            # Seconds between two synchronization passes.
            self.sleep_time = es_config.get("sleep_time")
            # Slack added to the lookback window so clock drift / sync
            # duration never causes rows to be missed.
            self.time_difference = es_config.get("time_difference")
            # Lower bound for update_time in the incremental queries;
            # empty string means "first pass, fetch everything".
            self.datetime_now = ""
            # es ip and port.
            es_host = elasticsearch.get("hosts")
            # Connect to es.
            self.es = Elasticsearch(es_host)
            # Connect to mysql.
            self.mm = MysqlManager()
        except Exception:
            print(traceback.format_exc())

    def tongbu_es_mm(self):
        """Run one synchronization pass over every configured sql file."""
        try:
            # Synchronization start time.
            start_time = time.time()
            print("start..............",
                  time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time)))
            # Accumulates bulk-insert actions across all sql files.
            actions = []
            statement_filespath = mysql.get("statement_filespath", [])
            if self.datetime_now:
                # Look back one interval plus the slack (which tracks the
                # previous pass's duration), i.e. back to the start of the
                # previous sync. The T separator keeps the timestamp
                # parseable when it reaches es.
                self.datetime_now = time.strftime(
                    "%Y-%m-%dT%H:%M:%S",
                    time.localtime(time.time() - (self.sleep_time + self.time_difference)))
            else:
                # First pass: import everything since this sentinel date.
                self.datetime_now = "1999-01-01T00:00:00"
            for filepath in statement_filespath:
                sqlfile = filepath.get("sqlfile")
                es_index = filepath.get("index")
                es_type = filepath.get("type")
                # Read the sql text for this file.
                with open(sqlfile, "r") as opf:
                    sqldatas = opf.read()
                # ::datetime_now is this project's placeholder for the
                # incremental lower bound.
                if "::datetime_now" in sqldatas:
                    sqldatas = sqldatas.replace("::datetime_now", self.datetime_now)
                # Field names for this type, in select-list order.
                dict_set = db_field.get(es_type)
                # List of dicts (field name -> value); False or a traceback
                # string on failure, both falsy-handled below... traceback
                # strings are truthy, so errors surface in the es insert.
                db_data_list = self.mm.select_all_dict(sqldatas, dict_set)
                if db_data_list:
                    for db_data in db_data_list:
                        action = {
                            "_index": es_index,
                            "_type": es_type,
                            "@timestamp": time.strftime("%Y-%m-%dT%H:%M:%S",
                                                        time.localtime(time.time())),
                            "_source": db_data,
                        }
                        # Reuse the MySQL id as _id so repeated syncs
                        # overwrite; es autogenerates an id otherwise.
                        es_id = db_data.get("id", "")
                        if es_id:
                            action["_id"] = es_id
                        if self.show_json:
                            print(action)
                        actions.append(action)
            # Bulk insert into es when anything was collected.
            if actions:
                helpers.bulk(self.es, actions)
        except Exception:
            print(traceback.format_exc())
        else:
            end_time = time.time()
            # BUG FIX: the original printed start_time on the "end" line.
            print("end...................",
                  time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end_time)))
            # The next pass widens its lookback by this pass's duration.
            self.time_difference = end_time - start_time
        finally:
            # Always release the database session, error or not.
            self.mm.close()
def main():
    """Build the sync worker and run it forever at the configured interval."""
    tb = TongBu()
    # Seconds to wait between two passes.
    sleep_time = tb.sleep_time
    # Endless loop: one sync pass, then sleep the configured interval.
    # Restored: the garbled original line was time.sleep(sleep_time).
    while True:
        tb.tongbu_es_mm()
        time.sleep(sleep_time)


if __name__ == '__main__':
    main()

That concludes this simple Python implementation of synchronizing MySQL data to Elasticsearch. I hope it serves as a useful reference.