PySpark DataFrame Scenario:
- There is a DataFrame called DF. The two main columns of DF are ID and Date.
- Each ID has on average 40 unique Dates (not continuous dates).
- Now, there is a second DataFrame called DF_date, which has one column named Date. The dates in Date range between the minimum and maximum of Date from DF.
- Now, the goal is to fill DF with the continuous dates between the start and end date of each unique ID (missing dates are filled with a left join between DF_date and DF).
DF
+----------+--------+-------+
|      Date|     Val|     ID|
+----------+--------+-------+
|2021-07-01|81119.73|Ax3838J|
|2021-07-04|81289.62|Ax3838J|
|2021-07-05|81385.62|Ax3838J|
|2021-07-02|81249.76|Bz3838J|
|2021-07-05|81324.28|Bz3838J|
|2021-07-06|81329.28|Bz3838J|
+----------+--------+-------+
DF_date
+----------+
|      Date|
+----------+
|2021-07-01|
|2021-07-02|
|2021-07-03|
|2021-07-04|
|2021-07-05|
|2021-07-06|
+----------+
Expected Final Output:
+----------+--------+-------+
|      Date|     Val|     ID|
+----------+--------+-------+
|2021-07-01|81119.73|Ax3838J|
|2021-07-02|81119.73|Ax3838J|
|2021-07-03|81119.73|Ax3838J|
|2021-07-04|81289.62|Ax3838J|
|2021-07-05|81385.62|Ax3838J|
|2021-07-02|81249.76|Bz3838J|
|2021-07-03|81249.76|Bz3838J|
|2021-07-04|81249.76|Bz3838J|
|2021-07-05|81324.28|Bz3838J|
|2021-07-06|81329.28|Bz3838J|
+----------+--------+-------+
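To pin down the rule behind the expected output, here is a minimal sketch in plain Python (no Spark), assuming the intent is: for each ID, produce every calendar day between its first and last date, and carry the most recent Val forward into the gaps. The function name fill_dates and the tuple layout are only for illustration.

```python
from datetime import date, timedelta
from itertools import groupby

# Sample rows from the question, as (Date, Val, ID) tuples
rows = [
    (date(2021, 7, 1), 81119.73, "Ax3838J"),
    (date(2021, 7, 4), 81289.62, "Ax3838J"),
    (date(2021, 7, 5), 81385.62, "Ax3838J"),
    (date(2021, 7, 2), 81249.76, "Bz3838J"),
    (date(2021, 7, 5), 81324.28, "Bz3838J"),
    (date(2021, 7, 6), 81329.28, "Bz3838J"),
]

def fill_dates(rows):
    """Fill every day between each ID's first and last date, forward filling Val."""
    filled = []
    by_id = sorted(rows, key=lambda r: (r[2], r[0]))  # groupby needs sorted input
    for id_, group in groupby(by_id, key=lambda r: r[2]):
        known = {d: v for d, v, _ in group}  # dates that exist for this ID
        day, end = min(known), max(known)
        last_val = None
        while day <= end:
            # Use the known value if the date exists, else keep the previous one
            last_val = known.get(day, last_val)
            filled.append((day, last_val, id_))
            day += timedelta(days=1)
    return filled

filled = fill_dates(rows)
for d, v, i in filled:
    print(d, v, i)
```

Any Spark solution should reproduce these ten rows for the sample data.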
CodePudding user response:
Your question doesn't quite make sense. Why have a DF_date DataFrame with start and end dates, use it to fill in the dates, and then fall back on the start and end date from DF? Why not just fill in the missing dates using the minimum and maximum date of each group in DF?
Anyway, following your comments, see my edit below. It fills in the missing dates per ID without needing DF_date.
from pyspark.sql import Window
from pyspark.sql.functions import explode, expr, last, max as max_, min as min_, to_date

base = DF.withColumn('Date', to_date('Date'))  # make sure Date is a date type
w = Window.partitionBy('ID').orderBy('Date')
new = (base.groupBy('ID')
       .agg(min_('Date').alias('min_date'),  # minimum date per group
            max_('Date').alias('max_date'))  # maximum date per group
       # One row per calendar day between min_date and max_date for each ID
       .select('ID', explode(expr('sequence(min_date, max_date, interval 1 day)')).alias('Date'))
       # Bring back the known values; dates missing from DF get a null Val
       .join(base, ['ID', 'Date'], 'left')
       # Forward fill the Val column with the last non-null value
       .withColumn('Val', last('Val', True).over(w))
       .orderBy('ID', 'Date'))
new.show()
+-------+----------+--------+
|     ID|      Date|     Val|
+-------+----------+--------+
|Ax3838J|2021-07-01|81119.73|
|Ax3838J|2021-07-02|81119.73|
|Ax3838J|2021-07-03|81119.73|
|Ax3838J|2021-07-04|81289.62|
|Ax3838J|2021-07-05|81385.62|
|Bz3838J|2021-07-02|81249.76|
|Bz3838J|2021-07-03|81249.76|
|Bz3838J|2021-07-04|81249.76|
|Bz3838J|2021-07-05|81324.28|
|Bz3838J|2021-07-06|81329.28|
+-------+----------+--------+
CodePudding user response:
In the question above, I later realised, as @wwnde suggested, that there is no need to create a separate DataFrame for the dates.
The code below serves the purpose too:
from pyspark.sql import Window
from pyspark.sql import functions as F

# Partition the data by ID and order by DATE
window_fn = Window.partitionBy("ID").orderBy("DATE")
# Date of the following row for the same ID; for the last row, fall back to DATE + 1 day
next_date = F.coalesce(F.lead("DATE", 1).over(window_fn), F.col("DATE") + F.expr("interval 1 day"))
# Last date covered by the current row: the day before the next row's date
end_date_range = next_date - F.expr("interval 1 day")
# Use 'sequence' to generate all intermediate dates, then explode the array
# so that each missing date gets its own row carrying the current row's values
final_result = DF.withColumn("Ranges", F.sequence(F.col("DATE"), end_date_range, F.expr("interval 1 day")))\
    .withColumn("DATE", F.explode("Ranges"))\
    .withColumn("DATE", F.to_timestamp("DATE", 'yyyy-MM-dd'))\
    .drop("Ranges")
display(final_result)  # display() is a Databricks notebook helper; use final_result.show() elsewhere
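For anyone trying to follow what the lead/sequence/explode combination does, here is a rough plain-Python sketch of the same idea for a single ID. It assumes each row should be repeated for every day up to, but not including, the next row's date. The function name expand_to_next is hypothetical and only for illustration.

```python
from datetime import date, timedelta

def expand_to_next(rows):
    """rows: (day, val) pairs for ONE ID. Repeat each row until the day before the next row."""
    rows = sorted(rows)
    out = []
    for idx, (day, val) in enumerate(rows):
        # Counterpart of lead("DATE", 1): the next row's date,
        # or day + 1 for the last row so that it is emitted exactly once
        if idx + 1 < len(rows):
            next_day = rows[idx + 1][0]
        else:
            next_day = day + timedelta(days=1)
        # Counterpart of sequence(...) followed by explode(...)
        while day < next_day:
            out.append((day, val))
            day += timedelta(days=1)
    return out

# Rows for ID Bz3838J from the question
sample = [
    (date(2021, 7, 2), 81249.76),
    (date(2021, 7, 5), 81324.28),
    (date(2021, 7, 6), 81329.28),
]
expanded = expand_to_next(sample)
for d, v in expanded:
    print(d, v)
```

Because every generated row copies the values of the row it came from, the forward fill happens implicitly and no join or separate fill step is needed.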
