My question is as follows. I'm trying to calculate a future value, say a backlog value, in a PySpark dataframe.
My sample dataframe is:
Task   start_date  end_date    Total_salary
Task1  2022-01-01  2022-04-01  500
Task2  2022-03-01  2022-06-01  400
Task3  2019-11-01  2020-01-01  300
Task4  2021-11-01  2022-04-01  600
Expected output: I need to calculate the backlog for each month, from the current month up to the maximum date in the end_date column. The pay for one month is Total_salary divided by the number of months between start_date and end_date. I need the output below, starting from January 2022, in a separate dataframe that has only these two columns:
date Total_backlog
2022-01-31 #(Task1: 500 - 100) (Task2: 400 (because it hasn't
#started yet)) (Task3: 0 (it's already finished))
#(Task4: 600 - 300)
#So the total is: 400 + 400 + 0 + 300 = 1100
2022-02-28 800
2022-03-31 .....
.......
2022-06-30 .....
(This is the max date in end_date for the sample above; in the actual data set the max date is later than this.)
I don't know how to loop over a PySpark dataframe. Can someone please help me?
CodePudding user response:
Using this input dataframe:
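from pyspark.sql import SparkSession, functions as F

# Reuse the active session, or create one when running as a standalone script
spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([
    ("Task1", "2022-01-01", "2022-04-01", 500),
    ("Task2", "2022-03-01", "2022-06-01", 400),
    ("Task3", "2019-11-01", "2020-01-01", 300),
    ("Task4", "2021-11-01", "2022-04-01", 600)
], ["Task", "start_date", "end_date", "Total_salary"])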
First, generate dates_df using the sequence function, like this:
# you can replace '2022-01-01' with current_date truncated to the month
dates_df = df.selectExpr(
    "sequence(date_trunc('mm', '2022-01-01'), date_trunc('mm', max(end_date)), interval 1 month) as dates"
).select(
    F.explode("dates").alias("date")
).withColumn(
    "date",
    F.last_day(F.col("date"))
).crossJoin(df.select("Task").distinct())
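To see which dates this step produces without running Spark, here is a minimal plain-Python sketch of the same idea: one month-end date per month, from the starting month to the month of the latest end_date. The helper name month_ends is hypothetical (it is not a PySpark function). In dates_df each of these dates is then repeated once per Task by the cross join.

```python
import calendar
from datetime import date


def month_ends(first, last):
    """Return the last day of every month from first's month through last's month."""
    out = []
    year, month = first.year, first.month
    while (year, month) <= (last.year, last.month):
        # monthrange returns (weekday of the 1st, number of days in the month)
        out.append(date(year, month, calendar.monthrange(year, month)[1]))
        # Step to the next calendar month, rolling the year over after December
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)
    return out


# '2022-01-01' as the starting month, max(end_date) = 2022-06-01 in the sample data
dates = month_ends(date(2022, 1, 1), date(2022, 6, 1))
for d in dates:
    print(d)
```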
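Now, left join with the original dataframe on Task, keep only the rows where end_date > date, and compute the remaining salary of each task before summing it per date. A task that has not started yet keeps its full Total_salary; for a task in progress the remaining salary is:
(Total_salary / nb_months_task) * nb_remaining_months_task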
result = (dates_df.join(df, ["Task"], "left")
          # keep only the tasks that end after this month-end date
          .filter(F.col("end_date") > F.col("date"))
          .withColumn("salary_per_month",
                      F.round(F.col("Total_salary") / F.months_between("end_date", "start_date")))
          # started tasks: monthly salary * remaining months; not started yet: full salary
          .withColumn("Total_salary", F.when(F.col("start_date") < F.col("date"),
                                             F.col("salary_per_month") * F.round(
                                                 F.months_between("end_date", "date"))
                                             ).otherwise(F.col("Total_salary")))
          .groupBy("date")
          .agg(F.sum("Total_salary").alias("Total_backlog"))
          ).orderBy("date")
result.show()
#+----------+-------------+
#|      date|Total_backlog|
#+----------+-------------+
#|2022-01-31|        974.0|
#|2022-02-28|        687.0|
#|2022-03-31|        266.0|
#|2022-04-30|        133.0|
#|2022-05-31|          0.0|
#|2022-06-30|         null|
#+----------+-------------+
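If you want to check these numbers by hand, here is a rough plain-Python sketch of the same arithmetic. It rests on two assumptions about Spark's behavior: that months_between returns a whole number when both dates fall on the same day of the month (or are both month ends) and otherwise adds the day difference divided by 31, and that round rounds halves up. The functions months_between, round_half_up and backlog below are my own illustrative helpers, not PySpark functions.

```python
import math
from calendar import monthrange
from datetime import date


def months_between(d1, d2):
    """Assumed approximation of Spark's months_between(d1, d2) for plain dates."""
    months = (d1.year - d2.year) * 12 + (d1.month - d2.month)
    d1_is_month_end = d1.day == monthrange(d1.year, d1.month)[1]
    d2_is_month_end = d2.day == monthrange(d2.year, d2.month)[1]
    if d1.day == d2.day or (d1_is_month_end and d2_is_month_end):
        return float(months)
    # Otherwise the fractional part is based on a 31-day month
    return round(months + (d1.day - d2.day) / 31.0, 8)


def round_half_up(x):
    """Round to the nearest integer, halves up (assumed to match F.round here)."""
    return math.floor(x + 0.5)


tasks = [
    ("Task1", date(2022, 1, 1), date(2022, 4, 1), 500),
    ("Task2", date(2022, 3, 1), date(2022, 6, 1), 400),
    ("Task3", date(2019, 11, 1), date(2020, 1, 1), 300),
    ("Task4", date(2021, 11, 1), date(2022, 4, 1), 600),
]


def backlog(as_of):
    """Sum the remaining salary of every task that ends after as_of."""
    total = 0
    for _, start, end, salary in tasks:
        if end <= as_of:
            continue  # finished tasks are dropped, like the filter step
        if start < as_of:
            per_month = round_half_up(salary / months_between(end, start))
            total += per_month * round_half_up(months_between(end, as_of))
        else:
            total += salary  # not started yet: the full salary is still backlog
    return total


check_dates = [date(2022, 1, 31), date(2022, 2, 28), date(2022, 3, 31),
               date(2022, 4, 30), date(2022, 5, 31)]
for d in check_dates:
    print(d, backlog(d))
```

For example, on 2022-01-31 Task1 contributes 167 * 2 = 334, Task2 contributes its full 400 and Task4 contributes 120 * 2 = 240, which adds up to 974.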
You can adapt this last step if your actual logic is different, but you get the idea.
