
Reading and writing Hive data in Python with PySpark

1. Reading Hive table data

Reading Hive data with PySpark is straightforward: unlike HBase, it does not need a lot of extra configuration, because PySpark provides a Hive interface. The program can query the required data from Hive directly with SQL statements. The code is as follows:

from pyspark.sql import SparkSession, HiveContext
 
_SPARK_HOST = "spark://spark-master:7077"
_APP_NAME = "test"
spark_session = SparkSession.builder.master(_SPARK_HOST).appName(_APP_NAME).enableHiveSupport().getOrCreate()
 
hive_context = HiveContext(spark_session.sparkContext)
 
# Build the query SQL; this is an ordinary Hive query statement, so WHERE and other clauses can be added
hive_database = "database1"
hive_table = "test"
hive_read = "select * from {}.{}".format(hive_database, hive_table)
 
# The data queried from Hive via the SQL statement comes back directly as a DataFrame
read_df = hive_context.sql(hive_read)
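
On Spark 2.x and later the HiveContext wrapper is optional: the same query can be issued directly through the SparkSession, as long as Hive support was enabled when the session was built. A minimal sketch, reusing the spark_session and hive_read defined above:

# query Hive directly through the SparkSession (requires enableHiveSupport())
read_df = spark_session.sql(hive_read)
read_df.show(10)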

2. Writing data to a Hive table

There are two ways for PySpark to write to Hive tables:

(1) Creating the table with a SQL statement

from pyspark.sql import SparkSession
 
_SPARK_HOST = "spark://spark-master:7077"
_APP_NAME = "test"
 
spark = SparkSession.builder.master(_SPARK_HOST).appName(_APP_NAME).enableHiveSupport().getOrCreate()
 
data = [
 (1, "3", "145"),
 (1, "4", "146"),
 (1, "5", "25"),
 (1, "6", "26"),
 (2, "32", "32"),
 (2, "8", "134"),
 (2, "8", "134"),
 (2, "9", "137")
]
df = spark.createDataFrame(data, ['id', "test_id", 'camera_id'])
 
# method one: register the DataFrame as a temporary view, then create the Hive table from it with SQL.
# "default" is the database name and "write_test" is the name of the table to be created in it.
df.createOrReplaceTempView('test_hive')
spark.sql("create table default.write_test as select * from test_hive")

(2) The saveAsTable way

# method two
 
# mode("overwrite") rewrites the table: it overwrites the data if the table already exists and creates the table if it does not.
# mode("append") adds the data on top of the existing table contents.
df.write.format("hive").mode("overwrite").saveAsTable('default.write_test')
 

Tips:

When reading and writing Hive with Spark in the ways shown above, the task must be submitted with the appropriate configuration, otherwise it will report an error:

spark-submit --conf spark.sql.catalogImplementation=hive
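
The same setting can also be made in code when the session is built, instead of on the command line. A minimal sketch; enableHiveSupport() sets spark.sql.catalogImplementation to "hive" for the session:

from pyspark.sql import SparkSession
 
# equivalent to passing --conf spark.sql.catalogImplementation=hive at submit time
spark = SparkSession.builder.appName("test").enableHiveSupport().getOrCreate()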

Additional knowledge: reading HBase data into a DataFrame with PySpark based on the SHC framework

I. Copy the jar packages under HBase's lib directory, together with the SHC jar package, to Spark's lib directory on all nodes.

II. Add the path of the above jar packages to the Spark configuration.

III. Restart the cluster.

IV. Code

#!/usr/bin/python
# -*- coding: utf-8 -*-
 
from pyspark import SparkContext
from pyspark.sql import SQLContext
 
sc = SparkContext(appName="pyspark_hbase")
sql_sc = SQLContext(sc)
 
# SHC data source package
dep = "org.apache.spark.sql.execution.datasources.hbase"
#define schema
#define schema
catalog = """{
       "table":{"namespace":"default", "name":"teacher"},
       "rowkey":"key",
       "columns":{
            "id":{"cf":"rowkey", "col":"key", "type":"string"},
            "name":{"cf":"teacherInfo", "col":"name", "type":"string"},
            "age":{"cf":"teacherInfo", "col":"age", "type":"string"},
            "gender":{"cf":"teacherInfo", "col":"gender","type":"string"},
            "cat":{"cf":"teacherInfo", "col":"cat","type":"string"},
            "tag":{"cf":"teacherInfo", "col":"tag", "type":"string"},
            "level":{"cf":"teacherInfo", "col":"level","type":"string"} }
      }"""
 
df = sql_sc.read.options(catalog=catalog).format(dep).load()
 
print('***************************************************************')
print('***************************************************************')
print('***************************************************************')
df.show()
print('***************************************************************')
print('***************************************************************')
print('***************************************************************')
sc.stop()
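
Once the HBase data is loaded, the DataFrame can be used like any other Spark DataFrame. A minimal sketch of querying it with SQL; it would go before the sc.stop() call in the script above, and the view name teacher_view is only an illustrative choice:

# register the DataFrame as a temporary view and query it with SQL
df.createOrReplaceTempView("teacher_view")
sql_sc.sql("select name, age from teacher_view").show()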

V. Explanation

Please refer to my previous article for the source of the data, and I will not repeat it here.

The schema definition reference is shown in the figure (not reproduced here).

VI. Results

This article on reading and writing Hive data with PySpark in Python is all I have to share with you. I hope it can serve as a reference, and I hope you will support me more.