Updated on 2024-11-19

Python implementation of the database update script generation method

My test environment and the production environment use different databases. Once testing against the test database is finished and the changes are ready to go to production, an update script has to be prepared for the production database. Without careful notes it is easy to forget what was changed and what was added, which is a real headache. So I tried using Python to generate the update scripts automatically, rather than relying on my poor memory.

The main operations are as follows:

1. Add the following method to the existing BaseDao class so that the database structure can be retrieved easily. This is the basis for comparing the test database with the production database.

def select_database_struts(self):
    '''
    Find the database structure in the current connection configuration as a collection of dictionaries
    '''
    sql = '''SELECT COLUMN_NAME, IS_NULLABLE, COLUMN_TYPE, COLUMN_KEY, COLUMN_COMMENT
        FROM information_schema.`COLUMNS` 
        WHERE TABLE_SCHEMA="%s" AND TABLE_NAME="{0}" '''%(self.__database)
    struts = {}
    for k in self.__primaryKey_dict.keys():
      self.__cursor.execute(sql.format(k))  # fill the {0} placeholder with the table name
      results = self.__cursor.fetchall()
      struts[k] = {}
      for result in results:
        struts[k][result[0]] = {}
        struts[k][result[0]]["COLUMN_NAME"] = result[0]
        struts[k][result[0]]["IS_NULLABLE"] = result[1]
        struts[k][result[0]]["COLUMN_TYPE"] = result[2]
        struts[k][result[0]]["COLUMN_KEY"] = result[3]
        struts[k][result[0]]["COLUMN_COMMENT"] = result[4]
    return self.__config, struts
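
To make the return value concrete, here is a small sketch that builds by hand the kind of nested dictionary the method above produces (table name, then column name, then attributes). The table `user`, its columns, and the helper `make_column` are made up for illustration; only the five attribute keys come from the method itself.

```python
# Hypothetical sample data: the "user" table and its columns are invented.
def make_column(name, nullable, col_type, key="", comment=""):
    """Build one column entry with the same keys as select_database_struts."""
    return {
        "COLUMN_NAME": name,
        "IS_NULLABLE": nullable,
        "COLUMN_TYPE": col_type,
        "COLUMN_KEY": key,
        "COLUMN_COMMENT": comment,
    }

# Structure as it might look in the test database
test_struts = {
    "user": {
        "id": make_column("id", "NO", "int(11)", "PRI", "primary key"),
        "name": make_column("name", "YES", "varchar(64)", "", "user name"),
    }
}

# Structure as it might look in the production database
prod_struts = {
    "user": {
        "id": make_column("id", "NO", "int(11)", "PRI", "primary key"),
        "name": make_column("name", "YES", "varchar(32)", "", "user name"),
    }
}

# Plain dict comparison is enough to detect that a column definition differs.
changed = [
    col for col, attrs in test_struts["user"].items()
    if prod_struts["user"].get(col) != attrs
]
print(changed)
```

Because every column is a flat dictionary of strings, the comparison script in the next step can rely on ordinary `==` and `!=` between dictionaries.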

2. Write Python scripts for comparisons

'''
Database migration script. It currently supports the following functions:
1. Generate SQL scripts for tables that do not exist in the old database (with or without table data). The generated scripts are written to the temp directory (table.sql).
2. Generate SQL statements for adding columns.
3. Generate SQL statements for modifying column attributes.
4. Generate SQL statements for deleting columns.
All generated SQL scripts are placed in the temp directory.
'''
import json, os, sys
from basedao import BaseDao

# sys.path[0] is the directory that contains this script
temp_path = sys.path[0] + "/temp"
if not os.path.exists(temp_path):
  os.makedirs(temp_path)

def main(old, new, has_data=False):
  '''
  @old old database (target database)
  @new newest database (source database)
  @has_data Whether to generate sql script for structure+data
  '''
  clear_temp()  # Clean up the temp directory first
  old_config, old_struts = old
  new_config, new_struts = new
  for new_table, new_fields in new_struts.items():
    if old_struts.get(new_table) is None:
      gc_sql(new_config["user"], new_config["password"], new_config["database"], new_table, has_data)
    else:
      cmp_table(old_struts[new_table], new_struts[new_table], new_table)

def cmp_table(old, new, table):
  '''
  Compare the structure of one table in both databases and generate the SQL
  '''
  old_fields = old
  new_fields = new

  sql_add_column = "ALTER TABLE `{TABLE}` ADD COLUMN `{COLUMN_NAME}` {COLUMN_TYPE} COMMENT '{COLUMN_COMMENT}';\n"
  sql_change_column = "ALTER TABLE `{TABLE}` CHANGE `{COLUMN_NAME}` `{COLUMN_NAME}` {COLUMN_TYPE} COMMENT '{COLUMN_COMMENT}';\n"
  sql_del_column = "ALTER TABLE `{TABLE}` DROP COLUMN `{COLUMN_NAME}`;\n"

  if old_fields != new_fields:
    content = ""
    for new_field, new_field_dict in new_fields.items():
      old_field_dict = old_fields.get(new_field)
      if old_field_dict is None:
        # Column exists only in the new database: generate ADD COLUMN
        content += sql_add_column.format(TABLE=table, **new_field_dict)
      elif old_field_dict != new_field_dict:
        # Column attributes differ: generate CHANGE
        content += sql_change_column.format(TABLE=table, **new_field_dict)
    # Columns that exist only in the old database: generate DROP COLUMN
    for old_field in old_fields:
      if new_fields.get(old_field) is None:
        content += sql_del_column.format(TABLE=table, COLUMN_NAME=old_field)

    # NOTE: the output file name is missing from the original listing;
    # "changes.sql" is an assumed placeholder.
    with open(sys.path[0] + "/temp/changes.sql", "a", encoding="utf8") as f:
      f.write(content)

def gc_sql(user, pwd, db, table, has_data):
  '''
  Dump one table to the temp directory with mysqldump (table.sql)
  '''
  if has_data:
    sys_order = "mysqldump -u%s -p%s %s %s > %s/%s.sql"%(user, pwd, db, table, temp_path, table)
  else:
    # -d (--no-data) dumps the table structure only
    sys_order = "mysqldump -u%s -p%s -d %s %s > %s/%s.sql"%(user, pwd, db, table, temp_path, table)
  os.system(sys_order)

def clear_temp():
  '''
  Called on every run: remove the old files from the temp directory first
  '''
  if os.path.exists(temp_path):
    files = os.listdir(temp_path)
    for file in files:
      f = os.path.join(temp_path, file)
      if os.path.isfile(f):
        os.remove(f)
  print("Temporary file directory cleanup complete.")

if __name__ == "__main__":
  test1_config = {
    "user" : "root", 
    "password" : "root",
    "database" : "test1", 
  }
  test2_config = {
    "user" : "root", 
    "password" : "root",
    "database" : "test2", 
  }
  
  test1_dao = BaseDao(**test1_config)
  test1_struts = test1_dao.select_database_struts()
  
  test2_dao = BaseDao(**test2_config)
  test2_struts = test2_dao.select_database_struts()

  main(test2_struts, test1_struts)
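
The gc_sql function builds its mysqldump call as a single shell string, which can break if a table or database name contains spaces or shell metacharacters. As a possible alternative, the sketch below passes the arguments as a list to `subprocess.run` and redirects the output in Python instead of through the shell. The function names `build_dump_command` and `dump_table` are hypothetical and not part of the original script; `--no-data` is the long form of the `-d` flag used above.

```python
import subprocess

def build_dump_command(user, password, database, table, has_data=False):
    """Return the mysqldump argument list for one table (hypothetical helper)."""
    cmd = ["mysqldump", "-u" + user, "-p" + password]
    if not has_data:
        # Structure only, same effect as -d in gc_sql
        cmd.append("--no-data")
    cmd += [database, table]
    return cmd

def dump_table(user, password, database, table, out_path, has_data=False):
    """Run mysqldump and write its output to out_path.

    Requires the mysqldump client to be installed and on PATH.
    """
    cmd = build_dump_command(user, password, database, table, has_data)
    with open(out_path, "w", encoding="utf8") as out:
        # check=True raises CalledProcessError if mysqldump exits with an error
        subprocess.run(cmd, stdout=out, check=True)

# Only build the command here; nothing is executed.
print(build_dump_command("root", "root", "test1", "user"))
```

Keeping the command construction in its own function makes it easy to inspect the exact arguments before running anything against a real server.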

Currently only the four kinds of SQL script generation listed above are supported.
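
The column-level comparisons can be tried out without any database connection. The following is a minimal sketch that mirrors the logic of cmp_table but returns the statements as a list instead of writing a file. The function name `diff_columns`, the helper `col`, and the sample `user` table are assumptions made for this example.

```python
ADD_TPL = "ALTER TABLE `{TABLE}` ADD COLUMN `{COLUMN_NAME}` {COLUMN_TYPE} COMMENT '{COLUMN_COMMENT}';"
CHANGE_TPL = "ALTER TABLE `{TABLE}` CHANGE `{COLUMN_NAME}` `{COLUMN_NAME}` {COLUMN_TYPE} COMMENT '{COLUMN_COMMENT}';"
DROP_TPL = "ALTER TABLE `{TABLE}` DROP COLUMN `{COLUMN_NAME}`;"

def diff_columns(table, old_fields, new_fields):
    """Return the ALTER TABLE statements that turn old_fields into new_fields."""
    statements = []
    for name, attrs in new_fields.items():
        old_attrs = old_fields.get(name)
        if old_attrs is None:
            # Column exists only in the new structure
            statements.append(ADD_TPL.format(TABLE=table, **attrs))
        elif old_attrs != attrs:
            # Column exists in both but its attributes differ
            statements.append(CHANGE_TPL.format(TABLE=table, **attrs))
    for name in old_fields:
        if name not in new_fields:
            # Column exists only in the old structure
            statements.append(DROP_TPL.format(TABLE=table, COLUMN_NAME=name))
    return statements

def col(name, col_type, comment):
    """Hypothetical helper that builds one column entry for the demo."""
    return {"COLUMN_NAME": name, "IS_NULLABLE": "YES", "COLUMN_TYPE": col_type,
            "COLUMN_KEY": "", "COLUMN_COMMENT": comment}

# Invented sample structures: "age" is widened, "email" is new, "nick" is gone.
old = {
    "id": col("id", "int(11)", "id"),
    "age": col("age", "int(3)", "age"),
    "nick": col("nick", "varchar(32)", "nickname"),
}
new = {
    "id": col("id", "int(11)", "id"),
    "age": col("age", "int(11)", "age"),
    "email": col("email", "varchar(128)", "contact"),
}

result = diff_columns("user", old, new)
for statement in result:
    print(statement)
```

Returning a list rather than appending to a file makes the comparison easy to check in a unit test before any script is applied to the production database.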
