pyetl is an ETL framework developed in pure Python. Compared with ETL tools such as Sqoop and DataX, pyetl can attach a UDF to each field, which makes the data transformation process more flexible. Compared with professional ETL tools, pyetl is more lightweight and runs as pure Python code, which is more in line with developers' habits.
Installation
```
pip3 install pyetl
```
Usage examples
Data synchronization between database tables
```python
from pyetl import Task, DatabaseReader, DatabaseWriter

reader = DatabaseReader("sqlite:///db1.sqlite3", table_name="source")
writer = DatabaseWriter("sqlite:///db2.sqlite3", table_name="target")
Task(reader, writer).start()
```
Database table to Hive table synchronization
```python
from pyetl import Task, DatabaseReader, HiveWriter2

reader = DatabaseReader("sqlite:///db1.sqlite3", table_name="source")
writer = HiveWriter2("hive://localhost:10000/default", table_name="target")
Task(reader, writer).start()
```
Database table to Elasticsearch synchronization
```python
from pyetl import Task, DatabaseReader, ElasticSearchWriter

reader = DatabaseReader("sqlite:///db1.sqlite3", table_name="source")
writer = ElasticSearchWriter(hosts=["localhost"], index_name="target")
Task(reader, writer).start()
```
When the source table and the target table have different field names, a field mapping needs to be added
```python
# The source table contains the uuid and full_name fields
reader = DatabaseReader("sqlite:///db.sqlite3", table_name="source")
# The target table contains the id and name fields
writer = DatabaseWriter("sqlite:///db.sqlite3", table_name="target")
# columns configures the field mapping between the target table and the source table
columns = {"id": "uuid", "name": "full_name"}
Task(reader, writer, columns=columns).start()
```
UDF mapping of fields: rule checking, data normalization, data cleansing, etc.
```python
# functions configures the UDF mapping of the fields:
# id is converted to a string, and spaces are stripped from name
functions = {"id": str, "name": lambda x: x.strip()}
Task(reader, writer, columns=columns, functions=functions).start()
```
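A UDF receives a single field value and returns the converted value, so the same mechanism covers the rule checking and data cleansing mentioned above. Here is a minimal sketch; the phone and email fields and the rules applied to them are illustrative assumptions, not part of pyetl:

```python
import re

def clean_phone(value):
    # Cleansing: strip all non-digit characters from a phone number
    return re.sub(r"\D", "", value or "")

def check_email(value):
    # Rule checking: keep values that look like an email address, otherwise None
    pattern = r"[^@]+@[^@]+\.[^@]+"
    return value if value and re.match(pattern, value) else None

# Hypothetical fields, shown only to illustrate per-field UDFs
functions = {"phone": clean_phone, "email": check_email}
Task(reader, writer, functions=functions).start()
```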
Inherit the Task class to flexibly extend ETL tasks
```python
import json
from pyetl import Task, DatabaseReader, DatabaseWriter

class NewTask(Task):
    reader = DatabaseReader("sqlite:///db.sqlite3", table_name="source")
    writer = DatabaseWriter("sqlite:///db.sqlite3", table_name="target")

    def get_columns(self):
        """Generate the field mapping configuration through a function, for more flexible use"""
        # The following example reads the field mapping configuration
        # from the database and returns it as a dictionary
        sql = "select columns from task where name='new_task'"
        columns = self.writer.db.read_one(sql)["columns"]
        return json.loads(columns)

    def get_functions(self):
        """Generate the UDF mapping of the fields through a function"""
        # The following example converts every field to a string
        return {col: str for col in self.columns}

    def apply_function(self, record):
        """UDF applied to an entire record in the data stream"""
        record["flag"] = int(record["id"]) % 2
        return record

    def before(self):
        """Operations to perform before the task starts, such as initializing the task table or creating the target table"""
        sql = "create table destination_table(id int, name varchar(100))"
        self.writer.db.execute(sql)

    def after(self):
        """Operations to perform after the task completes, such as updating the task status"""
        sql = "update task set status='done' where name='new_task'"
        self.writer.db.execute(sql)

NewTask().start()
```
List of currently implemented Readers and Writers
| Reader | Description |
|---|---|
| DatabaseReader | Supports reading from all relational databases |
| FileReader | Reads structured text data, e.g. csv files |
| ExcelReader | Reads Excel sheet files |
| Writer | Description |
|---|---|
| DatabaseWriter | Supports writing to all relational databases |
| ElasticSearchWriter | Batch writes data to an Elasticsearch index |
| HiveWriter | Batch inserts into a Hive table |
| HiveWriter2 | Imports into a Hive table via load data (recommended) |
| FileWriter | Writes data to a text file |
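The file-based Reader and Writer follow the same Task pattern as the examples above. Below is a minimal sketch of loading a csv file into a database table; the exact constructor parameters of FileReader are an assumption here, so check the project documentation for the actual signature:

```python
from pyetl import Task, FileReader, DatabaseWriter

# Assumption: FileReader takes the path of a structured text file such as a csv
reader = FileReader("source.csv")
writer = DatabaseWriter("sqlite:///db.sqlite3", table_name="target")
Task(reader, writer).start()
```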
Project address: pyetl
Summary
This concludes this article on the Python ETL tool pyetl. For more content about pyetl, please search my earlier articles or continue to browse the related articles below. I hope you will support me in the future!