In an earlier post I wrote about synchronizing MySQL data to ElasticSearch with logstash-input-jdbc, but its synchronization interval is at least one minute, which could not satisfy the online business, so the only option was to implement the synchronization myself. Time was tight, so this is a simple implementation.
Thoughts:
There are a lot of ideas online built on the MySQL binlog, but my understanding of MySQL is really limited, so I took a very plain approach: query MySQL for the data, then insert it into ES. Because the amount of data is not large and synchronization runs at 10-second intervals, the efficiency is acceptable. To absorb the clock difference between the servers and the time the MySQL update and the query themselves take, the update-time condition in each query is compared against the start time of the previous synchronization run; that way, no matter how much data there is or how long an update takes, no rows are missed, because the guiding principle is that synchronization must never lose data. The program also keeps the drift allowance separate from the fixed interval and updates it with the measured duration of each run. And because the MySQL id is used as the ES document id, rows picked up more than once simply overwrite the same document, so duplicate data is avoided as well!
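To make that incremental condition concrete, here is a minimal sketch (the variable names are illustrative, not taken from the program below) of how the cutoff timestamp can be computed:

import time

SLEEP_TIME = 10       # seconds between synchronization runs (sleep_time in the config)
TIME_DIFFERENCE = 3   # allowance for clock drift and query duration (time_difference in the config)

# Push "now" back by the interval plus the allowance so the cutoff lands at or
# before the start of the previous run; rows updated since then get selected
# again. Because the mysql id becomes the es _id, re-selected rows simply
# overwrite the same document instead of creating duplicates.
cutoff = time.time() - (SLEEP_TIME + TIME_DIFFERENCE)
# es cannot parse a space between date and time, hence the literal T
print(time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(cutoff)))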
Usage:
Just write the configuration file as shown, then write the sql files, and run the program directly. The configuration format also borrows from logstash-input-jdbc.
MsToEs
|----esconfig.py (configuration file)
|----es_sync.py (synchronization program; file name assumed)
|----sql_manage.py (database management)
|----aa.sql (required sql file)
|----bb.sql (required sql file)
sql_manage.py:
# -*- coding:utf-8 -*-
__author__ = "ZJL"

from sqlalchemy.pool import QueuePool
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
import traceback
import esconfig


# Decorator for operations that require neither rollback nor commit
def find(func):
    def wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except Exception as e:
            print(traceback.format_exc())
            print(str(e))
            return traceback.format_exc()
        finally:
            self.session.close()
    return wrapper


class MysqlManager(object):
    def __init__(self):
        mysql_connection_string = esconfig.mysql.get("mysql_connection_string")
        self.engine = create_engine('mysql+pymysql://' + mysql_connection_string + '?charset=utf8',
                                    poolclass=QueuePool, pool_recycle=3600)
        # self.DB_Session = sessionmaker(bind=self.engine)
        # self.session = self.DB_Session()
        self.DB_Session = sessionmaker(bind=self.engine, autocommit=False, autoflush=True,
                                       expire_on_commit=False)
        self.db_session = scoped_session(self.DB_Session)
        self.session = self.db_session()

    @find
    def select_all_dict(self, sql, keys):
        a = self.session.execute(sql)
        a = a.fetchall()
        lists = []
        for i in a:
            if len(keys) == len(i):
                data_dict = {}
                for k, v in zip(keys, i):
                    data_dict[k] = v
                lists.append(data_dict)
            else:
                return False
        return lists

    # Close the session
    def close(self):
        self.session.close()
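A quick usage sketch for MysqlManager (the table and columns here are hypothetical, and it assumes esconfig.py below is importable and MySQL is reachable):

from sql_manage import MysqlManager

mm = MysqlManager()
# keys must list the selected columns in order; each row becomes a dict
rows = mm.select_all_dict("select CONVERT(`id`,CHAR) as id, `name` as name from `cc`",
                          ("id", "name"))
print(rows)  # e.g. [{'id': '1', 'name': 'foo'}, ...]
mm.close()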
aa.sql:
select
  CONVERT(c.`id`, CHAR) as id,
  c.`code` as code,
  c.`project_name` as project_name,
  c.`name` as name,
  date_format(c.`update_time`, '%Y-%m-%dT%H:%i:%s') as update_time
from `cc` c
where date_format(c.`update_time`, '%Y-%m-%dT%H:%i:%s') >= '::datetime_now';
bb.sql:
select
  CONVERT(c.`id`, CHAR) as id,
  CONVERT(c.`age`, CHAR) as age,
  c.`code` as code,
  c.`name` as name,
  c.`project_name` as project_name,
  date_format(c.`update_time`, '%Y-%m-%dT%H:%i:%s') as update_time
from `bb` c
where date_format(c.`update_time`, '%Y-%m-%dT%H:%i:%s') >= '::datetime_now';
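The ::datetime_now token in both files is not SQL; it is a plain placeholder string that gets replaced before the query is executed. A minimal sketch of that substitution:

with open("aa.sql", "r") as opf:
    sqldatas = opf.read()

# the first run substitutes an early date so that everything gets imported
datetime_now = "1999-01-01T00:00:00"
sqldatas = sqldatas.replace("::datetime_now", datetime_now)
# the where clause now ends with: >= '1999-01-01T00:00:00'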
esconfig.py:
# -*- coding: utf-8 -*-
# __author__ = "ZJL"

# the sql file name matches the type name in es
mysql = {
    # mysql connection info (user:password@host:port/database; credentials here are placeholders)
    "mysql_connection_string": "root:password@127.0.0.1:3306/xxx",
    # sql file info: which es index and es type each sql file feeds
    "statement_filespath": [
        {
            "index": "a1",
            "sqlfile": "aa.sql",
            "type": "aa"
        },
        {
            "index": "a1",
            "sqlfile": "bb.sql",
            "type": "bb"
        },
    ],
}

# es ip and port
elasticsearch = {
    "hosts": "127.0.0.1:9200",
}

# Field names stored in es, keyed by es type name; the order must be the same
# as the column order in the corresponding sql file, because the two are
# paired positionally
db_field = {
    "aa": ("id", "code", "project_name", "name", "update_time",),
    "bb": ("id", "age", "code", "name", "project_name", "update_time",),
}

es_config = {
    # how many seconds between synchronizations
    "sleep_time": 10,
    # allowance for the time difference between servers
    "time_difference": 3,
    # show_json prints each imported document as json
    "show_json": False,
}
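The positional pairing between db_field and the sql columns is what select_all_dict relies on; a small illustration with made-up values for one row of aa.sql:

keys = ("id", "code", "project_name", "name", "update_time")
row = ("1", "c01", "demo", "foo", "2018-01-01T00:00:00")  # one mysql result row (made-up values)
print(dict(zip(keys, row)))
# {'id': '1', 'code': 'c01', 'project_name': 'demo', 'name': 'foo', 'update_time': '2018-01-01T00:00:00'}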
es_sync.py:
# -*- coding: utf-8 -*-
# __author__ = "ZJL"

from sql_manage import MysqlManager
from esconfig import mysql, elasticsearch, db_field, es_config
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import traceback
import time


class TongBu(object):
    def __init__(self):
        try:
            # whether to print the json documents on the console
            self.show_json = es_config.get("show_json")
            # how many seconds between synchronizations
            self.sleep_time = es_config.get("sleep_time")
            # allowance for clock differences and update latency during synchronization
            self.time_difference = es_config.get("time_difference")
            # start time of the previous run; empty so the first run imports everything
            self.datetime_now = ""
            # es ip and port
            es_host = elasticsearch.get("hosts")
            # connect to es
            self.es = Elasticsearch(es_host)
            # connect to mysql
            self.mm = MysqlManager()
        except Exception:
            print(traceback.format_exc())

    def tongbu_es_mm(self):
        try:
            # synchronization start time
            start_time = time.time()
            print("start..............", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time)))
            # the actions list collects documents for the bulk insert into es
            actions = []
            # list of all sql files
            statement_filespath = mysql.get("statement_filespath", [])
            if self.datetime_now:
                # current time minus the interval and the allowance, i.e. roughly the
                # start time of the previous synchronization run.
                # es cannot parse a space between the date and the time, so the
                # formatted string uses a T in the middle
                self.datetime_now = time.strftime(
                    "%Y-%m-%dT%H:%M:%S",
                    time.localtime(time.time() - (self.sleep_time + self.time_difference)))
            else:
                self.datetime_now = "1999-01-01T00:00:00"
            if statement_filespath:
                for filepath in statement_filespath:
                    # sql file
                    sqlfile = filepath.get("sqlfile")
                    # es index
                    es_index = filepath.get("index")
                    # es type
                    es_type = filepath.get("type")
                    # read the contents of the sql file
                    with open(sqlfile, "r") as opf:
                        sqldatas = opf.read()
                    # ::datetime_now is a custom placeholder string for incremental updates
                    if "::datetime_now" in sqldatas:
                        sqldatas = sqldatas.replace("::datetime_now", self.datetime_now)
                    # field mapping between es and sql
                    dict_set = db_field.get(es_type)
                    # query mysql; returns a list of dicts keyed by field name
                    db_data_list = self.mm.select_all_dict(sqldatas, dict_set)
                    if db_data_list:
                        # assemble the rows into es bulk actions
                        for db_data in db_data_list:
                            action = {
                                "_index": es_index,
                                "_type": es_type,
                                "@timestamp": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(time.time())),
                                "_source": db_data
                            }
                            # es auto-generates an id when the row has no id field
                            es_id = db_data.get("id", "")
                            if es_id:
                                action["_id"] = es_id
                            # optionally print the assembled json document
                            if self.show_json:
                                print(action)
                            actions.append(action)
                # bulk insert into es if the list is not empty
                if len(actions) > 0:
                    helpers.bulk(self.es, actions)
        except Exception:
            print(traceback.format_exc())
        else:
            end_time = time.time()
            print("end...................", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end_time)))
            # remember how long this run took and use it as the next allowance
            self.time_difference = end_time - start_time
        finally:
            # close the database session even on error
            self.mm.close()


def main():
    tb = TongBu()
    # how many seconds between synchronizations
    sleep_time = tb.sleep_time
    # run the import in an endless loop, sleeping between runs
    while True:
        tb.tongbu_es_mm()
        time.sleep(sleep_time)


if __name__ == '__main__':
    main()
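Once the loop is running, the imported documents can be spot-checked from the same elasticsearch client; a minimal sketch (the match_all query and the size of 5 are just examples):

from elasticsearch import Elasticsearch

es = Elasticsearch("127.0.0.1:9200")
res = es.search(index="a1", body={"query": {"match_all": {}}, "size": 5})
for hit in res["hits"]["hits"]:
    print(hit["_id"], hit["_source"])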
That is the whole of this simple Python implementation of synchronizing MySQL data to ElasticSearch. I hope it gives you a useful reference, and I hope you will continue to support me.