I have a merged DataFrame with two timestamp columns. For each TimeStamp1, I want to find the nearest TimeStamp2 going forward (TimeStamp1 -> TimeStamp2), take the value associated with it, and add that value in a new column.
TimeStamp1               Value1  TimeStamp2               Value2
2021-11-01T01:55:29.473  131     2021-11-01T01:55:28.205  A
2021-11-01T01:55:30.474  3       2021-11-01T01:55:31.205  B
2021-11-01T05:01:55.247  195     2021-11-01T03:44:14.208  C
2021-11-01T05:01:56.247  67      2021-11-01T05:41:56.205  D
2021-11-01T09:41:30.264  131     2021-11-01T09:41:29.405  E
2021-11-01T09:41:32.264  67      2021-11-01T09:41:35.205  F
Expected output:
TimeStamp1               Value1  Value2
2021-11-01T01:55:29.473  131     B
2021-11-01T01:55:30.474  3       B
2021-11-01T05:01:55.247  195     D
2021-11-01T05:01:56.247  67      D
2021-11-01T09:41:30.264  131     F
2021-11-01T09:41:32.264  67      F
I'm working with PySpark. I found some ways to do this, but only in pandas.
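For comparison, the pandas approach mentioned here is presumably pandas.merge_asof with direction="forward"; the question does not say which pandas method was found, so this is an assumption. The sketch rebuilds the sample data, splits it into its two halves (merge_asof requires both sides sorted on their keys) and matches each TimeStamp1 to the first TimeStamp2 at or after it. merge_asof also takes a tolerance argument, which plays the same role as the "maximum nearness" threshold discussed in the answer.

```python
import pandas as pd

# Sample data from the question
df = pd.DataFrame({
    "TimeStamp1": pd.to_datetime([
        "2021-11-01T01:55:29.473", "2021-11-01T01:55:30.474",
        "2021-11-01T05:01:55.247", "2021-11-01T05:01:56.247",
        "2021-11-01T09:41:30.264", "2021-11-01T09:41:32.264"]),
    "Value1": [131, 3, 195, 67, 131, 67],
    "TimeStamp2": pd.to_datetime([
        "2021-11-01T01:55:28.205", "2021-11-01T01:55:31.205",
        "2021-11-01T03:44:14.208", "2021-11-01T05:41:56.205",
        "2021-11-01T09:41:29.405", "2021-11-01T09:41:35.205"]),
    "Value2": ["A", "B", "C", "D", "E", "F"],
})

# merge_asof needs both sides sorted by their join keys
left = df[["TimeStamp1", "Value1"]].sort_values("TimeStamp1")
right = df[["TimeStamp2", "Value2"]].sort_values("TimeStamp2")

# direction="forward": pick the first TimeStamp2 that is >= TimeStamp1
result = pd.merge_asof(left, right,
                       left_on="TimeStamp1", right_on="TimeStamp2",
                       direction="forward").drop(columns="TimeStamp2")
print(result)
```

This runs on a single machine, so it only fits data that can be collected into pandas; the PySpark answer keeps everything distributed.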
Answer:
The transformation you are looking for can be achieved in two steps:
- Generate all possible combinations where df["TimeStamp2"] >= df["TimeStamp1"] using a self join. This forms our candidate_df.
- Prune candidate_df down to the expected rows by finding, for each TimeStamp1, the row with the minimum TimeStamp2. We do this by partitioning candidate_df by TimeStamp1, ordering each partition by TimeStamp2 ascending, and keeping the first row.
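The two steps can be illustrated without Spark. The following is a plain-Python sketch of the same logic, not the Spark code itself: step 1 builds every (TimeStamp1, TimeStamp2) pair with TimeStamp2 >= TimeStamp1, and step 2 keeps, per TimeStamp1, the pair with the smallest TimeStamp2. The helper name nearest_forward is hypothetical.

```python
from datetime import datetime

FMT = "%Y-%m-%dT%H:%M:%S.%f"

# (TimeStamp1, Value1, TimeStamp2, Value2) rows from the question
raw = [("2021-11-01T01:55:29.473", 131, "2021-11-01T01:55:28.205", "A"),
       ("2021-11-01T01:55:30.474", 3, "2021-11-01T01:55:31.205", "B"),
       ("2021-11-01T05:01:55.247", 195, "2021-11-01T03:44:14.208", "C"),
       ("2021-11-01T05:01:56.247", 67, "2021-11-01T05:41:56.205", "D"),
       ("2021-11-01T09:41:30.264", 131, "2021-11-01T09:41:29.405", "E"),
       ("2021-11-01T09:41:32.264", 67, "2021-11-01T09:41:35.205", "F")]
rows = [(datetime.strptime(t1, FMT), v1, datetime.strptime(t2, FMT), v2)
        for t1, v1, t2, v2 in raw]


def nearest_forward(rows):
    """Hypothetical helper mirroring the self join + row_number approach."""
    # Step 1 (self join): every pair where the right row's TimeStamp2 >= the left row's TimeStamp1
    candidates = [(left[0], left[1], right[2], right[3])
                  for left in rows for right in rows if right[2] >= left[0]]
    # Step 2 (prune): per TimeStamp1, keep the candidate with the smallest TimeStamp2
    best = {}
    for t1, v1, t2, v2 in candidates:
        if t1 not in best or t2 < best[t1][2]:
            best[t1] = (t1, v1, t2, v2)
    # Drop TimeStamp2 and return the rows ordered by TimeStamp1
    return [(t1, v1, v2) for t1, v1, _, v2 in sorted(best.values())]


for t1, v1, v2 in nearest_forward(rows):
    print(t1, v1, v2)
```

As with the inner join in the Spark version, a TimeStamp1 that has no TimeStamp2 at or after it produces no output row.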
If you have a threshold for the "maximum nearness", i.e. a maximum allowed difference between TimeStamp1 and the nearest TimeStamp2, the solution can be optimized by adding that bound to the join condition, which reduces the size of candidate_df.
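A sketch of the threshold idea, again in plain Python rather than Spark: with a hypothetical max_gap (one hour here, an arbitrary choice), a pair only becomes a candidate when TimeStamp1 <= TimeStamp2 <= TimeStamp1 + max_gap. In the PySpark code this would presumably become an extra term in the join condition, something like (F.col("r.TimeStamp2") >= F.col("l.TimeStamp1")) & (F.col("r.TimeStamp2") <= F.col("l.TimeStamp1") + F.expr("INTERVAL 1 HOUR")), which has not been run here.

```python
from datetime import datetime, timedelta

FMT = "%Y-%m-%dT%H:%M:%S.%f"

# (TimeStamp1, Value1, TimeStamp2, Value2) rows from the question
raw = [("2021-11-01T01:55:29.473", 131, "2021-11-01T01:55:28.205", "A"),
       ("2021-11-01T01:55:30.474", 3, "2021-11-01T01:55:31.205", "B"),
       ("2021-11-01T05:01:55.247", 195, "2021-11-01T03:44:14.208", "C"),
       ("2021-11-01T05:01:56.247", 67, "2021-11-01T05:41:56.205", "D"),
       ("2021-11-01T09:41:30.264", 131, "2021-11-01T09:41:29.405", "E"),
       ("2021-11-01T09:41:32.264", 67, "2021-11-01T09:41:35.205", "F")]
rows = [(datetime.strptime(t1, FMT), v1, datetime.strptime(t2, FMT), v2)
        for t1, v1, t2, v2 in raw]


def candidates(rows, max_gap=None):
    """Self-join pairs with TimeStamp2 >= TimeStamp1; if max_gap (a hypothetical
    threshold) is given, also require TimeStamp2 - TimeStamp1 <= max_gap."""
    return [(left[0], left[1], right[2], right[3])
            for left in rows for right in rows
            if right[2] >= left[0]
            and (max_gap is None or right[2] - left[0] <= max_gap)]


def nearest_values(cands):
    """Value2 of the smallest TimeStamp2 per TimeStamp1, ordered by TimeStamp1."""
    best = {}
    for t1, v1, t2, v2 in cands:
        if t1 not in best or t2 < best[t1][2]:
            best[t1] = (t1, v1, t2, v2)
    return [v2 for _, _, _, v2 in sorted(best.values())]


all_pairs = candidates(rows)
bounded = candidates(rows, max_gap=timedelta(hours=1))  # assumed 1-hour threshold
print(len(all_pairs), len(bounded))  # candidate set size without / with the bound
print(nearest_values(bounded))
```

On the sample data the bounded version keeps 6 of the 18 candidate pairs and still returns B, B, D, D, F, F. Any TimeStamp1 whose nearest forward TimeStamp2 lies beyond the threshold drops out entirely, so the threshold has to be chosen with that in mind.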
Working Example
from datetime import datetime

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

FMT = "%Y-%m-%dT%H:%M:%S.%f"


def ts(s):
    # Parse "2021-11-01T01:55:29.473"-style strings into datetime objects
    return datetime.strptime(s, FMT)


data = [(ts("2021-11-01T01:55:29.473"), 131, ts("2021-11-01T01:55:28.205"), "A"),
        (ts("2021-11-01T01:55:30.474"), 3, ts("2021-11-01T01:55:31.205"), "B"),
        (ts("2021-11-01T05:01:55.247"), 195, ts("2021-11-01T03:44:14.208"), "C"),
        (ts("2021-11-01T05:01:56.247"), 67, ts("2021-11-01T05:41:56.205"), "D"),
        (ts("2021-11-01T09:41:30.264"), 131, ts("2021-11-01T09:41:29.405"), "E"),
        (ts("2021-11-01T09:41:32.264"), 67, ts("2021-11-01T09:41:35.205"), "F")]
df = spark.createDataFrame(data, ["TimeStamp1", "Value1", "TimeStamp2", "Value2"])

# Step 1: self join, pairing each TimeStamp1 with every TimeStamp2 at or after it
candidate_df = (df.alias("l")
                .join(df.alias("r"), F.col("r.TimeStamp2") >= F.col("l.TimeStamp1"))
                .selectExpr("l.TimeStamp1 as TimeStamp1",
                            "l.Value1 as Value1",
                            "r.TimeStamp2 as TimeStamp2",
                            "r.Value2 as Value2"))

# Step 2: within each TimeStamp1, keep only the candidate with the smallest TimeStamp2
window_spec = Window.partitionBy("TimeStamp1").orderBy("TimeStamp2")
(candidate_df.withColumn("rn", F.row_number().over(window_spec))
 .filter(F.col("rn") == 1)
 .drop("rn", "TimeStamp2")
 .orderBy("TimeStamp1")
 .show(200, False))
Output
+-----------------------+------+------+
|TimeStamp1             |Value1|Value2|
+-----------------------+------+------+
|2021-11-01 01:55:29.473|131   |B     |
|2021-11-01 01:55:30.474|3     |B     |
|2021-11-01 05:01:55.247|195   |D     |
|2021-11-01 05:01:56.247|67    |D     |
|2021-11-01 09:41:30.264|131   |F     |
|2021-11-01 09:41:32.264|67    |F     |
+-----------------------+------+------+
