Design Ideas:
1. Once the program is running, Python extracts from MySQL all rows recorded within the most recent period (roughly the last day, per the query below).
2. It then instantiates the Redis queue class and, after simple parsing, pushes the data into the Redis queue one item at a time.
3. A timer triggers the job at 12:00 a.m. (midnight) every day.
P.S. Redis is an in-memory database that is very useful as a background message queue and cache; interested readers can consult the relevant documentation.
# -*- coding:utf-8 -*-
import datetime
import random
import string
import time

import MySQLdb
import redis
import schedule


# get the data from mysql
class FromSql(object):
    """Read recent rows from the ``test`` table over an existing connection."""

    def __init__(self, conn):
        """Keep the open database connection that :meth:`acquire` will use.

        Args:
            conn: An open DB-API connection (``MySQLdb.connect(...)`` in
                this script).
        """
        self.conn = conn

    def acquire(self):
        """Fetch the rows matched by the date filter and echo each one.

        Returns:
            The rows as returned by ``cursor.fetchall()``, or ``None`` when
            the query (or the echo) fails; the error is printed, not raised.
        """
        cursor = self.conn.cursor()
        try:
            sql = "SELECT * FROM test WHERE TO_DAYS(NOW()) - TO_DAYS(t) <= 1"
            cursor.execute(sql)
            rs = cursor.fetchall()
            for eve in rs:
                # The format string expects exactly four columns per row.
                print('%s, %s, %s, %s' % eve)
            return rs
        except Exception as e:
            print("The error: %s" % e)
            return None
        finally:
            # Release the cursor on both the success and the failure path.
            cursor.close()


class RedisQueue(object):
    """Minimal FIFO queue stored in a Redis list."""

    def __init__(self, name, namespace='queue', **redis_kwargs):
        """The default connection parameters are: host='localhost', port=6379, db=0"""
        self.__db = redis.Redis(**redis_kwargs)
        # The list lives under the Redis key "<namespace>:<name>".
        self.key = '%s:%s' % (namespace, name)

    def qsize(self):
        """Return the current number of items in the queue."""
        return self.__db.llen(self.key)

    def put(self, item):
        """Append ``item`` to the tail of the queue."""
        self.__db.rpush(self.key, item)

    def get(self, block=True, timeout=None):
        """Remove and return the item at the head of the queue.

        Args:
            block: When true, wait for an item (``BLPOP``); otherwise
                return immediately (``LPOP``).
            timeout: Passed to ``blpop`` when blocking.

        Returns:
            The popped value, or ``None`` if nothing was available.
        """
        if block:
            item = self.__db.blpop(self.key, timeout=timeout)
            # blpop yields a (key, value) pair; lpop yields the bare value,
            # so only the blocking result needs unpacking.
            if item:
                item = item[1]
        else:
            item = self.__db.lpop(self.key)
        return item

    def get_nowait(self):
        """Equivalent to ``get(False)``."""
        return self.get(False)


if __name__ == "__main__":
    # connect mysqldb
    conn_sql = MySQLdb.connect(
        host='127.0.0.1',
        port=3306,
        user='root',
        passwd='',
        db='test',
        charset='utf8',
    )

    def job_for_redis():
        """Push every column value of every recent row onto the Redis queue."""
        get_data = FromSql(conn_sql)
        data = get_data.acquire()
        if data is None:
            # acquire() already printed the error; nothing to enqueue.
            return

        q = RedisQueue('test', host='localhost', port=6379, db=0)
        for single_data in data:
            for meta_data in single_data:
                q.put(meta_data)
                print(meta_data)
        print("All data had been inserted.")

    # NOTE(review): the scraped original had this scheduling section wrapped
    # in a string literal, so the job never ran; it is enabled here to match
    # the stated design (run daily at midnight). Confirm this is intended.
    try:
        schedule.every().day.at("00:00").do(job_for_redis)
        while True:
            schedule.run_pending()
            # Sleep between polls so the loop does not spin at 100% CPU.
            time.sleep(1)
    except Exception as e:
        print('Error: %s' % e)
    finally:
        conn_sql.close()
Additional knowledge: using Python to fetch exchange rates on a schedule and store them in a database
Python timed tasks:
We can use the lightweight third-party module `schedule`. First, install it: pip install schedule
A small test for timed tasks:
import schedule
import time


def job():
    """Demo task: print a message each time the scheduler fires it."""
    print("I'm working...")


schedule.every(10).minutes.do(job)  # Tasks performed at 10-minute intervals
schedule.every().hour.do(job)  # Missions carried out at hourly intervals
schedule.every().day.at("10:30").do(job)  # One mission per day at 10:30
schedule.every(5).to(10).days.do(job)  # Missions every 5-10 days
schedule.every().monday.do(job)  # One mission every Monday at this time
schedule.every().wednesday.at("13:15").do(job)  # One mission per week on Wednesdays at 13:15

while True:
    schedule.run_pending()
    # Pause between polls; without it the loop busy-spins a CPU core.
    time.sleep(1)
Store the data in the database (the formatting may not be quite right and there may be some stray symbols; adjust it yourself as needed):
import re
import time

import pandas
import pymysql
import requests
import schedule
from sqlalchemy import create_engine


# Get all the foreign exchange in US dollars
def job():
    """Scrape one currency row from the rate table and store it in MySQL.

    Downloads the exchange-rate page, extracts the ``<td>`` cells that follow
    the currency name, prints them, updates the matching ``tb_money`` row and
    finally prints the whole table. Returns ``None``.
    """
    # NOTE(review): 'dollars' looks like a translation artifact; the page
    # presumably lists the currency under its Chinese name -- confirm.
    content = 'dollars'
    # NOTE(review): the scheme and host were missing from the scraped source;
    # restored on the assumption that this is the Bank of China rate page
    # (the output below mentions "BOC") -- confirm.
    url = 'https://www.boc.cn/sourcedb/whpj/'  # Foreign exchange data address
    html = requests.get(url, timeout=10).content.decode('utf-8')

    index = html.find('<td>' + content + '</td>')
    if index == -1:
        print('Currency not found on page: ' + content)
        return
    # Only the 300 characters after the currency cell are parsed.
    snippet = html[index:index + 300]
    result = re.findall('<td>(.*?)</td>', snippet)
    if len(result) < 8:
        # Fewer cells than expected; indexing below would fail.
        print('Unexpected table layout for: ' + content)
        return

    print("Currency:" + result[0])
    print("Bid price in spot currency:" + result[1])
    print("Cash purchase price:" + result[2])
    print("Selling price in spot currency:" + result[3])
    print("Cash Selling Price:" + result[4])
    print("BOC settlement price:" + result[5])
    print("Published:" + result[6] + ' ' + result[7])

    # Values come from a web page, so pass them as query parameters instead
    # of formatting them into the SQL text.
    sql = (
        "update tb_money set huiBuy = %s,chaoBuy = %s,huiSale = %s,"
        "chaoSale = %s,centerResult= %s,publishTime = %s where typeId = %s"
    )
    params = (
        result[1],
        result[2],
        result[3],
        result[4],
        result[5],
        result[6] + ' ' + result[7],
        result[0],
    )

    # Local address, database account, password, database name
    db = pymysql.connect(
        host='localhost',
        user='root',
        password='root',
        database='pinyougoudb',
    )
    try:
        with db.cursor() as cursor:
            cursor.execute(sql, params)
        db.commit()
        print('success')

        # Query statement to read the stored data back
        # sqlalchemy for database initialization
        engine = create_engine('mysql+pymysql://root:root@localhost:3306/pinyougoudb')
        sql = '''select * from tb_money'''
        # pandas for database reads and writes
        df = pandas.read_sql_query(sql, engine)
        print(df)
    finally:
        # Close the connection even when the update or the read-back fails.
        db.close()


# Refresh every few minutes
# schedule.every(0.1).minutes.do(job)
# What time of day does it refresh?
schedule.every().day.at("09:29").do(job)
schedule.every().day.at("09:30").do(job)

# Keep looping until the condition is met
while True:
    schedule.run_pending()
    # Pause between polls; without it the loop busy-spins a CPU core.
    time.sleep(1)
That is everything I have to share about this implementation of timed extraction of data from MySQL into Redis with Python. I hope it serves as a useful reference, and I appreciate your continued support.