Updated on 2024-11-19

Simulating payment requests with Python multithreading: process analysis

Approach:

Notes on the available queue types:

  •  multiprocessing.Queue() # For inter-process communication between a parent and the child processes it starts directly; it cannot be passed to process-pool workers as a task argument, so try not to use it with process pools
  •  multiprocessing.Manager().Queue() # For parent-child process communication; processes created through a process pool (Pool) can share data through it
  •  queue.Queue() # For inter-thread communication; threads within the same process can share the data
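
As a minimal sketch of the thread case, the snippet below passes numbers from the main thread to one worker thread through a queue.Queue(). The names worker and run_demo, and the None sentinel used to stop the worker, are illustrative choices and not part of the article's project.

```python
import queue
import threading


def worker(q, results):
    # Pull items until the sentinel None arrives, then stop.
    while True:
        item = q.get()
        if item is None:
            break
        results.append(item * 2)


def run_demo():
    q = queue.Queue()      # thread-safe FIFO shared inside one process
    results = []           # only the single worker thread appends here
    t = threading.Thread(target=worker, args=(q, results))
    t.start()
    for n in range(5):
        q.put(n)
    q.put(None)            # sentinel: tell the worker to finish
    t.join()               # wait until the worker has drained the queue
    return results


if __name__ == "__main__":
    print(run_demo())
```

Because there is a single consumer and the queue is first-in first-out, the results come back in the order the items were put on the queue.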

1. Fetch the orders awaiting payment from the database.

2. Add the fetched data to a queue (queue.Queue()) and have the function return the length of the queue.

3. Create the corresponding number of threads, one per queued item, based on the queue length.

4. Store the created threads in a list.

5. Start the threads in order.

6. Finally, have the main thread wait for every thread to finish, then report the program's total running time.
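
The six steps can be tried out without a database or payment API. The sketch below is a stand-in under stated assumptions: fetch_pending_orders returns made-up orders, fake_pay only sleeps, and every function name is hypothetical. Unlike step 2, fill_queue returns the queue itself so the caller can also read its size.

```python
import time
from queue import Queue
from threading import Thread, Lock


def fetch_pending_orders():
    # Step 1 stand-in: made-up (order_id, order_sn) rows instead of a SQL query.
    return [(order_id, "SN%04d" % order_id) for order_id in range(1, 6)]


def fill_queue(orders):
    # Step 2: put every order on a thread-safe queue.
    q = Queue()
    for order in orders:
        q.put(order)
    return q


def fake_pay(order_id, order_sn, paid, lock):
    time.sleep(0.05)          # pretend to wait for the payment API
    with lock:                # guard the shared result list
        paid.append(order_id)


def pay_all():
    q = fill_queue(fetch_pending_orders())
    paid, lock = [], Lock()
    threads = []
    for _ in range(q.qsize()):            # step 3: one thread per queued order
        order_id, order_sn = q.get()
        threads.append(                   # step 4: keep the threads in a list
            Thread(target=fake_pay, args=(order_id, order_sn, paid, lock)))
    for t in threads:                     # step 5: start in order
        t.start()
    for t in threads:                     # step 6: wait for all of them
        t.join()
    return sorted(paid)


if __name__ == "__main__":
    start = time.time()
    print(pay_all())
    print("elapsed {:.2f}s".format(time.time() - start))
```

The result list is sorted before it is returned because the threads may finish in any order.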

The code is as follows

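import sys
import time
from queue import Queue
from threading import Thread

import requests  # HTTP client for the payment callback (assumed; the original may get it through the star imports below)

sys.path.append("../")  # make the project-local "tool" package importable
from tool.__init__ import *  # project helpers; presumably provides BaseTest, fwh_test_api and headers_form_urlencoded
from tool.decorator_token import *  # presumably provides the token_fwh / token_crm decorators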
class doWeChatNotify(BaseTest):
  def __init__(self):
    super().__init__()
    self.limit_num = 100  # number of records to query
    self.WeChatNotify_sql = '''select order_id,order_sn from fw_order where `status`=0 
            and course_id=1569 ORDER BY create_time desc limit %d ;''' % (self.limit_num)
    self.fwh_test_api = fwh_test_api
    # NOTE: pending_orders and order_queue are placeholder attribute names;
    # the original names are missing from the source text.
    self.pending_orders = self.my_op.sql_operation_fwh(self.WeChatNotify_sql)
    self.fwh_order_dict = {}
    self.order_queue = Queue()

  @token_fwh  # verify token validity
  def get_fwh_token_list(self):
    token_list = self.fwh_token.loadTokenList()
    return token_list

  @token_crm  # verify token validity
  def get_crm_token_list(self):
    token_list = ()  # the loader call is missing from the source; presumably it mirrors get_fwh_token_list
    return token_list

  def testDoWeChatNotify(self):
    """Log the pending orders to a file, queue them, and return the queue size."""
    DoWeChatNotify_file = '../tokenFileAndtxtFiles' + '/' + "DoWeChatNotify_asynchronousPay.txt"
    # Open the file once; the original 'utf=-8' is not a valid encoding name.
    with open(DoWeChatNotify_file, 'a', encoding='utf-8') as file:
      file.write("order_id\t" + "order_sn\t\n")  # header row
      for a in self.pending_orders or ():
        fwh_order_id = a['order_id']
        fwh_order_sn = a['order_sn']
        self.fwh_order_dict[fwh_order_id] = fwh_order_sn
        file.write(str(fwh_order_id) + '\t' + str(fwh_order_sn) + '\t\n')
        # Queue one (order_id, order_sn) item per order instead of the whole dict.
        self.order_queue.put((fwh_order_id, fwh_order_sn))
    # Close the database connection
    # self.my_op.close_db_fwh()
    # self.my_op.close_db()
    return self.order_queue.qsize()  # number of queued orders

  def asynchronousPay(self, order_id, order_sn):
    """Send the simulated WeChat payment callback for one order."""
    token_list = self.get_fwh_token_list()
    if self.pending_orders:
      # Copy the shared header dict so threads do not mutate it concurrently.
      headers = dict(headers_form_urlencoded, token=token_list[0])
      url_wechat_success_huidiao = self.fwh_test_api + '/index/Order/doWeChatNotify'
      data_wechat_success_huidiao = self.data_to_str.requestDataToStr_firefoxAndChrome_fwh('''order_sn:{}
order_id:{}
meth_id:4
timestamp:157129653969
sign:0687b01b300b9e300d3996a9d2173f1380973e5a'''.format(order_sn, order_id))
      # requests.post is an assumption: the callee name is missing from the source.
      request_wechat_success_huidiao = requests.post(url=url_wechat_success_huidiao, headers=headers, data=data_wechat_success_huidiao)
      response_wechat_success_huidiao = request_wechat_success_huidiao.json()
      # Print the payload of any order the API rejects as not pending;
      # the text must match the message the API actually returns.
      if 'Order status error, not a pending order' in response_wechat_success_huidiao['msg']:
        print(data_wechat_success_huidiao)
    else:
      print('Pending payment order is empty')

  def run_multithreading(self):  # multithreading
    threads = []  # holds every thread
    queue_size = self.testDoWeChatNotify()  # number of queued orders
    if queue_size > 0:
      for _ in range(queue_size):  # create one thread per queued order
        order_id, order_sn = self.order_queue.get()
        t = Thread(target=self.asynchronousPay, args=(order_id, order_sn))
        threads.append(t)

      for t in threads:  # start every thread
        t.start()

      for t in threads:  # wait for all threads to finish
        t.join()
    else:
      print("The queue is empty.")

if __name__ == "__main__":
  start_time = time.time()  # record the program start time
  wechfy = doWeChatNotify()
  wechfy.run_multithreading()  # multithreading
  print('program time consumption: {:.2f}s'.format(time.time() - start_time))  # total elapsed time
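
What the timer measures depends on whether the main thread calls join() before reading the clock. The standalone sketch below illustrates this with a sleeping task in place of a real payment request. slow_task and timed_run are hypothetical names, and time.perf_counter() is used here instead of time.time() only because it is better suited to measuring short intervals.

```python
import time
from threading import Thread


def slow_task():
    time.sleep(0.2)  # stands in for one slow payment request


def timed_run(wait):
    threads = [Thread(target=slow_task) for _ in range(4)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    if wait:
        for t in threads:
            t.join()          # block until every worker is done
    elapsed = time.perf_counter() - start
    for t in threads:
        t.join()              # clean up so no worker outlives this call
    return elapsed


if __name__ == "__main__":
    print("with join:    {:.2f}s".format(timed_run(True)))
    print("without join: {:.2f}s".format(timed_run(False)))
```

Without join() the elapsed time covers only the cost of starting the threads, not the work they do, so it is not a fair measure of how long the payments take.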

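Summary: in my own tests the multithreaded version runs much faster. Paying 100 orders takes more than forty seconds single-threaded and about eight seconds multithreaded with join(). Without join() the reported time is shorter still, but only because the main thread stops the clock without waiting for the workers. There is still a lot of room for optimization, because each run creates 100 threads, one per order.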
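
One possible direction for that optimization, offered as a sketch and not as the author's method, is to cap the number of threads with the standard library's concurrent.futures.ThreadPoolExecutor so that a fixed set of workers handles all orders. fake_pay, pay_with_pool, and the max_workers value of 10 are assumptions made for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_pay(order):
    order_id, order_sn = order
    time.sleep(0.01)          # stands in for the payment request
    return order_id


def pay_with_pool(orders, max_workers=10):
    # A fixed pool reuses its worker threads instead of creating one per order.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() yields results in the same order as the input.
        return list(pool.map(fake_pay, orders))


if __name__ == "__main__":
    orders = [(i, "SN%04d" % i) for i in range(100)]
    start = time.time()
    paid = pay_with_pool(orders)
    print(len(paid), "orders in {:.2f}s".format(time.time() - start))
```

Leaving the with block waits for all submitted work to finish, so no explicit join() loop is needed.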
