I need help filling in missing values by adding new rows.
This is just an example; my real data has many rows with different IDs.
Input dataframe:
| ID | FLAG | DATE |
|---|---|---|
| 123 | 1 | 01/01/2021 |
| 123 | 0 | 01/02/2021 |
| 123 | 1 | 01/03/2021 |
| 123 | 0 | 01/06/2021 |
| 123 | 0 | 01/08/2021 |
| 777 | 0 | 01/01/2021 |
| 777 | 1 | 01/03/2021 |
I have a finite set of dates (formatted dd/MM/yyyy, one per month), and for each ID I want every month from its first date up to its last one (in the example, for ID = 123: 01/01/2021, 01/02/2021, 01/03/2021, ... up to 01/08/2021). Each new row should carry forward the FLAG of the most recent existing row for that ID. I could do a cross join with a calendar, but I don't know how to fill the missing values with a rule or a filter after the cross join.
Expected output (generated rows in bold):
| ID | FLAG | DATE |
|---|---|---|
| 123 | 1 | 01/01/2021 |
| 123 | 0 | 01/02/2021 |
| 123 | 1 | 01/03/2021 |
| **123** | **1** | **01/04/2021** |
| **123** | **1** | **01/05/2021** |
| 123 | 0 | 01/06/2021 |
| **123** | **0** | **01/07/2021** |
| 123 | 0 | 01/08/2021 |
| 777 | 0 | 01/01/2021 |
| **777** | **0** | **01/02/2021** |
| 777 | 1 | 01/03/2021 |
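To pin down the fill rule that the expected output implies, here is a minimal plain-Python sketch (not Spark). It assumes every date falls on the 1st of a month, as in the sample, and uses hypothetical helpers `parse`, `next_month` and `fill_monthly`: for each ID it walks month by month from the first to the last date and reuses the previous FLAG whenever a month has no row.

```python
from datetime import date

# Sample rows from the question: (ID, FLAG, DATE as dd/MM/yyyy)
rows = [
    (123, 1, "01/01/2021"), (123, 0, "01/02/2021"), (123, 1, "01/03/2021"),
    (123, 0, "01/06/2021"), (123, 0, "01/08/2021"),
    (777, 0, "01/01/2021"), (777, 1, "01/03/2021"),
]


def parse(s):
    """Parse a dd/MM/yyyy string into a date."""
    d, m, y = map(int, s.split("/"))
    return date(y, m, d)


def next_month(d):
    """Same day in the following month (assumes the day is 1, as in the sample)."""
    return date(d.year + d.month // 12, d.month % 12 + 1, d.day)


def fill_monthly(rows):
    """Hypothetical helper: one row per month from each ID's first to last date,
    carrying the most recent FLAG forward into months that had no row."""
    by_id = {}
    for id_, flag, s in rows:
        by_id.setdefault(id_, {})[parse(s)] = flag
    out = []
    for id_ in sorted(by_id):
        flags = by_id[id_]
        current, last = min(flags), max(flags)
        prev_flag = None
        while current <= last:
            # an existing month keeps its FLAG; a missing month inherits prev_flag
            prev_flag = flags.get(current, prev_flag)
            out.append((id_, prev_flag, current.strftime("%d/%m/%Y")))
            current = next_month(current)
    return out


for row in fill_monthly(rows):
    print(row)
```

In Spark terms this is the "calendar cross join" followed by a forward fill; the answers below do the same thing with `sequence`, `explode` and window functions.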
Answer:
First group by id to compute each id's min and max date, then use the sequence function to generate every month from min_date to max_date. Finally, left join with the original dataframe and fill the nulls with the last non-null flag within each id. Here's a complete working example:
Your input dataframe:
```python
from pyspark.sql import Window
import pyspark.sql.functions as F

df = spark.createDataFrame([
    (123, 1, "01/01/2021"), (123, 0, "01/02/2021"),
    (123, 1, "01/03/2021"), (123, 0, "01/06/2021"),
    (123, 0, "01/08/2021"), (777, 0, "01/01/2021"),
    (777, 1, "01/03/2021")
], ["id", "flag", "date"])
```
Group by id and generate all possible dates for each id:
```python
all_dates_df = df.groupBy("id").agg(
    F.date_trunc("month", F.max(F.to_date("date", "dd/MM/yyyy"))).alias("max_date"),
    F.date_trunc("month", F.min(F.to_date("date", "dd/MM/yyyy"))).alias("min_date")
).select(
    "id",
    F.expr("sequence(min_date, max_date, interval 1 month)").alias("date")
).withColumn(
    "date", F.explode("date")
).withColumn(
    "date",
    F.date_format("date", "dd/MM/yyyy")
)
```
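The `sequence(min_date, max_date, interval 1 month)` call builds, per id, an array of month starts, and `explode` turns each array element into its own row. A plain-Python analogue of those two steps, assuming both bounds are already truncated to the first of the month (`month_sequence` and `bounds` are hypothetical names for illustration):

```python
from datetime import date


def month_sequence(start, stop):
    """Hypothetical helper: inclusive list of first-of-month dates from start to
    stop, analogous to sequence(min_date, max_date, interval 1 month) once both
    bounds are truncated to the month."""
    out = []
    y, m = start.year, start.month
    while (y, m) <= (stop.year, stop.month):
        out.append(date(y, m, 1))
        y, m = (y + 1, 1) if m == 12 else (y, m + 1)
    return out


# Per-id (min_date, max_date), as the groupBy/agg step would compute them for the sample.
bounds = {
    123: (date(2021, 1, 1), date(2021, 8, 1)),
    777: (date(2021, 1, 1), date(2021, 3, 1)),
}

# The "explode" step: one (id, date) row per element of each id's array.
all_dates = [(id_, d) for id_, (lo, hi) in bounds.items() for d in month_sequence(lo, hi)]
print(len(all_dates))
```

Each id ends up with one row per month, with no flag yet; the flag is attached by the join in the next step.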
Now left join with df and use the last function (with ignorenulls=True) over a window partitioned by id to fill in the null flag values:
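```python
# Order the window by the parsed date: dd/MM/yyyy strings do not sort
# chronologically once the data spans more than one year.
w = Window.partitionBy("id").orderBy(F.to_date("date", "dd/MM/yyyy"))

result = all_dates_df.join(df, ["id", "date"], "left").withColumn(
    "flag",
    # last non-null flag from the start of the partition up to the current row
    F.last(F.col("flag"), ignorenulls=True).over(w)
)

result.show()
# +---+----------+----+
# | id|      date|flag|
# +---+----------+----+
# |123|01/01/2021|   1|
# |123|01/02/2021|   0|
# |123|01/03/2021|   1|
# |123|01/04/2021|   1|
# |123|01/05/2021|   1|
# |123|01/06/2021|   0|
# |123|01/07/2021|   0|
# |123|01/08/2021|   0|
# |777|01/01/2021|   0|
# |777|01/02/2021|   0|
# |777|01/03/2021|   1|
# +---+----------+----+
```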
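When a window has an `orderBy`, Spark's default frame runs from the start of the partition to the current row, so `last(..., ignorenulls=True)` returns the most recent non-null flag seen so far, i.e. a forward fill. A plain-Python sketch of that behaviour for id 123 after the left join (`last_ignore_nulls` is a hypothetical helper, and `None` stands for a month with no original row), plus a check of why ordering by a dd/MM/yyyy string is risky:

```python
from datetime import datetime


def last_ignore_nulls(values):
    """Running 'last non-null value' over an already ordered list, mimicking
    last(col, ignorenulls=True) with a frame from partition start to current row."""
    out, last = [], None
    for v in values:
        if v is not None:
            last = v
        out.append(last)
    return out


# Left-join result for id 123 before filling; None marks months with no original row.
joined = [("01/01/2021", 1), ("01/02/2021", 0), ("01/03/2021", 1), ("01/04/2021", None),
          ("01/05/2021", None), ("01/06/2021", 0), ("01/07/2021", None), ("01/08/2021", 0)]
filled = last_ignore_nulls([flag for _, flag in joined])
print(filled)

# dd/MM/yyyy strings sort lexicographically, which is not chronological across years.
dates = ["01/01/2021", "01/12/2020"]
lexical = sorted(dates)
chronological = sorted(dates, key=lambda s: datetime.strptime(s, "%d/%m/%Y"))
print(lexical, chronological)
```

The string sort puts 01/01/2021 before 01/12/2020, so ordering the window by the parsed date rather than the formatted string is the safer choice once data spans several years.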
Answer:
For each row, you can compute the range of months between its DATE and the DATE of the following row for the same ID, use sequence to generate all intermediate dates, and explode that array so that each missing date gets its own row carrying the current row's FLAG.
```python
from pyspark.sql import functions as F
from pyspark.sql import Window

data = [(123, 1, "01/01/2021",),
        (123, 0, "01/02/2021",),
        (123, 1, "01/03/2021",),
        (123, 0, "01/06/2021",),
        (123, 0, "01/08/2021",),
        (777, 0, "01/01/2021",),
        (777, 1, "01/03/2021",), ]

df = spark.createDataFrame(data, ("ID", "FLAG", "DATE",)).withColumn("DATE", F.to_date(F.col("DATE"), "dd/MM/yyyy"))

window_spec = Window.partitionBy("ID").orderBy("DATE")

# Next DATE for the same ID; the last row of each ID falls back to DATE + 1 month.
next_date = F.coalesce(F.lead("DATE", 1).over(window_spec), F.add_months(F.col("DATE"), 1))
# Stop one month before the next date so that month is not generated twice.
next_date_range = F.add_months(next_date, -1)

df.withColumn("Ranges", F.sequence(F.col("DATE"), next_date_range, F.expr("interval 1 month")))\
    .withColumn("DATE", F.explode("Ranges"))\
    .withColumn("DATE", F.date_format("DATE", "dd/MM/yyyy"))\
    .drop("Ranges").show(truncate=False)
```
Output:
```
+---+----+----------+
|ID |FLAG|DATE      |
+---+----+----------+
|123|1   |01/01/2021|
|123|0   |01/02/2021|
|123|1   |01/03/2021|
|123|1   |01/04/2021|
|123|1   |01/05/2021|
|123|0   |01/06/2021|
|123|0   |01/07/2021|
|123|0   |01/08/2021|
|777|0   |01/01/2021|
|777|0   |01/02/2021|
|777|1   |01/03/2021|
+---+----+----------+
```
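The lead/sequence/explode approach can be mirrored in plain Python to see why no join is needed: each row is simply repeated for every month up to (but not including) the next row's month. A minimal sketch, assuming rows are sorted by ID then date and every date is the 1st of a month; `add_months` and `expand_until_next` are hypothetical helpers, not Spark functions:

```python
from datetime import date
from itertools import groupby


def add_months(d, n):
    """Shift d by n months (assumes d.day == 1, so the day is always valid)."""
    total = d.year * 12 + (d.month - 1) + n
    return date(total // 12, total % 12 + 1, d.day)


def expand_until_next(rows):
    """rows: (id, flag, date) sorted by id, then date. Each row is repeated for
    every month from its own date to the month before the next row's date,
    mirroring lead + sequence + explode."""
    out = []
    for id_, group in groupby(rows, key=lambda r: r[0]):
        group = list(group)
        for i, (_, flag, d) in enumerate(group):
            # coalesce(lead(DATE), add_months(DATE, 1)) for the last row of an id
            nxt = group[i + 1][2] if i + 1 < len(group) else add_months(d, 1)
            stop = add_months(nxt, -1)
            cur = d
            while cur <= stop:
                out.append((id_, flag, cur))
                cur = add_months(cur, 1)
    return out


rows = [
    (123, 1, date(2021, 1, 1)), (123, 0, date(2021, 2, 1)), (123, 1, date(2021, 3, 1)),
    (123, 0, date(2021, 6, 1)), (123, 0, date(2021, 8, 1)),
    (777, 0, date(2021, 1, 1)), (777, 1, date(2021, 3, 1)),
]
for r in expand_until_next(rows):
    print(r)
```

Because the FLAG travels with the row being expanded, the forward fill comes for free; the trade-off is that it relies on dates being aligned to the same day of month so that `add_months(next, -1)` lands exactly on the previous generated date.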
