PySpark: Create subset dataframes with rows from another dataframe based on a column condition
So, I have been trying to split a dataframe into two subsets based on a condition involving another dataframe. To put this into perspective, let's say I have three dataframes:
- Ratings: contains **both** series' and movies' ratings
| ID | Rating |
| -- | ------ |
| 1 | 7 |
| 2 | 9 |
| 3 | 5 |
| 4 | 10 |
| 5 | 2 |
| 6 | 9 |
- Movies: only contains movies
| ID | Name |
| -- | --------- |
| 2 | John Wick |
| 4 | Titanic |
| 5 | Sharknado |
- Series: only contains series
| ID | Name |
| -- | ------------ |
| 1 | Breaking Bad |
| 3 | Friends |
| 6 | The Office |
How can I split Ratings into Ratings_movies (containing only movie ratings) and Ratings_series (containing only series ratings)? I have already tried filtering, and even merging the dataframes so that I could then drop the rows where the name is null, but to no avail.
In my case, ratsfull is the "Ratings" dataframe:
- Filter attempt

```python
ratsfull_movies = ratsfull.filter(ratsfull.anime_id.isin(movies.ID))
```
Results in:
```
AnalysisException                         Traceback (most recent call last)
Input In [29], in <cell line: 1>()
----> 1 ratsfull_movies = ratsfull.filter(ratsfull.anime_id.isin(movies.ID))

File ~\Documents\spark\spark-3.0.1-bin-hadoop2.7\python\pyspark\sql\dataframe.py:1461, in DataFrame.filter(self, condition)
   1459     jdf = self._jdf.filter(condition)
   1460 elif isinstance(condition, Column):
-> 1461     jdf = self._jdf.filter(condition._jc)
   1462 else:
   1463     raise TypeError("condition should be string or Column")

File ~\Documents\spark\spark-3.0.1-bin-hadoop2.7\python\py4j\java_gateway.py:1304, in JavaMember.__call__(self, *args)
   1298 command = proto.CALL_COMMAND_NAME +\
   1299     self.command_header +\
   1300     args_command +\
   1301     proto.END_COMMAND_PART
   1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
   1305     answer, self.gateway_client, self.target_id, self.name)
   1307 for temp_arg in temp_args:
   1308     temp_arg._detach()

File ~\Documents\spark\spark-3.0.1-bin-hadoop2.7\python\pyspark\sql\utils.py:134, in capture_sql_exception.<locals>.deco(*a, **kw)
    130 converted = convert_exception(e.java_exception)
    131 if not isinstance(converted, UnknownException):
    132     # Hide where the exception came from that shows a non-Pythonic
    133     # JVM exception message.
--> 134     raise_from(converted)
    135 else:
    136     raise

File <string>:3, in raise_from(e)

AnalysisException: Resolved attribute(s) ID#302 missing from user_id#1021,anime_id#1025,rating#1029 in operator !Filter anime_id#1025 IN (ID#302).;;
!Filter anime_id#1025 IN (ID#302)
+- Project [user_id#1021, anime_id#1025, cast(rating#914 as int) AS rating#1029]
   +- Project [user_id#1021, cast(anime_id#913 as int) AS anime_id#1025, rating#914]
      +- Project [cast(user_id#912 as int) AS user_id#1021, anime_id#913, rating#914]
         +- Union
            :- Relation[user_id#912,anime_id#913,rating#914] csv
            +- Relation[_c0#958,_c1#959,_c2#960] csv
```
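From what I can tell, `Column.isin` expects literal values, not a column pulled from a different DataFrame, which would explain why Spark complains that `ID#302` is missing from ratsfull's attributes. One workaround I have seen suggested (a sketch only; it assumes the list of movie IDs is small enough to collect to the driver) is:

```python
# isin() takes literal values, so collect the movie IDs
# into a plain Python list first.
movie_ids = [row.ID for row in movies.select("ID").collect()]

ratsfull_movies = ratsfull.filter(ratsfull.anime_id.isin(movie_ids))
```

Collecting feels wasteful for large tables, though, so I suspect a join-based approach would be more appropriate.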
- Merge attempt
```python
ratsfull_movies = ratsfull.merge(movies, how="outer", left_on="anime_id", right_on="ID")
ratsfull_movies.show()
```
Results in:
```
AttributeError                            Traceback (most recent call last)
Input In [30], in <cell line: 1>()
----> 1 ratsfull_movies = ratsfull.merge(movies, how = "outer", left_on = "anime_id", right_on = "ID")

File ~\Documents\spark\spark-3.0.1-bin-hadoop2.7\python\pyspark\sql\dataframe.py:1400, in DataFrame.__getattr__(self, name)
   1394 """Returns the :class:`Column` denoted by ``name``.
   1395
   1396 >>> df.select(df.age).collect()
   1397 [Row(age=2), Row(age=5)]
   1398 """
   1399 if name not in self.columns:
-> 1400     raise AttributeError(
   1401         "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
   1402 jc = self._jdf.apply(name)
   1403 return Column(jc)

AttributeError: 'DataFrame' object has no attribute 'merge'
```
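As the error says, `merge` is the pandas API; a PySpark DataFrame exposes `join` instead. My understanding is that the equivalent call would look something like this (a sketch; in my data `anime_id` in ratsfull corresponds to `ID` in movies):

```python
# join() is PySpark's counterpart to pandas' merge();
# the join condition replaces left_on/right_on.
ratsfull_movies = ratsfull.join(
    movies,
    on=ratsfull.anime_id == movies.ID,
    how="outer",
)
ratsfull_movies.show()
```

An outer join still leaves me with the null-dropping step I was trying to avoid, though.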
The desired output is two dataframes:
- Ratings_movies (only contains movie ratings)
| ID | Rating |
| -- | ------ |
| 2 | 9 |
| 4 | 10 |
| 5 | 2 |
- Ratings_series (only contains series ratings)
| ID | Rating |
| -- | ------ |
| 1 | 7 |
| 3 | 5 |
| 6 | 9 |
Any idea/help is appreciated!
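Edit: while writing this up I came across `left_semi` and `left_anti` joins, which look like they might do exactly this kind of split without producing nulls to drop. A sketch using the example dataframes and column names above (in my real data the condition would be `ratsfull.anime_id == movies.ID`):

```python
# A semi join keeps only the left-side rows whose ID has a match
# in movies, and returns only the left-side columns.
ratings_movies = ratings.join(movies, on="ID", how="left_semi")

# An anti join keeps the complement: rows whose ID has no match
# in movies, i.e. the series ratings.
ratings_series = ratings.join(movies, on="ID", how="left_anti")
```

Is this the idiomatic way to do it, or is there something better?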