How to calculate future values based on current value in a column in pyspark dataframe?

Time:02-03

I'm trying to calculate a future value, say a backlog value, in a PySpark dataframe.

My sample dataframe is:

 Task     start_date      end_date  Total_salary
Task1     2022-01-01    2022-04-01           500
Task2     2022-03-01    2022-06-01           400
Task3     2019-11-01    2020-01-01           300
Task4     2021-11-01    2022-04-01           600

Expected output: I need to calculate the backlog for each month, from the current month up to the maximum date in the end_date column. The pay for one month is Total_salary / (months between start_date and end_date). I need the output below, starting from Jan 2022, in a separate dataframe that has only these two columns.

date              Total_backlog
2022-01-31        #(Task1: 500 - 100)  (Task2: 400 (because it hasn't
                  #started yet))  (Task3: 0 (it's already finished))
                  #(Task4: 600 - 300)
                  #So total is: 400 + 400 + 0 + 300 = 1100

2022-02-28        800

2022-03-31        .....

.......
2022-06-30        .....

(This is the max date in end_date here, but in the actual data set the max date is later than this.)

I don't know how to loop over a PySpark dataframe. Can someone please help me?

CodePudding user response:

Using this input dataframe:

df = spark.createDataFrame([
    ("Task1", "2022-01-01", "2022-04-01", 500),
    ("Task2", "2022-03-01", "2022-06-01", 400),
    ("Task3", "2019-11-01", "2020-01-01", 300),
    ("Task4", "2021-11-01", "2022-04-01", 600)
], ["Task", "start_date", "end_date", "Total_salary"])

First, generate dates_df using the sequence function like this:

from pyspark.sql import functions as F

# you can replace '2022-01-01' with current_date truncated to the month
dates_df = df.selectExpr(
    "sequence(date_trunc('mm', '2022-01-01'), date_trunc('mm', max(end_date)), interval 1 month) as dates"
).select(
    F.explode("dates").alias("date")
).withColumn(
    "date",
    F.last_day(F.col("date"))
).crossJoin(df.select("Task").distinct())
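To make the date column concrete, here is a minimal plain-Python sketch of what the sequence + last_day step produces, without Spark. The helper name month_ends is hypothetical and only used for illustration; it is not part of PySpark.

```python
import calendar
from datetime import date


def month_ends(first_month, last_month):
    """Return the last day of every month from first_month to last_month, inclusive.

    Hypothetical helper mirroring sequence(..., interval 1 month) followed by last_day.
    """
    out = []
    year, month = first_month.year, first_month.month
    while (year, month) <= (last_month.year, last_month.month):
        # monthrange returns (weekday of the 1st, number of days in the month)
        days_in_month = calendar.monthrange(year, month)[1]
        out.append(date(year, month, days_in_month))
        # advance by one month, rolling over the year in December
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)
    return out


# Same range as in the answer: Jan 2022 up to the month of max(end_date)
dates = month_ends(date(2022, 1, 1), date(2022, 6, 1))
for d in dates:
    print(d.isoformat())
```

In dates_df each of these dates is then paired with every distinct Task by the crossJoin, so there is one row per (date, Task) combination.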

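Now, join with the original dataframe on Task, keep only the rows where end_date > date (tasks that are not finished yet), and aggregate to sum the remaining salary. For a task that has already started, the remaining salary is calculated with the formula: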

(Total_salary/nb_months_task) * nb_remaining_months_task

result = (dates_df.join(df, ["Task"], "left")
          .filter(F.col("end_date") > F.col("date"))
          .withColumn("salary_per_month",
                      F.round(F.col("Total_salary") / F.months_between("end_date", "start_date")))
          .withColumn("Total_salary", F.when(F.col("start_date") < F.col("date"),
                                             F.col("salary_per_month") * F.round(
                                                 F.months_between("end_date", "date"))
                                             ).otherwise(F.col("Total_salary")))
          .groupBy("date")
          .agg(F.sum("Total_salary").alias("Total_backlog"))
          ).orderBy("date")

result.show()
#+----------+-------------+
#|      date|Total_backlog|
#+----------+-------------+
#|2022-01-31|        974.0|
#|2022-02-28|        687.0|
#|2022-03-31|        266.0|
#|2022-04-30|        133.0|
#|2022-05-31|          0.0|
#|2022-06-30|         null|
#+----------+-------------+
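To see where figures such as 974.0 come from, the same arithmetic can be replayed in plain Python. This is only a sketch: the months_between function below is a simplified reimplementation of Spark's behaviour for plain dates (whole months when the day of month matches or both dates are month ends, otherwise a fraction based on 31-day months), and round_half_up and backlog are hypothetical helper names, not PySpark functions.

```python
import calendar
import math
from datetime import date


def is_month_end(d):
    return d.day == calendar.monthrange(d.year, d.month)[1]


def months_between(end, start):
    """Simplified stand-in for Spark's months_between on plain dates (assumption)."""
    whole = (end.year - start.year) * 12 + (end.month - start.month)
    if end.day == start.day or (is_month_end(end) and is_month_end(start)):
        return float(whole)
    # fractional part is computed on a 31-day month basis
    return round(whole + (end.day - start.day) / 31.0, 8)


def round_half_up(x):
    # Spark's round() rounds halves up; Python's round() rounds halves to even
    return math.floor(x + 0.5)


TASKS = [
    ("Task1", date(2022, 1, 1), date(2022, 4, 1), 500),
    ("Task2", date(2022, 3, 1), date(2022, 6, 1), 400),
    ("Task3", date(2019, 11, 1), date(2020, 1, 1), 300),
    ("Task4", date(2021, 11, 1), date(2022, 4, 1), 600),
]


def backlog(as_of):
    """Sum the remaining salary of all tasks that end after as_of."""
    total = 0
    for _, start, end, salary in TASKS:
        if end <= as_of:
            continue  # finished task: dropped by the end_date > date filter
        if start < as_of:
            per_month = round_half_up(salary / months_between(end, start))
            total += per_month * round_half_up(months_between(end, as_of))
        else:
            total += salary  # not started yet: the full amount is still backlog
    return total


for d in [date(2022, 1, 31), date(2022, 2, 28), date(2022, 3, 31),
          date(2022, 4, 30), date(2022, 5, 31)]:
    print(d.isoformat(), backlog(d))
```

For 2022-01-31 this gives 167 * 2 for Task1, the full 400 for Task2, nothing for Task3, and 120 * 2 for Task4. The totals differ from the 1100 in the question because both the monthly pay and the remaining months are rounded.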

You can adapt this last step if your actual logic is different, but this shows the idea.
