I have data like the following. I want to group by id and, whenever the value changes within an id, get the first and last timestamp for each value.
time id value
1/20/2022 9:46:48.756 London 9
1/20/2022 9:46:48.756 London 9
1/20/2022 9:46:49.146 London 9
1/20/2022 9:46:55.855 Paris 1
1/20/2022 9:46:55.955 Paris 4
1/20/2022 9:46:56.145 Paris 4
1/20/2022 9:46:57.179 London 4
1/20/2022 9:46:58.179 London 4
1/20/2022 9:46:57.455 Chicago 2
1/20/2022 9:46:59.145 Chicago 2
1/20/2022 9:47:04.145 Chicago 2
1/20/2022 9:47:06.145 Detroit 9
1/20/2022 9:47:07.654 Detroit 9
1/20/2022 9:47:08.554 Detroit 9
1/20/2022 9:47:11.144 Atlanta 9
1/20/2022 9:47:11.159 Atlanta 9
1/20/2022 9:47:17.144 California 4
1/20/2022 9:47:25.143 California 4
1/20/2022 9:47:46.143 California 4
1/20/2022 9:47:48.143 California 4
My resulting Spark DataFrame should look like this:
id value start_time end_time
London 9 1/20/2022 9:46:48.756 1/20/2022 9:46:49.146
Paris 1 1/20/2022 9:46:55.855 1/20/2022 9:46:55.855
Paris 4 1/20/2022 9:46:55.955 1/20/2022 9:46:56.145
London 4 1/20/2022 9:46:57.179 1/20/2022 9:46:58.179
Chicago 2 1/20/2022 9:46:57.455 1/20/2022 9:47:04.145
Detroit 9 1/20/2022 9:47:06.145 1/20/2022 9:47:08.554
Atlanta 9 1/20/2022 9:47:11.144 1/20/2022 9:47:11.159
California 4 1/20/2022 9:47:17.144 1/20/2022 9:47:48.143
I tried the code below, but it only gives me the min and max times when the value changes in the next row:
from pyspark.sql import Window, functions as F

# Order rows by time within each id (the column is 'time', not 'timestamp').
w = Window.partitionBy('id').orderBy('time')
df = (
    # Flag rows whose value differs from the previous row of the same id.
    # No separate id flag is needed: id is constant inside each partition.
    data.withColumn('value_changed', (F.col('value') != F.lag('value', 1, 0).over(w)).cast('int'))
    # A running sum of the flags numbers each run of equal values within an id.
    .withColumn('value_group_id', F.sum('value_changed').over(w))
    .groupBy('id', 'value', 'value_group_id')
    .agg(
        F.min('time').alias('start_time'),
        F.max('time').alias('end_time')
    )
    .drop('value_group_id')
)
df.show()
Thank you for the help
CodePudding user response:
If I understood your question correctly, you simply want to group by id and value and compute the min and max time:
from pyspark.sql import functions as F
result = df.groupBy("id", "value").agg(
    F.min("time").alias("start_time"),
    F.max("time").alias("end_time")
)
result.show(truncate=False)
#+----------+-----+---------------------+---------------------+
#|id        |value|start_time           |end_time             |
#+----------+-----+---------------------+---------------------+
#|Atlanta   |9    |1/20/2022 9:47:11.144|1/20/2022 9:47:11.159|
#|California|4    |1/20/2022 9:47:17.144|1/20/2022 9:47:48.143|
#|Chicago   |2    |1/20/2022 9:46:57.455|1/20/2022 9:47:04.145|
#|Detroit   |9    |1/20/2022 9:47:06.145|1/20/2022 9:47:08.554|
#|London    |4    |1/20/2022 9:46:57.179|1/20/2022 9:46:58.179|
#|London    |9    |1/20/2022 9:46:48.756|1/20/2022 9:46:49.146|
#|Paris     |1    |1/20/2022 9:46:55.855|1/20/2022 9:46:55.855|
#|Paris     |4    |1/20/2022 9:47:55.955|1/20/2022 9:46:56.145|
#+----------+-----+---------------------+---------------------+
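One caveat: a plain group by on id and value matches the expected output only because no id in the sample returns to a value it had earlier. If that can happen in your real data, the two approaches diverge. Here is a minimal pure-Python sketch of the difference, not Spark code. The rows and the helper names spans_by_key and spans_by_run are made up for illustration, and the times are zero-padded so that comparing them as strings gives chronological order.

```python
from itertools import groupby

# Hypothetical rows (id, value, time), already sorted by time within the id.
# London goes back to 9 after a stretch of 4, which the sample data never does.
rows = [
    ("London", 9, "09:46:48.756"),
    ("London", 9, "09:46:49.146"),
    ("London", 4, "09:46:57.179"),
    ("London", 4, "09:46:58.179"),
    ("London", 9, "09:47:30.000"),
    ("London", 9, "09:47:31.500"),
]

def spans_by_key(rows):
    """Plain grouping by (id, value): one span per distinct pair."""
    spans = {}
    for id_, value, time in rows:
        start, end = spans.get((id_, value), (time, time))
        spans[(id_, value)] = (min(start, time), max(end, time))
    return [(k[0], k[1], s, e) for k, (s, e) in spans.items()]

def spans_by_run(rows):
    """Grouping by consecutive runs: a new span starts whenever (id, value) changes."""
    result = []
    for (id_, value), run in groupby(rows, key=lambda r: (r[0], r[1])):
        times = [r[2] for r in run]
        result.append((id_, value, times[0], times[-1]))
    return result

print(spans_by_key(rows))  # two spans: both stretches of 9 are merged
print(spans_by_run(rows))  # three spans: each stretch of 9 is kept separate
```

In Spark, the run-based variant corresponds to the lag plus running-sum idea from the question: flag each row whose value differs from the previous row of the same id, take a cumulative sum of that flag as a run number, and include the run number in the groupBy.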
