PySpark: Create subset dataframe with rows from another based on a column condition

So, I have been trying to create two subsets of a dataframe using a condition involving another dataframe I have. To put this into perspective, let's say I have three dataframes:

  • Ratings: contains BOTH series' and movies' ratings
| ID | Rating |
| -- | ------ |
| 1  | 7      |
| 2  | 9      |
| 3  | 5      |
| 4  | 10     |
| 5  | 2      |
| 6  | 9      |
  • Movies: only contains movies
| ID | Name      |
| -- | --------- |
| 2  | John Wick |
| 4  | Titanic   |
| 5  | Sharknado |
  • Series: only contains series
| ID | Name         |
| -- | ------------ |
| 1  | Breaking Bad |
| 3  | Friends      |
| 6  | The Office   |

How can I divide Ratings into Ratings_movies (only contains movie ratings) and Ratings_series (only contains series ratings)? I have already tried using filters and even merging the different dataframes in order to then drop the rows where the name is null, to no avail.

In my case, ratsfull is the "Ratings" dataframe:

  • Filter attempt
ratsfull_movies = ratsfull.filter(ratsfull.anime_id.isin(movies.ID))

Results in:

AnalysisException                         Traceback (most recent call last)
Input In [29], in <cell line: 1>()
----> 1 ratsfull_movies = ratsfull.filter(ratsfull.anime_id.isin(movies.ID))

File ~\Documents\spark\spark-3.0.1-bin-hadoop2.7\python\pyspark\sql\dataframe.py:1461, in DataFrame.filter(self, condition)
   1459     jdf = self._jdf.filter(condition)
   1460 elif isinstance(condition, Column):
-> 1461     jdf = self._jdf.filter(condition._jc)
   1462 else:
   1463     raise TypeError("condition should be string or Column")

File ~\Documents\spark\spark-3.0.1-bin-hadoop2.7\python\py4j\java_gateway.py:1304, in JavaMember.__call__(self, *args)
   1298 command = proto.CALL_COMMAND_NAME +\
   1299     self.command_header +\
   1300     args_command +\
   1301     proto.END_COMMAND_PART
   1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
   1305     answer, self.gateway_client, self.target_id, self.name)
   1307 for temp_arg in temp_args:
   1308     temp_arg._detach()

File ~\Documents\spark\spark-3.0.1-bin-hadoop2.7\python\pyspark\sql\utils.py:134, in capture_sql_exception.<locals>.deco(*a, **kw)
    130 converted = convert_exception(e.java_exception)
    131 if not isinstance(converted, UnknownException):
    132     # Hide where the exception came from that shows a non-Pythonic
    133     # JVM exception message.
--> 134     raise_from(converted)
    135 else:
    136     raise

File <string>:3, in raise_from(e)

AnalysisException: Resolved attribute(s) ID#302 missing from user_id#1021,anime_id#1025,rating#1029 in operator !Filter anime_id#1025 IN (ID#302).;;
!Filter anime_id#1025 IN (ID#302)
+- Project [user_id#1021, anime_id#1025, cast(rating#914 as int) AS rating#1029]
   +- Project [user_id#1021, cast(anime_id#913 as int) AS anime_id#1025, rating#914]
      +- Project [cast(user_id#912 as int) AS user_id#1021, anime_id#913, rating#914]
         +- Union
            :- Relation[user_id#912,anime_id#913,rating#914] csv
            +- Relation[_c0#958,_c1#959,_c2#960] csv
  • Merge attempt
ratsfull_movies = ratsfull.merge(movies, how = "outer", left_on = "anime_id", right_on = "ID")
ratsfull_movies.show()

Results in:

AttributeError                            Traceback (most recent call last)
Input In [30], in <cell line: 1>()
----> 1 ratsfull_movies = ratsfull.merge(movies, how = "outer", left_on = "anime_id", right_on = "ID")

File ~\Documents\spark\spark-3.0.1-bin-hadoop2.7\python\pyspark\sql\dataframe.py:1400, in DataFrame.__getattr__(self, name)
   1394 """Returns the :class:`Column` denoted by ``name``.
   1395 
   1396 >>> df.select(df.age).collect()
   1397 [Row(age=2), Row(age=5)]
   1398 """
   1399 if name not in self.columns:
-> 1400     raise AttributeError(
   1401         "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
   1402 jc = self._jdf.apply(name)
   1403 return Column(jc)

AttributeError: 'DataFrame' object has no attribute 'merge'

Wished output is two dataframes:

  • Ratings_movies (only contains movie ratings)
| ID | Rating |
| -- | ------ |
| 2  | 9      |
| 4  | 10     |
| 5  | 2      |
  • Ratings_series (only contains series ratings)
| ID | Rating |
| -- | ------ |
| 1  | 7      |
| 3  | 5      |
| 6  | 9      |

Any idea/help is appreciated!



Comments

Popular posts from this blog

Today Walkin 14th-Sept

Spring Elasticsearch Operations

Hibernate Search - Elasticsearch with JSON manipulation