SoFunction
Updated on 2024-11-17

Python large data volume of text files efficient parsing scheme code implementation of the whole process

test environment

Python 3.6.2

Win 10 RAM 8G, CPU I5 1.6 GHz

Background description

This work comes from the development of a log parsing tool, a pain point encountered during this development process, is that there are many log files, large amount of log data, parsing time-consuming. In this case, looking for an efficient parsing data parsing program.

Solution Description

1, the use of multi-threaded read file

2, the use of reading files by block instead of reading files by line

Since the log files are text files, you need to read each line for parsing, so at first it would be natural to think of reading by line, and later found that a reasonable configuration, read by block, will be more efficient than reading by line.

The problem with reading by chunks is that it may result in complete rows of data scattered in different chunks of data, so how to solve this problem? The solution is as follows:

wrap\nSlicing to get a list of log lines, the first element of the list may be a complete log line or a component of the log line at the end of the previous data block, the last element of the list may be an incomplete log line (i.e., a component of the log line at the beginning of the next data block) or an empty string (all the data of the log line in the log block are complete), according to this law, the following formula is derived. Through this formula, a new data block can be obtained, and the second slice of this data block can be obtained with complete data log lines

Previous log block first log line + \n + Last log line + Next data block first log line + \n + Last log line + ...

3. Split the data parsing operation into parallelizable and non-parallelizable parts.

Data parsing often involves a number of non-parallel operations, such as data summation, maximum statistics, etc., if not split, parallel parsing is bound to need to add mutual exclusion locks to avoid data coverage, which will greatly reduce the efficiency of the implementation, especially in the case of non-parallel operations accounted for a large number of cases.

After splitting the data parsing operation, the parallelizable part of the parsing operation does not need to be locked. Considering the Python GIL, the non-parallelizable part of the parsing operation is replaced with a single-process parsing operation.

4. Adopt multi-process parsing instead of multi-threaded parsing

Using multi-process parsing instead of multi-threaded parsing can improve parsing efficiency by bypassing the execution efficiency problems caused by the Python GIL global interpretation lock.

5, the use of queues to achieve "synergistic" effect

Introduce a queuing mechanism to realize reading logs while parsing data:

  • The log reading thread stores the log blocks in a queue, and the parsing process fetches the read log blocks from the queue and performs a parallel parsing operation.
  • The parallel parsing operation process stores the parsed results in another queue, and another parsing process fetches the data from the queue and performs a non-parallelizable parsing operation.

code implementation

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import re
import time
from datetime import datetime
from joblib import Parallel, delayed, parallel_backend
from collections import deque
from multiprocessing import cpu_count
import threading


class LogParser(object):
    def __init__(self, chunk_size=1024*1024*10, process_num_for_log_parsing=cpu_count()):
        self.log_unparsed_queue = deque() # Used to store unresolved logs
        self.log_line_parsed_queue = deque()  # Used to store parsed log lines
        self.is_all_files_read = False  # Identifies whether all log files have been read
        self.process_num_for_log_parsing = process_num_for_log_parsing # of processes concurrently parsing log files
        self.chunk_size = chunk_size # Log block size per log read
        self.files_read_list = [] # Store read log files
        self.log_parsing_finished = False # Identifies whether log parsing is complete


    def read_in_chunks(self, filePath, chunk_size=1024*1024):
        """
        Inert function (generator) for reading files block by block.
        Default block size: 1M
        """

        with open(filePath, 'r', encoding='utf-8') as f:            
            while True:
                chunk_data = (chunk_size)
                if not chunk_data:
                    break
                yield chunk_data


    def read_log_file(self, logfile_path):
        '''
        Reading log files
        It is assumed here that the log files are all text files, and after reading them in chunks, they can be sliced and diced twice by line breaks in order to get the line logs
        '''

        temp_list = []  # After the second cut, the header and footer logs may be incomplete, so you need to connect the header and footer logs of the log block and splice them together.
        for chunk in self.read_in_chunks(logfile_path, self.chunk_size):
            log_chunk = ('\n')
            temp_list.extend([log_chunk[0], '\n'])
            temp_list.append(log_chunk[-1])
            self.log_unparsed_queue.append(log_chunk[1:-1])
        self.log_unparsed_queue.append(''.join(temp_list).split('\n'))
        self.files_read_list.remove(logfile_path)


    def start_processes_for_log_parsing(self):
        '''Start log parsing process'''

        with parallel_backend("multiprocessing", n_jobs=self.process_num_for_log_parsing):
            Parallel(require='sharedmem')(delayed(self.parse_logs)() for i in range(self.process_num_for_log_parsing))

        self.log_parsing_finished = True

    def parse_logs(self):
        '''Parsing Logs'''

        method_url_re_pattern = ('(HEAD|POST|GET)\s+([^\s]+?)\s+',)
        url_time_taken_extractor = ('HTTP/1\.1.+\|(.+)\|\d+\|', )

        while self.log_unparsed_queue or self.files_read_list:
            if not self.log_unparsed_queue:
                continue
            log_line_list = self.log_unparsed_queue.popleft()
            for log_line in log_line_list:
                #### do something with log_line
                if not log_line.strip():
                    continue

                res = method_url_re_pattern.findall(log_line)
                if not res:
                    print('The log did not match the request URL and has been ignored:\n%s' % log_line)
                    continue
                method = res[0][0]
                url = res[0][1].split('?')[0]  # Removed ? and the following url argument

                # Extraction time consuming
                res = url_time_taken_extractor.findall(log_line)
                if res:
                    time_taken = float(res[0])
                else:
                    print('Request elapsed time not extracted from logs, logs have been ignored:\n%s' % log_line)
                    continue

                # Store parsed log messages
                self.log_line_parsed_queue.append({'method': method,
                                                   'url': url,
                                                   'time_taken': time_taken,
                                                   })


    def collect_statistics(self):
        '''Collecting statistical data'''

        def _collect_statistics():
            while self.log_line_parsed_queue or not self.log_parsing_finished:
                if not self.log_line_parsed_queue:
                    continue
                log_info = self.log_line_parsed_queue.popleft()
                # do something with log_info
       
        with parallel_backend("multiprocessing", n_jobs=1):
            Parallel()(delayed(_collect_statistics)() for i in range(1))

    def run(self, file_path_list):
        # Multi-threaded reading of log files
        for file_path in file_path_list:
            thread = (target=self.read_log_file,
                                      name="read_log_file",
                                      args=(file_path,))
            ()
            self.files_read_list.append(file_path)

        # Start the log parsing process
        thread = (target=self.start_processes_for_log_parsing, name="start_processes_for_log_parsing")
        ()

        # Initiate the log statistics collection process
        thread = (target=self.collect_statistics, name="collect_statistics")
        ()

        start = ()
        while threading.active_count() > 1:
            print('The program is trying to parse the logs...')
            (0.5)

        end = ()
        print('Parsing complete', 'start', start, 'end', end, 'Time-consuming', end - start)

if __name__ == "__main__":
    log_parser = LogParser()
    log_parser.run(['', ''])

Attention:

Need to reasonably configure the size of the single read file data block, can not be too large, or too small, otherwise it may lead to slow data reading. I practice environment, found that 10M ~ 15M each time is a more efficient configuration.

summarize

This article on the Python large data volume text file efficient parsing program code to achieve this article is introduced to this, more related Python large data volume text file parsing content, please search for my previous articles or continue to browse the following related articles I hope you will support me in the future!