I have a tricky situation that I am trying to resolve with PySpark.
The input dataframe contains two unique ids, and each row's class is either new or cleared. The dataframe is sorted in ascending order by Input_time.
Below is the logic for creating the result dataframe (or table).
For each unique id:
If a new at time 1 is followed by a cleared at time 2 (time2 > time1), that unique id's row should be set to cleared, with time_up taken from the 'new' row, time_down taken from the 'cleared' row, and class 'cleared'.
If there is no cleared and there are consecutive new rows for the same id, the first occurrence is kept and repeatCount is incremented for each additional new row.
If a cleared has no new to pair with, that row is dropped.
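To make the three rules concrete, here is a minimal plain-Python sketch of them, useful mainly as a reference for checking a Spark result on a small sample rather than as a solution for large data. The function name `resolve_events` and the tuple layout are hypothetical, and using None instead of 0 as the default updatetime is an assumption.

```python
def resolve_events(rows):
    """Apply the pairing rules to (unique_name, input_time, cls) tuples.

    Assumes rows are already sorted by input_time within each unique_name
    and that times are ISO date strings, so string comparison orders them.
    """
    results = []
    open_rows = {}  # unique_name -> result row still waiting for a 'cleared'
    for name, input_time, cls in rows:
        current = open_rows.get(name)
        if cls == "new":
            if current is None:
                # first 'new' of a run: start a result row
                current = {"Unique_name": name, "Input_time": input_time,
                           "class": "new", "time_up": input_time,
                           "time_down": None, "repeatCount": 0,
                           "updatetime": None}
                open_rows[name] = current
                results.append(current)
            else:
                # repeated 'new': count it and remember when it arrived
                current["repeatCount"] += 1
                current["updatetime"] = input_time
        elif cls == "cleared" and current is not None and input_time > current["time_up"]:
            # strictly later 'cleared' closes the open 'new'
            current["class"] = "cleared"
            current["time_down"] = input_time
            del open_rows[name]
        # any other 'cleared' has nothing to pair with and is dropped
    return results


sample = [("Unique-02", "2018-09-21", "new"),
          ("Unique-02", "2018-09-28", "cleared"),
          ("Unique-02", "2018-09-28", "new"),
          ("Unique-02", "2018-10-05", "new"),
          ("Unique-02", "2018-10-15", "cleared"),
          ("Unique-02", "2018-10-15", "new")]
result = resolve_events(sample)
for row in result:
    print(row)
```

This is the same row-by-row iteration that becomes slow on large inputs, so it is only meant to pin down the expected behaviour.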
Input:
+-----------+-------------+-------+----------+----------+-----------+----------+
|Unique_name|   Input_time|  class|   time_up| time_down|repeatCount|updatetime|
+-----------+-------------+-------+----------+----------+-----------+----------+
|  Unique-01|   2018-09-01|    new|2018-09-01|      null|          0|         0|
|  Unique-01|   2018-09-15|cleared|      null|2018-09-15|          0|         0|
|  Unique-01|   2018-09-16|    new|2018-09-16|      null|          0|         0|
|  Unique-01|   2018-09-27|cleared|      null|2018-09-27|          0|         0|
|  Unique-01|   2018-09-30|cleared|      null|2018-09-30|          0|         0|
|  Unique-02|   2018-09-21|    new|2018-09-21|      null|          0|         0|
|  Unique-02|   2018-09-28|cleared|      null|2018-09-28|          0|         0|
|  Unique-02|   2018-09-28|    new|2018-09-28|      null|          0|         0|
|  Unique-02|   2018-10-05|    new|2018-10-05|      null|          0|         0|
|  Unique-02|   2018-10-15|cleared|      null|2018-10-15|          0|         0|
|  Unique-02|   2018-10-15|    new|2018-10-15|      null|          0|         0|
+-----------+-------------+-------+----------+----------+-----------+----------+
Result:
+-----------+-------------+-------+----------+----------+-----------+----------+
|Unique_name|   Input_time|  class|   time_up| time_down|repeatCount|updatetime|
+-----------+-------------+-------+----------+----------+-----------+----------+
|  Unique-01|   2018-09-01|cleared|2018-09-01|2018-09-15|          0|         0|
|  Unique-01|   2018-09-16|cleared|2018-09-16|2018-09-27|          0|         0|
|  Unique-02|   2018-09-21|cleared|2018-09-21|2018-09-28|          0|         0|
|  Unique-02|   2018-09-28|cleared|2018-09-28|2018-10-15|          1|2018-10-05|
|  Unique-02|   2018-10-15|    new|2018-10-15|      null|          0|         0|
+-----------+-------------+-------+----------+----------+-----------+----------+
In this example, the first Unique-01 row is a new (2018-09-01) and the next row, which arrives at a later time, is a cleared. These form a pair, so the first row should be updated with the second row's class and time_down.
Then there is another new at 2018-09-16, cleared at 2018-09-27. The next row (2018-09-30) is a cleared, but there is no 'new' left for it to pair with, so it is dropped.
The first Unique-02 row is a new (2018-09-21) and the next row, which arrives at a later time, is a cleared. These form a pair, so the first row should be updated with the second row's class and time_down (2018-09-28).
The next row (Input_time 2018-09-28) is a new. The row after it is not a cleared but another new (2018-10-05), so repeatCount is increased to 1 and updatetime is set to the time_up of the repeated row.
The next row is a cleared, so the new with repeatCount 1 is now cleared.
Finally, there is a Unique-02 new at 2018-10-15. No later cleared exists for it, so it remains new and is the latest row.
Can anyone please suggest how to achieve this using PySpark?
I did try converting the dataframe to a list of dicts and iterating through it, but that is a very time-consuming and inefficient approach when there are thousands of rows.
CodePudding user response:
One way to arrive at the expected output is:
1. Identify and merge consecutive 'new' class rows, maintaining repeatCount and updatetime.
2. Keep only the first 'cleared' row of each run of consecutive 'cleared' rows.
3. For the dataframe from step 2, find the class of the next row and, if it is 'cleared', update the relevant columns.
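Identifying runs of consecutive rows with the same class is usually done by comparing each row with the previous one and taking a running sum of the "class changed" flags. As a rough illustration of that idea outside Spark, here is a minimal plain-Python sketch; the helper name `label_runs` is hypothetical, and it stands in for the lag-plus-cumulative-sum pattern rather than reproducing any Spark API.

```python
from itertools import accumulate


def label_runs(classes):
    """Return a run id per row; the id increases whenever the class changes."""
    # previous value for each row (None for the first), like lag(class, 1)
    previous = [None] + classes[:-1]
    # 1 where a new run starts, 0 where the current run continues
    indicator = [0 if cur == prev else 1 for cur, prev in zip(classes, previous)]
    # running sum of the indicator, like sum(...) over an unbounded-preceding window
    return list(accumulate(indicator))


print(label_runs(["new", "new", "cleared", "new"]))  # [1, 1, 2, 3]
```

Rows that share a run id belong to the same run, so grouping by the id lets the first row, last row, and row count of each run be computed together.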
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col as c, lit as l
from pyspark.sql import Window as W

# Reuse the active session (e.g. in a notebook or shell) or create one.
spark = SparkSession.builder.getOrCreate()

data = [("Unique-01", "2018-09-01", "new", "2018-09-01", None),
        ("Unique-01", "2018-09-15", "cleared", None, "2018-09-15"),
        ("Unique-01", "2018-09-16", "new", "2018-09-16", None),
        ("Unique-01", "2018-09-27", "cleared", None, "2018-09-27"),
        ("Unique-01", "2018-09-30", "cleared", None, "2018-09-30"),
        ("Unique-02", "2018-09-21", "new", "2018-09-21", None),
        ("Unique-02", "2018-09-28", "cleared", None, "2018-09-28"),
        ("Unique-02", "2018-09-28", "new", "2018-09-28", None),
        ("Unique-02", "2018-10-05", "new", "2018-10-05", None),
        ("Unique-02", "2018-10-15", "cleared", None, "2018-10-15"),
        ("Unique-02", "2018-10-15", "new", "2018-10-15", None)]

df = spark.createDataFrame(data, ("Unique_name", "Input_time", "class", "time_up", "time_down"))

# Parse the date columns; all other columns pass through unchanged.
time_columns = ["Input_time", "time_up", "time_down"]
df = df.select(*[F.to_date(name).alias(name) if name in time_columns else name for name in df.columns])
# STEP 1 and 2
# Order each id's rows by time. "class" is added as a tie-breaker so that a
# same-day 'cleared' sorts before a 'new', as in the sample input (assumption).
ws = W.partitionBy("Unique_name").orderBy("Input_time", "class")

runs_df = (df.withColumn("prev_class", F.lag("class", 1).over(ws))
           # drop a 'cleared' that directly follows another 'cleared'
           .where(~((c("class") == c("prev_class")) & (c("class") == l("cleared"))))
           # 1 marks the start of a run, 0 a continuation of the current run
           .withColumn("run_indicator", F.when(c("class") == c("prev_class"), l(0)).otherwise(l(1)))
           # the running sum of the indicator gives each run its own id
           .withColumn("runs", F.sum("run_indicator").over(ws.rowsBetween(W.unboundedPreceding, W.currentRow)))
           .drop("run_indicator")
           )

# Window spanning every row of a single run.
new_class_run_ws = W.partitionBy("Unique_name", "runs").orderBy("Input_time").rowsBetween(W.unboundedPreceding, W.unboundedFollowing)

merged_new_runs = (runs_df.withColumn("run_start_input_time", F.first("Input_time").over(new_class_run_ws))
                   .withColumn("run_end_input_time", F.last("Input_time").over(new_class_run_ws))
                   .withColumn("repeat_count", F.count("Input_time").over(new_class_run_ws) - l(1))
                   # keep only the first row of each run
                   .where("run_start_input_time == Input_time")
                   .drop("prev_class", "runs", "run_start_input_time")
                   )
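# STEP 3
# After the merge, 'new' and 'cleared' rows alternate within each id, so the
# row after a 'new' is either its 'cleared' or missing (null).
(merged_new_runs.withColumn("next_class", F.lead("class", 1).over(ws))
 .withColumn("next_time_down", F.lead("time_down", 1).over(ws))
 .where("class == 'new'")
 # take the next row's class when there is one, otherwise stay 'new'
 .withColumn("class", F.coalesce("next_class", "class"))
 .drop("time_down")
 .withColumnRenamed("next_time_down", "time_down")
 # updatetime is only set when the 'new' was repeated
 .withColumn("updatetime", F.when(c("repeat_count") > 0, c("run_end_input_time")).otherwise(l(None)))
 .select("Unique_name", "Input_time", "class", "time_up", "time_down", "repeat_count", "updatetime")
 ).show()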
Output:
+-----------+----------+-------+----------+----------+------------+----------+
|Unique_name|Input_time|  class|   time_up| time_down|repeat_count|updatetime|
+-----------+----------+-------+----------+----------+------------+----------+
|  Unique-01|2018-09-01|cleared|2018-09-01|2018-09-15|           0|      null|
|  Unique-01|2018-09-16|cleared|2018-09-16|2018-09-27|           0|      null|
|  Unique-02|2018-09-21|cleared|2018-09-21|2018-09-28|           0|      null|
|  Unique-02|2018-09-28|cleared|2018-09-28|2018-10-15|           1|2018-10-05|
|  Unique-02|2018-10-15|    new|2018-10-15|      null|           0|      null|
+-----------+----------+-------+----------+----------+------------+----------+
