PySpark get min and max dates each time there is a value change

Time:02-03

I have the data below. I want to group by id and, whenever the value changes within an id, get the first and last timestamp for that value.

time                     id         value
1/20/2022 9:46:48.756   London        9
1/20/2022 9:46:48.756   London        9
1/20/2022 9:46:49.146   London        9
1/20/2022 9:46:55.855   Paris         1
1/20/2022 9:46:55.955   Paris         4
1/20/2022 9:46:56.145   Paris         4
1/20/2022 9:46:57.179   London        4
1/20/2022 9:46:58.179   London        4
1/20/2022 9:46:57.455   Chicago       2
1/20/2022 9:46:59.145   Chicago       2
1/20/2022 9:47:04.145   Chicago       2
1/20/2022 9:47:06.145   Detroit       9
1/20/2022 9:47:07.654   Detroit       9
1/20/2022 9:47:08.554   Detroit       9
1/20/2022 9:47:11.144   Atlanta       9
1/20/2022 9:47:11.159   Atlanta       9
1/20/2022 9:47:17.144   California    4
1/20/2022 9:47:25.143   California    4
1/20/2022 9:47:46.143   California    4
1/20/2022 9:47:48.143   California    4

The resulting Spark DataFrame should look like this:

id        value     start_time                    end_time
London     9    1/20/2022 9:46:48.756   1/20/2022 9:46:49.146
Paris      1    1/20/2022 9:46:55.855   1/20/2022 9:46:55.855
Paris      4    1/20/2022 9:46:55.955   1/20/2022 9:46:56.145
London     4    1/20/2022 9:46:57.179   1/20/2022 9:46:58.179
Chicago    2    1/20/2022 9:46:57.455   1/20/2022 9:47:04.145
Detroit    9    1/20/2022 9:47:06.145   1/20/2022 9:47:08.554
Atlanta    9    1/20/2022 9:47:11.144   1/20/2022 9:47:11.159
California 4    1/20/2022 9:47:17.144   1/20/2022 9:47:48.143

I tried the code below, but it only gives me the min and max values when the value changes in the next row:

from pyspark.sql import Window, functions as F

# Order by 'time', the column name used in the sample data
w = Window.partitionBy('id').orderBy('time')

df = (
  # Store the change flag in 'id_changed' (summed below) rather than overwriting 'id'
  data.withColumn('id_changed', (F.col('id') != F.lag('id').over(w)).cast('int'))
  .withColumn('value_changed', (F.col('value') != F.lag('value', 1, 0).over(w)).cast('int'))
  # Running sums of the change flags act as group labels
  .withColumn('id_group_id', F.sum('id_changed').over(w))
  .withColumn('value_group_id', F.sum('value_changed').over(w))
  .groupBy('id', 'id_group_id', 'value', 'value_group_id')
  .agg(
    F.min('time').alias('start_time'),
    F.max('time').alias('end_time')
  )
  .drop('id_group_id','value_group_id')
)

df.show()

Thank you for the help

Answer:

If I understood your question correctly, you simply want to group by id and value and compute the min/max time (here df is the input DataFrame):

from pyspark.sql import functions as F

result = df.groupBy("id", "value").agg(
    F.min("time").alias("start_time"),
    F.max("time").alias("end_time")
)

result.show(truncate=False)
#+----------+-----+---------------------+---------------------+
#|id        |value|start_time           |end_time             |
#+----------+-----+---------------------+---------------------+
#|Atlanta   |9    |1/20/2022 9:47:11.144|1/20/2022 9:47:11.159|
#|California|4    |1/20/2022 9:47:17.144|1/20/2022 9:47:48.143|
#|Chicago   |2    |1/20/2022 9:46:57.455|1/20/2022 9:47:04.145|
#|Detroit   |9    |1/20/2022 9:47:06.145|1/20/2022 9:47:08.554|
#|London    |4    |1/20/2022 9:46:57.179|1/20/2022 9:46:58.179|
#|London    |9    |1/20/2022 9:46:48.756|1/20/2022 9:46:49.146|
#|Paris     |1    |1/20/2022 9:46:55.855|1/20/2022 9:46:55.855|
#+----------+-----+---------------------+---------------------+