Sqoop data acquisition format problem
Apache Sqoop is an open-source tool mainly used to transfer data between Hadoop (Hive) and traditional relational databases (MySQL, PostgreSQL, ...). It can import data from a relational database (for example MySQL, Oracle, or Postgres) into Hadoop's HDFS, and it can also export data from HDFS back into a relational database.
The Sqoop project started in 2009, initially as a third-party module for Hadoop. Later, to let users deploy it more quickly and developers iterate faster, Sqoop became an independent Apache project.
1. Working principle of Sqoop
- Data import: Sqoop implements parallel import through MapReduce. It first splits the source table in the relational database according to certain rules, then starts one Map task per split; each task reads its portion of the data from the database and writes it to HDFS or another Hadoop storage system. This makes full use of the distributed computing power of the Hadoop cluster and improves import efficiency.
- Export process: similar to import, Sqoop also splits the data, reads it from Hadoop through Map tasks, and writes it to the target relational database according to that database's format requirements.
In short, Sqoop generates a (map-only) MapReduce job to carry out the data transfer.
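As a rough illustration of how the split works, the sketch below imports one table with four parallel Map tasks; the MySQL host, database, table, and split column are hypothetical placeholders.

```bash
# --split-by chooses the partitioning column: Sqoop divides its value range into 4 slices
# -m sets the number of Map tasks (and therefore the number of output files)
sqoop import \
  --connect jdbc:mysql://node3:3306/sqoopTest \
  --username root \
  --password 123456 \
  --table tb_tohdfs \
  --split-by id \
  -m 4 \
  --target-dir /sqoop/import/tb_tohdfs_parallel
```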
Sqoop installation:
- A Java environment
- A Hadoop environment
- The JDBC driver packages of the databases involved
As long as these prerequisites are met, you can simply unpack the Sqoop distribution, adjust its configuration, and start using it.
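As a rough sketch, the setup usually looks like the following; the version numbers and paths are assumptions for illustration only.

```bash
# unpack the Sqoop distribution and put it on the PATH (paths and versions are assumptions)
tar -xzf sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz -C /opt
export SQOOP_HOME=/opt/sqoop-1.4.7.bin__hadoop-2.6.0
export PATH=$PATH:$SQOOP_HOME/bin
# conf/sqoop-env.sh should point HADOOP_COMMON_HOME / HADOOP_MAPRED_HOME / HIVE_HOME
# at the local Hadoop and Hive installations
cp mysql-connector-java-5.1.49.jar $SQOOP_HOME/lib/   # JDBC driver for the source database
sqoop version                                         # quick sanity check
```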
2. Sqoop command format
Basic usage syntax:
```
sqoop import | export \
  <database connection parameters> \
  <HDFS or Hive connection parameters> \
  <other configuration parameters>
```
Common parameters for data transmission:
Option | Description |
---|---|
--connect | JDBC connection URL, e.g. jdbc:mysql://hostname:3306 |
--username | Database username |
--password | Database user password |
--table | The data table to transfer |
--columns | The table columns to transfer |
--where | Filter condition applied to the data |
-e / --query | Custom SQL statement |
--driver | JDBC driver to use |
--delete-target-dir | Delete the target directory before importing data |
--target-dir | Target directory for imported data (usually an HDFS path) |
--export-dir | Source directory of the data to export (usually an HDFS path) |
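For example, several of these parameters could be combined into one import as in the sketch below; the host, credentials, database, and table are placeholders, not values from this article's environment.

```bash
# import two columns of tb_tohdfs, filtered by a where condition, into an HDFS directory
sqoop import \
  --connect jdbc:mysql://node3:3306/sqoopTest \
  --username root \
  --password 123456 \
  --table tb_tohdfs \
  --columns "id,name" \
  --where "id > 100" \
  --delete-target-dir \
  --target-dir /sqoop/import/demo \
  --fields-terminated-by '\t' \
  -m 1
```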
The full usage of each Sqoop tool can be viewed with the sqoop help command (for example, sqoop help import), so it is not covered in detail here.
3. Oracle data acquisition format problem
Scenario:
Step 1: Check the row count of the CISS_SERVICE_WORKORDER table in the source business database.
```sql
select count(1) as cnt from CISS_SERVICE_WORKORDER;
-- 178609 rows
```
Step 2: Collect the CISS_SERVICE_WORKORDER data into HDFS.
```bash
# --fields-terminated-by: specify the field delimiter
# -m: specify the degree of parallelism (number of Map tasks)
sqoop import \
  --connect jdbc:oracle:thin:@:1521:helowin \
  --username ciss \
  --password 123456 \
  --table CISS4.CISS_SERVICE_WORKORDER \
  --delete-target-dir \
  --target-dir /test/full_imp/ciss4.ciss_service_workorder \
  --fields-terminated-by "\001" \
  -m 1
```
Hive uses \001 as the default field delimiter for its tables, but a custom delimiter can also be specified when creating a table, as sketched below.
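For reference, a minimal sketch of declaring an explicit delimiter when creating a Hive table; the table name and location are hypothetical.

```bash
# run the DDL through the Hive CLI; files under /test/demo_text are expected to
# separate their fields with the \001 character
hive -e "create external table demo_text(id string, name string)
row format delimited fields terminated by '\001'
location '/test/demo_text';"
```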
Step 3: Use Hive to check the number of rows in the imported data.
```sql
-- treat each line of the imported data as a single column of the table
create external table test_text(
  line string
)
location '/test/full_imp/ciss4.ciss_service_workorder';

select count(*) from test_text;
-- 195825 rows
```
Problem:
After the Sqoop import, the number of rows stored in HDFS does not match the row count of the source database.
Cause:
- When Sqoop imports data in text format, the default HDFS file format is textfile, whose default line terminator is the special character \n.
- If special characters such as \n, \r, or \t appear inside a column value in Oracle, the record is split; in particular, an embedded \n is interpreted as the start of a new line.
Data in Oracle:

id | name | age |
---|---|---|
001 | zhang\nsan | 18 |

Data after the Sqoop conversion (the embedded \n becomes a line break in the text file):

001 | zhang
san | 18

Data in the Hive table:

id | name | age |
---|---|---|
001 | zhang | |
san | 18 | |
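To confirm that embedded line breaks are indeed the cause, the source table can be checked directly. The sketch below uses sqoop eval to run a query against Oracle with the connection details from Step 2; the column name WO_CONTENT is purely hypothetical.

```bash
# count the rows whose WO_CONTENT column contains a newline character (chr(10))
sqoop eval \
  --connect jdbc:oracle:thin:@:1521:helowin \
  --username ciss \
  --password 123456 \
  --query "select count(*) from CISS4.CISS_SERVICE_WORKORDER where instr(WO_CONTENT, chr(10)) > 0"
```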
Solution:
- Plan 1: delete or replace the newline characters in the data (see the sketch after this list)
  - Sqoop parameter --hive-drop-import-delims: delete the delimiter characters (\n, \r, \01) from string fields
  - Sqoop parameter --hive-delims-replacement <char>: replace them with the given character
  - Not recommended: it destroys the original data structure, and the ODS layer should keep the source data as intact as possible
- Plan 2: adopt a special storage format, the AVRO format
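The sketch below shows what Plan 1 would look like, reusing the connection details from Step 2; the two Hive-oriented parameters are alternatives, so only one of them would be used.

```bash
# --hive-drop-import-delims drops \n, \r and \01 from string fields during the import
sqoop import \
  --connect jdbc:oracle:thin:@:1521:helowin \
  --username ciss \
  --password 123456 \
  --table CISS4.CISS_SERVICE_WORKORDER \
  --delete-target-dir \
  --target-dir /test/full_imp/ciss4.ciss_service_workorder \
  --fields-terminated-by "\001" \
  --hive-drop-import-delims \
  -m 1
# alternatively, replace the characters instead of dropping them:
#   --hive-delims-replacement ' '
```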
Overview of common file formats:

Format | Description |
---|---|
TextFile | Hive's default file format; the simplest data format, easy to view and edit, but it consumes storage space and has low I/O performance |
SequenceFile | Binary file containing key-value pairs; optimizes disk utilization and I/O, supports parallel processing, and queries are efficient, but it consumes the most storage space |
AvroFile | A special binary format whose main design goal is schema evolution; the schema is stored together with the data |
OrcFile | Columnar storage; the schema is stored in the file footer, schema evolution is not supported, compression is high, indexes are included, and queries are very fast |
ParquetFile | Columnar storage, similar to ORC; its compression ratio is somewhat lower, query performance is comparable, and it is supported by more tools, so it is more versatile |
Characteristics of the Avro format
- Advantages
  - Binary data storage, so performance and efficiency are good
  - The schema is described in JSON, which supports rich data types and scenarios
  - Schema and data are stored together, so the data is self-describing (each table row is stored as an object, with the schema as its metadata)
  - The schema definition also specifies the sort order of the data
- Disadvantages
  - Only Avro's own serialization format is supported
  - Reading only a few columns is relatively slow (the layout is row-oriented) and the compression ratio is relatively low
- Scenario: row-oriented writes of large-scale structured data, reads that touch a large number of columns, or frequently changing schemas
Sqoop import using the Avro format:
```bash
# --as-avrodatafile: store the imported data as AVRO files
# -Dmapreduce.job.user.classpath.first=true: prefer the job's own jars (e.g. Avro) over the
#   cluster's, to avoid Avro version conflicts
sqoop import \
  -Dmapreduce.job.user.classpath.first=true \
  --connect jdbc:oracle:thin:@:1521:helowin \
  --username ciss \
  --password 123456 \
  --table CISS4.CISS_SERVICE_WORKORDER \
  --delete-target-dir \
  --target-dir /test/full_imp/ciss4.ciss_service_workorder \
  --as-avrodatafile \
  --fields-terminated-by "\001" \
  -m 1
```
Specify the file storage format when creating the Hive table:
```sql
create external table test_avro(
  line string
)
stored as avro
location '/test/full_imp/ciss4.ciss_service_workorder';
```
AVRO data is stored as binary serialized records, and fields are parsed through the predefined schema instead of relying on delimiters, so even if a field value contains special characters such as commas or newlines, the structure of the data is not affected.
The schema definition (in JSON format) explicitly describes the field names, types, order, and other information.
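As a quick way to look at that schema, one can pull an imported file out of HDFS and dump its embedded schema with the Avro tools jar; the part-file name and tool version below are assumptions.

```bash
# fetch one of the imported Avro part files and print its embedded JSON schema
hdfs dfs -get /test/full_imp/ciss4.ciss_service_workorder/part-m-00000.avro .
java -jar avro-tools-1.8.2.jar getschema part-m-00000.avro
# the output lists the record name plus every field's name, type and order
```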
4. Sqoop incremental acquisition plan
Sqoop supports two incremental modes:
- append mode:
Applicable to append-only tables (such as log tables); new rows are collected based on an incremental column (such as an auto-increment primary key id).
- lastmodified mode:
Applicable to tables whose rows get updated (such as a user table); newly added or modified rows are collected based on a timestamp column (such as last_update_time).
The append mode requires the source table to have an auto-increment column, such as an auto-increment id defined when the table was created.
The lastmodified mode requires the source table to have a timestamp column.
Append mode:
Requirement: there must be a monotonically increasing column, and increments are judged by that increasing integer value.
Characteristics: only newly added rows can be imported; updated rows cannot be imported.
Scenario: data is only ever appended and existing rows are never updated.
```bash
# --connect:              MySQL database sqoopTest on host node3 (user root)
# --table tb_tohdfs:      source table to import
# --target-dir:           HDFS directory the data is written to
# --fields-terminated-by: use a tab (\t) as the field delimiter
# --check-column id:      incremental check column (usually the auto-increment primary key)
# --incremental append:   "append" mode, only new rows are imported
# --last-value 0:         largest id imported last time (0 initially, so the first run imports rows with id > 0)
# -m 1:                   use a single Map task
sqoop import \
  --connect jdbc:mysql://node3:3306/sqoopTest \
  --username root \
  --password 123456 \
  --table tb_tohdfs \
  --target-dir /sqoop/import/test02 \
  --fields-terminated-by '\t' \
  --check-column id \
  --incremental append \
  --last-value 0 \
  -m 1
```
Append mode uses --last-value to record the largest id imported in the previous run. The first import is usually a full import, i.e. all rows with id > 0.
Filling in --last-value by hand is error-prone, so Sqoop's job management can be used to record it automatically:
```bash
# create a saved job (the remaining import arguments are abbreviated here)
sqoop job --create my_job -- import ... --incremental append --check-column id --last-value 0
# execute it; after a successful run Sqoop updates the stored last-value automatically
sqoop job --exec my_job
```
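The saved job and its stored state can be inspected with the other sqoop job subcommands:

```bash
sqoop job --list           # list all saved jobs
sqoop job --show my_job    # show the job definition, including the stored incremental.last.value
sqoop job --delete my_job  # remove the saved job when it is no longer needed
```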
Lastmodified mode:
Requirement: the table must contain a column whose time value changes dynamically, which is used to determine when a row was changed.
Characteristics: imports both newly added and updated rows.
Scenario: rows in the table are added or updated, and the lastmode timestamp is refreshed on every update. In practice this requirement is often not met, so this mode is rarely used.
```bash
# --connect:                   MySQL database sqoopTest on host node3 (user root)
# --table tb_lastmode:         source table to import
# --target-dir:                HDFS directory the data is written to
# --fields-terminated-by:      use a tab (\t) as the field delimiter
# --incremental lastmodified:  collect newly added or modified rows
# --check-column lastmode:     timestamp column recording when a row was last updated
# --last-value:                largest timestamp imported last time (rows added or modified after it are imported)
# -m 1:                        use a single Map task
sqoop import \
  --connect jdbc:mysql://node3:3306/sqoopTest \
  --username root \
  --password 123456 \
  --table tb_lastmode \
  --target-dir /sqoop/import/test03 \
  --fields-terminated-by '\t' \
  --incremental lastmodified \
  --check-column lastmode \
  --last-value '2021-06-06 16:09:32' \
  -m 1
```
The lastmodified mode uses the timestamp column to record when each row was last updated.
If the same record is updated several times and its lastmode timestamp exceeds --last-value, Sqoop will import that record more than once.
Solution: add the --merge-key <primary key column> parameter so that old and new versions of a row are merged (deduplicated by primary key), as sketched below:
--merge-key id  # assuming id is the primary key column
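Putting it together, a lastmodified import with primary-key deduplication would look roughly like the sketch below, reusing the same hypothetical database and table as the example above.

```bash
# --merge-key id: merge new versions of a row with old ones on the primary key,
# so a record updated several times appears in the result only once
sqoop import \
  --connect jdbc:mysql://node3:3306/sqoopTest \
  --username root \
  --password 123456 \
  --table tb_lastmode \
  --target-dir /sqoop/import/test03 \
  --fields-terminated-by '\t' \
  --incremental lastmodified \
  --check-column lastmode \
  --last-value '2021-06-06 16:09:32' \
  --merge-key id \
  -m 1
```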
Custom mode:
Requirement: the output directory must be different for each run.
Characteristics: incremental filtering is implemented by yourself in SQL, so both newly added and updated rows can be collected.
Scenario: typically used for custom incremental collection of daily partition data into Hive.
```bash
# note: the two date conditions are wrapped in parentheses so that $CONDITIONS applies to the whole filter
sqoop import \
  --connect jdbc:mysql://node3:3306/db_order \
  --username root \
  --password-file file:///export/data/ \
  --query "select * from tb_order where (substring(create_time,1,10) = '2021-09-14' or substring(update_time,1,10) = '2021-09-14') and \$CONDITIONS" \
  --delete-target-dir \
  --target-dir /nginx/logs/tb_order/daystr=2021-09-14 \
  --fields-terminated-by '\t' \
  -m 1
```
Custom mode can import data based on any SQL you define, so it is the most commonly used approach.
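In practice the date is usually not hard-coded. A small wrapper script can compute yesterday's date and substitute it into both the query and the partition path; the sketch below reuses the hypothetical database and paths from the example above.

```bash
#!/bin/bash
# collect yesterday's new and updated orders into a daily partition directory
biz_date=$(date -d '1 day ago' +%Y-%m-%d)

sqoop import \
  --connect jdbc:mysql://node3:3306/db_order \
  --username root \
  --password-file file:///export/data/ \
  --query "select * from tb_order where (substring(create_time,1,10) = '${biz_date}' or substring(update_time,1,10) = '${biz_date}') and \$CONDITIONS" \
  --delete-target-dir \
  --target-dir /nginx/logs/tb_order/daystr=${biz_date} \
  --fields-terminated-by '\t' \
  -m 1
```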
This concludes this article on Apache Sqoop data collection and its format problems.