SoFunction
Updated on 2024-11-19

pyetl: a Python ETL tool

pyetl is an ETL framework written in pure Python. Unlike sqoop, DataX, and similar ETL tools, pyetl lets you attach a UDF to each field, which makes data transformation more flexible. Compared with professional ETL tools, pyetl is more lightweight and runs as plain Python code, which fits developers' habits better.

Installation

pip3 install pyetl

Usage examples

Data synchronization between database tables

from pyetl import Task, DatabaseReader, DatabaseWriter
reader = DatabaseReader("sqlite:///db1.sqlite3", table_name="source")
writer = DatabaseWriter("sqlite:///db2.sqlite3", table_name="target")
Task(reader, writer).start()
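
To try the example above, both SQLite files need tables to read from and write to. The following is a minimal sketch that prepares them with the standard-library sqlite3 module only. The helper name make_sample_dbs and the (id, name) column layout are assumptions made for illustration, and the sketch assumes the target table should already exist with the same column names as the source.

```python
import sqlite3


def make_sample_dbs(source_path="db1.sqlite3", target_path="db2.sqlite3"):
    """Create a populated `source` table and an empty `target` table.

    Hypothetical helper for illustration; the (id, name) layout is an assumption.
    Returns the number of rows written to the source table.
    """
    rows = [(1, "alice"), (2, "bob")]

    src = sqlite3.connect(source_path)
    try:
        src.execute("CREATE TABLE IF NOT EXISTS source (id INTEGER, name TEXT)")
        src.execute("DELETE FROM source")  # keep the sample data repeatable
        src.executemany("INSERT INTO source (id, name) VALUES (?, ?)", rows)
        src.commit()
    finally:
        src.close()

    dst = sqlite3.connect(target_path)
    try:
        dst.execute("CREATE TABLE IF NOT EXISTS target (id INTEGER, name TEXT)")
        dst.commit()
    finally:
        dst.close()

    return len(rows)


if __name__ == "__main__":
    make_sample_dbs()
```

Running this script once in the working directory creates db1.sqlite3 and db2.sqlite3, which are the file names used in the connection strings above.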

Database table to hive table synchronization

from pyetl import Task, DatabaseReader, HiveWriter2
reader = DatabaseReader("sqlite:///db1.sqlite3", table_name="source")
writer = HiveWriter2("hive://localhost:10000/default", table_name="target")
Task(reader, writer).start()

Database table to Elasticsearch synchronization

from pyetl import Task, DatabaseReader, ElasticSearchWriter
reader = DatabaseReader("sqlite:///db1.sqlite3", table_name="source")
writer = ElasticSearchWriter(hosts=["localhost"], index_name="target")
Task(reader, writer).start()

When the source and target tables use different field names, add a field mapping

# Original table source contains uuid, full_name fields
reader = DatabaseReader("sqlite:///db.sqlite3", table_name="source")
# The target table target contains the id, name fields
writer = DatabaseWriter("sqlite:///db.sqlite3", table_name="target")
# columns maps each target-table field to the corresponding source-table field
columns = {"id": "uuid", "name": "full_name"}
Task(reader, writer, columns=columns).start()
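
The effect of such a mapping on a single record can be sketched in plain Python. This illustrates the idea only and is not pyetl's internal implementation; the function name rename_fields and the sample values are hypothetical.

```python
def rename_fields(record, columns):
    """Build a target-side record from a source-side record.

    `columns` maps target field name -> source field name, the same
    direction as {"id": "uuid", "name": "full_name"} in the example.
    """
    return {target: record[source] for target, source in columns.items()}


columns = {"id": "uuid", "name": "full_name"}
source_row = {"uuid": "a1b2", "full_name": "Ada Lovelace"}  # made-up sample row
target_row = rename_fields(source_row, columns)
print(target_row)  # {'id': 'a1b2', 'name': 'Ada Lovelace'}
```

Note the direction of the dictionary: keys are target field names and values are source field names.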

Field UDF mapping: rule checking, data normalization, data cleansing, etc.

# functions configures the UDF for each field: id is converted to a string, name has surrounding whitespace stripped
functions = {"id": str, "name": lambda x: x.strip()}
Task(reader, writer, columns=columns, functions=functions).start()
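
What a per-field function mapping does to one record can be sketched without pyetl. The helper apply_field_functions and the rule check non_empty below are hypothetical names used for illustration. How pyetl itself reacts when a field function raises an exception is not covered here.

```python
def non_empty(value):
    """Rule check plus cleansing: strip whitespace and reject empty names."""
    value = value.strip()
    if not value:
        raise ValueError("name must not be empty")
    return value


def apply_field_functions(record, functions):
    """Apply the function registered for each field; copy other fields as-is."""
    return {
        field: functions[field](value) if field in functions else value
        for field, value in record.items()
    }


functions = {"id": str, "name": non_empty}
row = {"id": 42, "name": "  Ada Lovelace  ", "flag": 0}  # made-up sample row
cleaned = apply_field_functions(row, functions)
print(cleaned)  # {'id': '42', 'name': 'Ada Lovelace', 'flag': 0}
```

Any callable that takes one value and returns one value fits this shape, so built-ins such as str, lambdas, and named validation functions can be mixed in the same dictionary.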

Subclass Task to extend ETL tasks flexibly

import json
from pyetl import Task, DatabaseReader, DatabaseWriter

class NewTask(Task):
  reader = DatabaseReader("sqlite:///db.sqlite3", table_name="source")
  writer = DatabaseWriter("sqlite:///db.sqlite3", table_name="target")
  
  def get_columns(self):
    """Generate field mapping configurations by means of functions for more flexible use"""
    # The following example reads the field mapping configuration from the database and returns it as a dict
    sql = "select columns from task where name='new_task'"
    # Assumption: the writer exposes its database connection as self.writer.db
    columns = self.writer.db.read_one(sql)["columns"]
    return json.loads(columns)
   
  def get_functions(self):
    """Generating udf mappings for fields by means of functions"""
    # The following example converts each field type to a string
    return {col: str for col in self.columns}  # assumes the field mapping is available as self.columns
   
  def apply_function(self, record):
    """udf of an entire piece of data in a data stream"""
    record["flag"] = int(record["id"]) % 2
    return record

  def before(self):
    """Operations to be performed before the task starts, such as initializing the task table, creating the target table, etc."""
    sql = "create table destination_table(id int, name varchar(100))"
    self.writer.db.execute(sql)  # assumes the writer exposes its connection as self.writer.db
  
  def after(self):
    """Actions to be performed after the task is completed, such as updating the task status, etc."""
    sql = "update task set status='done' where name='new_task'"
    self.writer.db.execute(sql)  # assumes the writer exposes its connection as self.writer.db

NewTask().start()
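
The get_columns example reads the mapping from a task table and imports json, which suggests the mapping is stored as JSON text. The following is a minimal sketch of such a table using only the standard library. The table layout (name, columns, status) is inferred from the two SQL statements in the class above, and the helper names and the "pending" status value are assumptions.

```python
import json
import sqlite3


def seed_task_table(conn, name, columns):
    """Create the `task` table if needed and store one mapping as JSON text."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS task "
        "(name TEXT PRIMARY KEY, columns TEXT, status TEXT)"
    )
    conn.execute(
        "INSERT OR REPLACE INTO task (name, columns, status) VALUES (?, ?, ?)",
        (name, json.dumps(columns), "pending"),  # "pending" is an assumed status
    )
    conn.commit()


def load_columns(conn, name):
    """Read the stored JSON text back and decode it into a dict."""
    row = conn.execute(
        "SELECT columns FROM task WHERE name = ?", (name,)
    ).fetchone()
    return json.loads(row[0])


conn = sqlite3.connect(":memory:")  # in-memory database, nothing is written to disk
seed_task_table(conn, "new_task", {"id": "uuid", "name": "full_name"})
mapping = load_columns(conn, "new_task")
print(mapping)  # {'id': 'uuid', 'name': 'full_name'}
conn.close()
```

Keeping the mapping in a table like this lets the field mapping change without editing the task code, which is the flexibility the get_columns hook is meant to provide.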

List of currently implemented Readers and Writers

Reader          Description
DatabaseReader  Reads from all relational databases
FileReader      Reads structured text data, e.g. CSV files
ExcelReader     Reads Excel sheet files

Writer               Description
DatabaseWriter       Writes to all relational databases
ElasticSearchWriter  Batch-writes data to an Elasticsearch index
HiveWriter           Batch-inserts into a Hive table
HiveWriter2          Imports into a Hive table via load data (recommended)
FileWriter           Writes data to a text file

Project address: pyetl
