2023-06-08

pyspark nested for loops

I have seen many questions of the same nature, but I am still confused: some say to use groupBy, others propose map or flatMap. I am not sure what to try.

Input: a PySpark DataFrame with date, term, brand, and text columns. Expected output: a list of filtered DataFrames [df1, df2, ...].

In Python, the code looks like this (df is a large DataFrame with about 10,000 records):

filtered_df_list = []

# Build one filtered DataFrame per (term, brand, date) combination
for term in term_list:
  for br in brand_list:
    for dt in date_list:
      tcd_df = df.filter( (df.term == term) & (df.brand == br) & (df.date == dt) )
      if tcd_df.count() > 0:  # len(tcd_df.index) is a pandas idiom; a PySpark DF needs count()
        filtered_df_list.append(tcd_df)

Also, could I create a DataFrame out of the term, brand, and date lists and use withColumn to add a column holding the filtered DataFrames? I am not sure that is possible. Starter code for parallelizing this would be helpful.
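As far as I can tell, a column cannot hold a DataFrame, so the withColumn idea will not work for that part. One approach that might avoid the nested loops is to join df against a small DataFrame of candidate keys, find the non-empty combinations in a single distributed pass, and only loop on the driver over those. Below is a rough sketch of that idea; it assumes term_list, brand_list, and date_list exist as above and that their element types match the corresponding columns of df:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Candidate key DataFrame built from the three lists (assumed to already exist).
keys = spark.createDataFrame(
    [(t, b, d) for t in term_list for b in brand_list for d in date_list],
    ["term", "brand", "date"],
)

# Find the combinations that actually occur in df; this runs once, on the cluster.
present = (
    df.join(keys, on=["term", "brand", "date"], how="inner")
      .select("term", "brand", "date")
      .distinct()
      .collect()  # small result: one Row per non-empty combination
)

# Build the filtered DataFrames on the driver; each filter stays lazy until used.
filtered_df_list = [
    df.filter((df.term == r.term) & (df.brand == r.brand) & (df.date == r.date))
    for r in present
]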

Update: this is what I tried, and it looks like the worker nodes end up trying to do the filtering through the SparkContext, which is not allowed.

# df is captured inside the lambda, so the workers try to use the driver-only SparkContext
rdd2 = df.rdd.map(lambda row: (
    row["date"], row["brand"], row["term"], row["text"],
    df.filter((df.brand == row["brand"]) & (df.term == row["term"]) & (df.date == row["date"]))
))

print("\n\nAfter loop")
df2 = rdd2.toDF(["date", "brand", "term", "text"])
df2.show()

Error (more or less what I expected): _pickle.PicklingError: Could not serialize object: RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
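As I understand the error, df (and the SparkContext behind it) can only be used on the driver, so it cannot be captured inside rdd.map. If the per-group work itself is what needs to run in parallel, the groupBy suggestion usually means groupBy(...).applyInPandas(...), which hands each (term, brand, date) group to a plain Python function as a pandas DataFrame. A rough sketch only; the summarize function and the string column types in the schema are placeholders, not taken from the actual data:

import pandas as pd

# Placeholder per-group function: here it just counts rows per (term, brand, date) group.
def summarize(pdf: pd.DataFrame) -> pd.DataFrame:
    return pd.DataFrame({
        "term": [pdf["term"].iloc[0]],
        "brand": [pdf["brand"].iloc[0]],
        "date": [pdf["date"].iloc[0]],
        "n_rows": [len(pdf)],
    })

# Column types in the schema string are assumptions about the data.
summary = (
    df.groupBy("term", "brand", "date")
      .applyInPandas(summarize, schema="term string, brand string, date string, n_rows long")
)
summary.show()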


