PySpark: compare rows within the same group and formulate new columns based on the comparison

Time:01-11

I have a tricky situation that I am trying to resolve with PySpark.

The input dataframe has two unique ids, and each row's class is either new or cleared. The dataframe is sorted in ascending order by Input_time.

Below is the logic to create the result dataframe or table.

For each unique id:

If a new at time 1 is followed by a cleared at time 2 (time2 > time1), then that unique id should be set to cleared, with time_up taken from the new row, time_down taken from the cleared row, and class 'cleared'.

If there is no cleared and there are consecutive new rows for the same id, then the first occurrence is kept and repeatCount increases for each further new row.

If a cleared has no new to pair with, that row is dropped.
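Read as a small per-id state machine, the three rules can be sketched in plain Python. This is only a reference for checking results on a small sample, not a PySpark solution. The function name `pair_events` and the tuple layout are assumptions made for the illustration, and rows with equal times are simply taken in the order given.

```python
def pair_events(rows):
    """Pair each 'new' with the next 'cleared' of the same id.

    rows: (unique_name, input_time, cls) tuples, already in ascending time
    order within each id; rows with equal times are taken in the given order.
    """
    pending = {}  # unique_name -> open 'new' record waiting for a 'cleared'
    done = []     # records that have been paired with a 'cleared'
    for name, time, cls in rows:
        open_row = pending.get(name)
        if cls == "new" and open_row is None:
            # first 'new' of a run: open a record
            pending[name] = {"Unique_name": name, "Input_time": time, "class": "new",
                             "time_up": time, "time_down": None,
                             "repeatCount": 0, "updatetime": None}
        elif cls == "new":
            # repeated 'new' before any 'cleared': count it and remember its time
            open_row["repeatCount"] += 1
            open_row["updatetime"] = time
        elif cls == "cleared" and open_row is not None:
            # close the open record with the time of the 'cleared' row
            open_row["class"] = "cleared"
            open_row["time_down"] = time
            done.append(pending.pop(name))
        # a 'cleared' with no open 'new' falls through and is dropped
    out = done + list(pending.values())
    # ISO-formatted date strings sort chronologically
    return sorted(out, key=lambda r: (r["Unique_name"], r["Input_time"]))


sample = [
    ("Unique-01", "2018-09-01", "new"), ("Unique-01", "2018-09-15", "cleared"),
    ("Unique-01", "2018-09-16", "new"), ("Unique-01", "2018-09-27", "cleared"),
    ("Unique-01", "2018-09-30", "cleared"),
    ("Unique-02", "2018-09-21", "new"), ("Unique-02", "2018-09-28", "cleared"),
    ("Unique-02", "2018-09-28", "new"), ("Unique-02", "2018-10-05", "new"),
    ("Unique-02", "2018-10-15", "cleared"), ("Unique-02", "2018-10-15", "new"),
]
result = pair_events(sample)
for row in result:
    print(row)
```

A loop like this does not scale to large data, but it gives a compact statement of the intended behaviour to compare a dataframe-based solution against.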

Input:

+-----------+-------------+-------+----------+----------+-----------+----------+
|Unique_name|   Input_time|  class|   time_up| time_down|repeatCount|updatetime|
+-----------+-------------+-------+----------+----------+-----------+----------+
|  Unique-01|   2018-09-01|    new|2018-09-01|      null|          0|         0|
|  Unique-01|   2018-09-15|cleared|      null|2018-09-15|          0|         0|
|  Unique-01|   2018-09-16|    new|2018-09-16|      null|          0|         0|
|  Unique-01|   2018-09-27|cleared|      null|2018-09-27|          0|         0|
|  Unique-01|   2018-09-30|cleared|      null|2018-09-30|          0|         0|
|  Unique-02|   2018-09-21|    new|2018-09-21|      null|          0|         0|
|  Unique-02|   2018-09-28|cleared|      null|2018-09-28|          0|         0|
|  Unique-02|   2018-09-28|    new|2018-09-28|      null|          0|         0|
|  Unique-02|   2018-10-05|    new|2018-10-05|      null|          0|         0|
|  Unique-02|   2018-10-15|cleared|      null|2018-10-15|          0|         0|
|  Unique-02|   2018-10-15|    new|2018-10-15|      null|          0|         0|
+-----------+-------------+-------+----------+----------+-----------+----------+

Result:

+-----------+-------------+-------+----------+----------+-----------+----------+
|Unique_name|   Input_time|  class|   time_up| time_down|repeatCount|updatetime|
+-----------+-------------+-------+----------+----------+-----------+----------+
|  Unique-01|   2018-09-01|cleared|2018-09-01|2018-09-15|          0|         0|
|  Unique-01|   2018-09-16|cleared|2018-09-16|2018-09-27|          0|         0|
|  Unique-02|   2018-09-21|cleared|2018-09-21|2018-09-28|          0|         0|
|  Unique-02|   2018-09-28|cleared|2018-09-28|2018-10-15|          1|2018-10-05|
|  Unique-02|   2018-10-15|    new|2018-10-15|      null|          0|         0|
+-----------+-------------+-------+----------+----------+-----------+----------+

In this example, the first Unique-01 row is a new (2018-09-01) and the next row, which comes at a later time, is a cleared. They form a pair, so the first row should be updated with the second row's class and time_down.

Then there is another new at 2018-09-16, cleared at 2018-09-27. The next row, at 2018-09-30, is a cleared with no 'new' left to pair with, so it is dropped.

The first Unique-02 row is a new (2018-09-21) and the next row, which comes at a later time, is a cleared. They form a pair, so the first row should be updated with the second row's class and time_down (2018-09-28).

The next row (Input_time 2018-09-28) is a new. The row after it is not a cleared but another new (2018-10-05), so repeatCount increases to 1 and updatetime is the time_up of the repeated row.

The next row is a cleared, so the new with a repeatCount of 1 is now cleared.

Finally, there is a Unique-02 new at 2018-10-15 with no later new or cleared row, so it is now the latest.

Can anyone please suggest how to achieve this using PySpark?

I tried converting the dataframe to a list of dicts and iterating through it, but that is very slow and inefficient once there are thousands of rows.

Answer:

One way to arrive at the expected output is:

  1. Identify and merge consecutive new class rows and maintain the repeatCount and updatetime
  2. Preserve only the first cleared class rows for consecutive cleared rows
  3. For the dataframe in step 2, find the class for the next row and if cleared update the relevant columns
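
from pyspark.sql import SparkSession, Window as W
from pyspark.sql import functions as F
from pyspark.sql.functions import col as c, lit as l

# Reuse the active session (already available as `spark` in shells and notebooks) or create one.
spark = SparkSession.builder.getOrCreate()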

data = [("Unique-01", "2018-09-01", "new", "2018-09-01", None,),
        ("Unique-01", "2018-09-15", "cleared", None, "2018-09-15",),
        ("Unique-01", "2018-09-16", "new", "2018-09-16", None,),
        ("Unique-01", "2018-09-27", "cleared", None, "2018-09-27",),
        ("Unique-01", "2018-09-30", "cleared", None, "2018-09-30",),
        ("Unique-02", "2018-09-21", "new", "2018-09-21", None,),
        ("Unique-02", "2018-09-28", "cleared", None, "2018-09-28",),
        ("Unique-02", "2018-09-28", "new", "2018-09-28", None,),
        ("Unique-02", "2018-10-05", "new", "2018-10-05", None,),
        ("Unique-02", "2018-10-15", "cleared", None, "2018-10-15",),
        ("Unique-02", "2018-10-15", "new", "2018-10-15", None,), ]

df = spark.createDataFrame(data, ("Unique_name", "Input_time", "class", "time_up", "time_down",))

time_columns = ["Input_time", "time_up", "time_down", ]

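# Cast the date-like string columns to DateType; `name` avoids reusing `c`, the alias for col.
df = df.select(*[F.to_date(name).alias(name) if name in time_columns else name for name in df.columns])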

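# STEP 1 and 2
# Note: rows of one id that share an Input_time (e.g. Unique-02 on 2018-09-28) have no
# guaranteed relative order under this window; add a tie-breaker column if that matters.
ws = W.partitionBy("Unique_name").orderBy("Input_time")
runs_df = (df.withColumn("prev_class", F.lag("class", 1).over(ws))
   # drop every cleared row that directly follows another cleared row
   .where(~((c("class") == c("prev_class")) & (c("class") == l("cleared"))))
   # 1 when the class changes (or on the first row of an id), 0 when it repeats
   .withColumn("run_indicator", F.when(c("class") == c("prev_class"), l(0)).otherwise(l(1)))
   # the running sum of the indicator gives each run of equal classes its own number
   .withColumn("runs", F.sum("run_indicator").over(ws.rowsBetween(W.unboundedPreceding, W.currentRow)))
   .drop("run_indicator")
)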

new_class_run_ws = W.partitionBy("Unique_name", "runs").orderBy("Input_time").rowsBetween(W.unboundedPreceding, W.unboundedFollowing)

merged_new_runs = (runs_df.withColumn("run_start_input_time", F.first("Input_time").over(new_class_run_ws))
        .withColumn("run_end_input_time", F.last("Input_time").over(new_class_run_ws))
        .withColumn("repeat_count", F.count("Input_time").over(new_class_run_ws) - l(1))
        .where("run_start_input_time == Input_time")
        .drop("prev_class", "runs", "run_start_input_time", )
)

# STEP 3

(merged_new_runs.withColumn("next_class", F.lead("class", 1).over(ws))
                .withColumn("next_time_down", F.lead("time_down", 1).over(ws))
                .where("class == 'new'")
                .withColumn("class", F.coalesce("next_class", "class"))
                .drop("time_down")
                .withColumnRenamed("next_time_down", "time_down")
                .withColumn("updatetime", F.when(c("repeat_count") > 0, c("run_end_input_time")).otherwise(l(None)))
                .select("Unique_name", "Input_time", "class", "time_up", "time_down", "repeat_count", "updatetime", )
).show()

Output

+-----------+----------+-------+----------+----------+------------+----------+
|Unique_name|Input_time|  class|   time_up| time_down|repeat_count|updatetime|
+-----------+----------+-------+----------+----------+------------+----------+
|  Unique-01|2018-09-01|cleared|2018-09-01|2018-09-15|           0|      null|
|  Unique-01|2018-09-16|cleared|2018-09-16|2018-09-27|           0|      null|
|  Unique-02|2018-09-21|cleared|2018-09-21|2018-09-28|           0|      null|
|  Unique-02|2018-09-28|cleared|2018-09-28|2018-10-15|           1|2018-10-05|
|  Unique-02|2018-10-15|    new|2018-10-15|      null|           0|      null|
+-----------+----------+-------+----------+----------+------------+----------+
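
The `runs` column in the code above relies on a common trick: flag every row whose class differs from the previous row, then take a running sum of the flags, so that consecutive rows with the same class end up with the same number. A minimal plain-Python sketch of that idea for the rows of a single id (the helper name `label_runs` is hypothetical and not part of PySpark):

```python
from itertools import accumulate


def label_runs(classes):
    """Return one run number per element; consecutive equal values share a number."""
    # 1 where a run starts (first element, or value differs from the previous one), else 0
    flags = [1 if i == 0 or cls != classes[i - 1] else 0
             for i, cls in enumerate(classes)]
    # running sum of the flags, the plain-Python counterpart of a cumulative window sum
    return list(accumulate(flags))


classes = ["new", "cleared", "new", "new", "cleared", "new"]
labels = label_runs(classes)
print(labels)  # [1, 2, 3, 3, 4, 5]
```

The two consecutive `new` entries share the number 3, which is what lets the window partitioned by `Unique_name` and `runs` count the repeats and pick the first and last time of each run.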