SoFunction
Updated on 2024-11-21

Implementing Scheduled Data Extraction from MySQL into Redis with Python

Design Ideas:

1. When the job runs, Python reads from MySQL all rows that fall within the most recent period (the query below keeps rows dated at most one day back).

2. It then instantiates the Redis queue class and, after simple parsing, pushes the values onto the Redis queue one by one.

3. A timer is designed to trigger the job at 12:00 a.m. (00:00) every day.

P.S.: Redis is an in-memory database that is very useful as a cache and as a background message queue; interested readers can consult the relevant documentation.

 # -*- coding:utf-8 -*- 

import MySQLdb
import schedule
import time
import datetime
import random
import string
import redis

# get the data from mysql
class FromSql(object):
    """Read recent rows from the ``test`` table through a database connection."""

    def __init__(self, conn):
        """Keep the open connection for later queries.

        Args:
            conn: connection object exposing ``cursor()``.
        """
        self.conn = conn

    def acquire(self):
        """Fetch the recent rows, echo them to stdout and return them.

        The query keeps rows for which the day number of column ``t`` is at
        most one behind the day number of ``NOW()``.

        Returns:
            Whatever ``cursor.fetchall()`` returned, or ``None`` when any
            step raised (the error is printed, not re-raised).
        """
        cursor = self.conn.cursor()
        try:
            sql = "SELECT * FROM test WHERE TO_DAYS(NOW()) - TO_DAYS(t) <= 1"

            cursor.execute(sql)
            rs = cursor.fetchall()
            for eve in rs:
                # The format string has four placeholders, so a row with a
                # different number of columns raises here and ends up in the
                # except branch below.
                print('%s, %s, %s, %s' % eve)

            return rs

        except Exception as e:
            print("The error: %s" % e)

        finally:
            # Release the cursor on the error path too, not only on success.
            cursor.close()


class RedisQueue(object):
    """Simple FIFO queue kept in a Redis list named ``<namespace>:<name>``."""

    def __init__(self, name, namespace='queue', **redis_kwargs):
        """The default connection parameters are: host='localhost', port=6379, db=0"""
        self.__db = redis.Redis(**redis_kwargs)
        self.key = '%s:%s' % (namespace, name)

    def qsize(self):
        """Return the current length of the underlying list."""
        return self.__db.llen(self.key)

    def put(self, item):
        """Append ``item`` to the tail of the queue."""
        self.__db.rpush(self.key, item)

    def get(self, block=True, timeout=None):
        """Remove and return the item at the head of the queue.

        Args:
            block: when true, use the blocking pop; otherwise return
                immediately.
            timeout: passed through to the blocking pop.

        Returns:
            The popped value, or ``None`` when nothing was popped.
        """
        if block:
            item = self.__db.blpop(self.key, timeout=timeout)
            # The blocking pop yields a (key, value) pair; keep the value only.
            if item:
                item = item[1]
        else:
            # The non-blocking pop already yields the bare value, so it must
            # not be indexed (indexing returned a single character/byte).
            item = self.__db.lpop(self.key)

        return item

    def get_nowait(self):
        """Equivalent to ``get(False)``."""
        return self.get(False)


if __name__ == "__main__":
    # connect mysqldb
    # Opened only when the file is run as a script; job_for_redis reads this
    # module-level connection.
    conn_sql = MySQLdb.connect(
        host='127.0.0.1',
        port=3306,
        user='root',
        passwd='',
        db='test',
        charset='utf8',
    )


def job_for_redis():
    """Copy the rows returned by FromSql.acquire() into the 'test' Redis queue.

    Every field of every row is pushed as a separate queue item and echoed
    to stdout.
    """
    get_data = FromSql(conn_sql)
    data = get_data.acquire()
    if data is None:
        # acquire() prints the error and returns None on failure; there is
        # nothing to push, and iterating None would raise TypeError.
        return

    q = RedisQueue('test', host='localhost', port=6379, db=0)
    for single_data in data:
        for meta_data in single_data:
            q.put(meta_data)
            print(meta_data)
    print("All data had been inserted.")


# The scheduler driver below is kept as a bare string literal, so it is never
# executed; job_for_redis only runs if something else calls it.
"""
  try:
    schedule.every().day.at("00:00").do(job_for_redis)
  except Exception as e:
    print('Error: %s'% e)
#  finally:
#    ()

  while True:
    schedule.run_pending()
    time.sleep(1)
"""

Additional knowledge: using Python to fetch exchange rates on a schedule and store them in a database

python timed tasks:

We can use the lightweight third-party module `schedule`. First install it: pip install schedule

A small test for timed tasks:

import schedule
import time
 
def job():
  """Demo task for the scheduler: write a fixed status line to stdout."""
  status_line = "I'm working..."
  print(status_line)
 
schedule.every(10).minutes.do(job)               # Run the task every 10 minutes
schedule.every().hour.do(job)                    # Run the task every hour
schedule.every().day.at("10:30").do(job)         # Run the task every day at 10:30
schedule.every(5).to(10).days.do(job)            # Run the task every 5 to 10 days
schedule.every().monday.do(job)                  # Run the task every Monday
schedule.every().wednesday.at("13:15").do(job)   # Run the task every Wednesday at 13:15

while True:
  schedule.run_pending()
  # Sleep between polls so the loop does not spin at 100% CPU.
  time.sleep(1)

Fetching the data and storing it in the database (the formatting may not be exactly right and some symbols may be off; adjust it yourself as needed):

import pymysql
import schedule
import time
import requests
import pandas
from sqlalchemy import create_engine

# Get all the foreign exchange in US dollars
def job():
  """Scrape one currency row from the exchange-rate page and store it.

  The function downloads the page, locates the table cell whose text equals
  ``content``, extracts the ``<td>`` values that follow it, prints them,
  writes them to ``tb_money`` and finally prints the whole table.

  Raises:
    ValueError: if the currency cell is not on the page, or fewer than the
      eight expected cells follow it.
  """
  import re  # local import: the snippet's import list does not include re

  # NOTE(review): the page must contain a cell with exactly this text;
  # confirm the label used by the live page.
  content = 'dollars'
  # NOTE(review): the host was missing from the published snippet; restored
  # from the path and the "BOC" label below - confirm before relying on it.
  url = 'https://www.boc.cn/sourcedb/whpj/' #Foreign exchange data address
  # A timeout keeps a stalled request from blocking the scheduler forever.
  html = requests.get(url, timeout=10).content.decode('utf-8')

  index = html.index('<td>' + content + '</td>')
  row_html = html[index:index+300]
  result = re.findall('<td>(.*?)</td>', row_html)
  if len(result) < 8:
    raise ValueError('expected 8 table cells for %s, got %d' % (content, len(result)))

  print("Currency:" + result[0])
  print("Bid price in spot currency:" + result[1])
  print("Cash purchase price:" + result[2])
  print("Selling price in spot currency:" + result[3])
  print("Cash Selling Price:" + result[4])
  print("BOC settlement price:" + result[5])
  print("Published:" + result[6] + ' ' + result[7])

  # Local address, database account, password, database name.
  # Keyword arguments: recent PyMySQL releases reject positional ones.
  db = pymysql.connect(host='localhost', user='root', password='root', database='pinyougoudb')
  try:
    # The values come from scraped HTML, so they are passed as query
    # parameters instead of being formatted into the SQL text.
    sql = ("update tb_money set huiBuy = %s,chaoBuy = %s,huiSale = %s,chaoSale = %s,"
           "centerResult= %s,publishTime = %s where typeId = %s")
    params = (result[1], result[2], result[3], result[4], result[5],
              result[6] + ' ' + result[7], result[0])

    with db.cursor() as cursor:
      cursor.execute(sql, params)
    db.commit()
    print('success')

    # Read the table back to show what was stored.
    # sqlalchemy for database initialization
    engine = create_engine('mysql+pymysql://root:root@localhost:3306/pinyougoudb')
    sql = '''select * from tb_money'''

    # pandas for database reads and writes
    df = pandas.read_sql_query(sql, engine)
    print(df)
  finally:
    # Close the connection even when the update or the read-back fails.
    db.close()


# Refresh every few minutes
#schedule.every(0.1).minutes.do(job)

# Refresh at fixed times of day
schedule.every().day.at("09:29").do(job)
schedule.every().day.at("09:30").do(job)

# Poll the scheduler forever, running whatever jobs are due
while True:
  schedule.run_pending()
  # Sleep between polls so the loop does not spin at 100% CPU.
  time.sleep(1)

That is everything I have to share about using Python to extract data from MySQL on a schedule and store it in Redis. I hope it serves as a useful reference, and I appreciate your continued support.