As an algorithm engineer, day-to-day study and work are not just about training models and chasing metrics. More time goes into preparing sample data and analyzing data, and all of that involves big data: a number of tools from the Spark and Hadoop ecosystem are relevant.
Today's post is not about machine learning or algorithm models. Instead, I'll share two Spark functions. I had used both before in some scenario but didn't save them, oops! Scrambling to find them again at the last minute was really painful and wasted far too much time.
So I'm writing the two functions down here, ready for the next time they're needed ~
(1) Get the global sort ID of a Spark DataFrame
The use case: sort a Spark DataFrame by the values of one column, get the global rank ID of each row across all partitions, add a new column to hold that rank ID, and keep the other columns unchanged.
Some readers will say: isn't that easy? The hard part is recording a global ID after orderBy while keeping every column of the original DataFrame.
Enough talk. If you run into this scenario, just copy the function below to solve the same kind of problem.
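The Spark version, written in Scala:

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

def dfZipWithIndex(
  df: DataFrame,
  offset: Int = 1,
  colName: String = "rank_id",
  inFront: Boolean = true
): DataFrame = {
  df.sparkSession.createDataFrame(
    // Pair every row with its global index, then rebuild the row with the index added.
    df.rdd.zipWithIndex.map(ln =>
      Row.fromSeq(
        (if (inFront) Seq(ln._2 + offset) else Seq())
          ++ ln._1.toSeq ++
        (if (inFront) Seq() else Seq(ln._2 + offset))
      )
    ),
    // Extend the original schema with the new non-nullable Long column.
    StructType(
      (if (inFront) Array(StructField(colName, LongType, false)) else Array[StructField]())
        ++ df.schema.fields ++
      (if (inFront) Array[StructField]() else Array(StructField(colName, LongType, false)))
    )
  )
}

To call the function, this one line is enough:

// `$` requires import spark.implicits._
val ranked_df = dfZipWithIndex(raw_df.orderBy($"predict_score".desc))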
Just copy it.
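The PySpark version, written in Python:

from pyspark.sql.types import LongType, StructField, StructType

def dfZipWithIndex(df, offset=1, colName="rank_id"):
    new_schema = StructType(
        [StructField(colName, LongType(), True)]  # newly added field in front
        + df.schema.fields                        # previous schema
    )
    zipped_rdd = df.rdd.zipWithIndex()
    # Each element is a (row, rowId) pair; Python 3 lambdas cannot unpack tuples in the signature.
    new_rdd = zipped_rdd.map(lambda pair: [pair[1] + offset] + list(pair[0]))
    # Assumes an active SparkSession named `spark`.
    return spark.createDataFrame(new_rdd, new_schema)

It is called the same way, so I won't go into the details here.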
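To check what either version should return, here is a plain-Python model of the result that needs no Spark. It is only a sketch: the helper name `zip_with_index` and the sample rows are hypothetical, and it mimics the output (sort globally, then number the rows in order) rather than Spark's distributed execution.

```python
def zip_with_index(rows, sort_key, offset=1, col_name="rank_id", descending=True):
    """Plain-Python model of the result: sort globally, then number rows in order."""
    ordered = sorted(rows, key=lambda r: r[sort_key], reverse=descending)
    # The new column goes in front; every original column is kept as-is.
    return [{col_name: i + offset, **row} for i, row in enumerate(ordered)]


# Hypothetical sample data, standing in for raw_df.
raw_rows = [
    {"user_id": "u1", "predict_score": 0.42},
    {"user_id": "u2", "predict_score": 0.91},
    {"user_id": "u3", "predict_score": 0.17},
]

ranked_rows = zip_with_index(raw_rows, "predict_score")
for row in ranked_rows:
    print(row)
```

The Spark versions depend on the same idea: as far as I know, zipWithIndex numbers elements by partition index and then by position within each partition, so calling it directly after a global orderBy should produce indices that follow the sort order.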
(2) Keep the row with the maximum value in each group
The use case: when we query a DataFrame with Spark or Spark SQL, a user may have multiple records on a given day, and for each user we need to keep the row with the largest value in some column.
The crux: group by user in a single pass, then, among each user's rows, keep the one with the largest value.
Of course, with a small change to the code it doesn't have to be the maximum; the minimum or the average works too.
The Spark version, written in Scala:

// Keep each user's row with the latest time among that user's records for a single day.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.max

val w = Window.partitionBy("user_id")
// `$` requires import spark.implicits._
val result_df = raw_df
  .withColumn("max_time", max("time").over(w))
  .where($"time" === $"max_time")
  .drop("max_time")
The PySpark version, written in Python:

# Keep the rows holding the largest value of a column within each group.
# Create a Window partitioned by user_id and use it to compute each group's maximum time,
# then keep only the rows whose time equals that maximum.
from pyspark.sql import Window
import pyspark.sql.functions as f

w = Window.partitionBy('user_id')
result_df = (
    raw_df.withColumn('max_time', f.max('time').over(w))
    .where(f.col('time') == f.col('max_time'))
    .drop('max_time')
)
As we can see, the key to this function is Spark's window functions and the flexibility they bring. They are powerful!
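One property of the "aggregate over a window, then filter by equality" pattern is worth knowing: if several rows in a group tie for the maximum, all of them are kept. The plain-Python model below shows this without Spark. It is a sketch under stated assumptions: the helper `keep_extreme_rows`, its `pick` parameter, and the sample rows are hypothetical names introduced for illustration.

```python
from collections import defaultdict


def keep_extreme_rows(rows, group_key, value_key, pick=max):
    """Model of: aggregate over a window per group, then keep rows equal to it."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[group_key]].append(row[value_key])
    # One extreme value per group, playing the role of max("time").over(w).
    extreme = {g: pick(values) for g, values in groups.items()}
    # Equality filter: every row that ties for the extreme value survives.
    return [row for row in rows if row[value_key] == extreme[row[group_key]]]


# Hypothetical sample data, standing in for raw_df.
raw_rows = [
    {"user_id": "u1", "time": 10, "page": "a"},
    {"user_id": "u1", "time": 30, "page": "b"},
    {"user_id": "u1", "time": 30, "page": "c"},  # ties with the row above
    {"user_id": "u2", "time": 20, "page": "d"},
]

latest_rows = keep_extreme_rows(raw_rows, "user_id", "time")
earliest_rows = keep_extreme_rows(raw_rows, "user_id", "time", pick=min)
```

Here user u1 keeps both tied rows. Passing `pick=min` corresponds to swapping max for min in the Spark code. If exactly one row per user is required, a common alternative in Spark is to order the window by time descending, add row_number(), and keep the rows where it equals 1.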
That wraps up these two handy Spark functions: getting the global sort ID of a DataFrame, and keeping the row with the maximum value in each group. For more on global sorting of Spark DataFrames, see my other related articles!