pyspark nested for loops
I have seen many questions of the same nature, but I am still confused: some say use groupBy, others propose map or flatMap. I am not sure what to try.
Input: a PySpark DataFrame with date, term, brand and text columns. Expected output: a list of filtered DataFrames, [df1, df2, ...].
In plain Python, the code looks like this (df is a big DataFrame with about 10,000 records):
filtered_df_list = []
for term in term_list:
    for br in brand_list:
        for dt in date_list:
            tcd_df = df.filter((df.term == term) & (df.brand == br) & (df.date == dt))
            # len(tcd_df.index) is pandas-only; use a PySpark emptiness check instead
            if not tcd_df.rdd.isEmpty():
                filtered_df_list.append(tcd_df)
Also, could I build a DataFrame out of the term, brand and date lists and use withColumn to add a column holding the filtered DataFrames? I am not sure that is even possible. Starter code for parallelizing this would be helpful; a rough sketch of what I have in mind follows.
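This is roughly the single-pass shape I imagine, though I do not know whether it is idiomatic (untested sketch: combos and keys are just illustrative names, and it assumes term_list, brand_list and date_list are plain Python lists):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Small DataFrame of every (term, brand, date) combination of interest.
combos = (
    spark.createDataFrame([(t,) for t in term_list], ["term"])
         .crossJoin(spark.createDataFrame([(b,) for b in brand_list], ["brand"]))
         .crossJoin(spark.createDataFrame([(d,) for d in date_list], ["date"]))
)

# One broadcast join replaces the triple-nested filter loop; rows whose
# combination is not in combos are dropped, so the emptiness check goes away.
tagged = df.join(F.broadcast(combos), on=["term", "brand", "date"], how="inner")

# If a list of separate DataFrames is still required, split lazily afterwards.
keys = [tuple(r) for r in tagged.select("term", "brand", "date").distinct().collect()]
filtered_df_list = [
    tagged.filter((F.col("term") == t) & (F.col("brand") == b) & (F.col("date") == d))
    for (t, b, d) in keys
]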
Update: this is what I tried, and it looks like the worker nodes end up trying to run the filter through the SparkContext, which is not allowed.
rdd2 = df.rdd.map(lambda row: (
    row["date"], row["brand"], row["term"], row["text"],
    df.filter((df.brand == row["brand"]) & (df.term == row["term"]) & (df.date == row["date"]))
))
print("\n\nAfter loop")
df2 = rdd2.toDF(["date", "brand", "term", "text"])
df2.show()
Error (which I more or less expected):
_pickle.PicklingError: Could not serialize object: RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
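For reference, my current understanding of the groupBy suggestion is something like the sketch below: one pass that yields a row per non-empty (term, brand, date) combination instead of a list of DataFrames. The aggregations are only illustrative; I have not settled on what to compute per group.

from pyspark.sql import functions as F

# One row per (term, brand, date) combination that actually occurs in df;
# empty combinations never appear, so no emptiness check is needed.
per_combo = (
    df.groupBy("term", "brand", "date")
      .agg(F.count("*").alias("n_rows"),
           F.collect_list("text").alias("texts"))   # illustrative aggregations
)
per_combo.show()
# For arbitrary per-group pandas logic, GroupedData.applyInPandas (Spark 3.0+)
# would be the usual next step.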