Cumulative count of a status by ID - PySpark

I have a DataFrame in PySpark, similar to this:

+---+------+-----------+
|id |status|date       |
+---+------+-----------+
|1  |1     |01-01-2022 |
|1  |0     |02-01-2022 |
|1  |0     |03-01-2022 |
|1  |0     |04-01-2022 |
|1  |1     |05-01-2022 |
|1  |0     |06-01-2022 |
|2  |1     |01-01-2022 |
|2  |0     |02-01-2022 |
|2  |0     |03-01-2022 |
|2  |1     |04-01-2022 |
|2  |0     |05-01-2022 |
+---+------+-----------+

It contains customer IDs and their daily status.

I would like to count how many consecutive days each customer has been in status 0, like this:

Expected output:

+---+------+-----------+------------+
|id |status|date       |count_status|
+---+------+-----------+------------+
|1  |1     |01-01-2022 | 0          |
|1  |0     |02-01-2022 | 1          |
|1  |0     |03-01-2022 | 2          |
|1  |0     |04-01-2022 | 3          |
|1  |1     |05-01-2022 | 0          |
|1  |0     |06-01-2022 | 1          |
|2  |1     |01-01-2022 | 0          |
|2  |0     |02-01-2022 | 1          |
|2  |0     |03-01-2022 | 2          |
|2  |1     |04-01-2022 | 0          |
|2  |0     |05-01-2022 | 1          |
+---+------+-----------+------------+
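The rule behind the expected output is a per-customer counter: it goes up by one on each consecutive status-0 day and resets to 0 on any other status or when a new customer starts. Here is a minimal plain-Python sketch of that rule (no Spark or pandas), assuming the rows are already sorted by `id` and then by `date`. The function name `count_zero_streaks` is purely illustrative.

```python
def count_zero_streaks(rows):
    """Return count_status for each (id, status) pair.

    Assumes rows are already sorted by id, then by date.
    """
    counts = []
    prev_id = None
    streak = 0
    for cust_id, status in rows:
        if cust_id != prev_id:
            # A new customer starts: reset the counter.
            streak = 0
            prev_id = cust_id
        # Extend the run of zeros, or reset on any other status.
        streak = streak + 1 if status == 0 else 0
        counts.append(streak)
    return counts


sample = [(1, 1), (1, 0), (1, 0), (1, 0), (1, 1), (1, 0),
          (2, 1), (2, 0), (2, 0), (2, 1), (2, 0)]
print(count_zero_streaks(sample))
# [0, 1, 2, 3, 0, 1, 0, 1, 2, 0, 1]
```

Any distributed version has to reproduce this sequential "reset on change" behaviour without a loop, which is why the problem maps naturally onto window functions.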

In Python (pandas) I wrote this code:

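import numpy as np

# A new run starts wherever status differs from the previous row
run_id = (df['status'] != df['status'].shift(1)).cumsum()
# 1-based position inside each (id, run) group, kept only for status 0
df['count_status'] = np.where(df['status'] == 0,
                              df.groupby(['id', run_id]).cumcount() + 1,
                              0)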

I recently started with PySpark and can't reproduce this logic there. I tried joining separate tables, but without success.

I saw some solutions using window functions, but I couldn't work out how to apply a window function with a lag to the previous day.

I've also seen comments saying that these solutions cause problems in production.

I ended up getting lost in my research.


