I have a NAV history CSV and I have to calculate the YTD and MTD performance of a Mutual Fund on a given date using the NAV.
My CSV looks like this.
MutualFundName NAV Date
A 2 2022-02-03
A 2.2 2022-02-02
A 2.1 2022-02-01
B 3 2022-02-03
B 2.9 2022-02-02
B 2.7 2022-02-01
C 6 2022-02-03
C 5.5 2022-02-02
C 5.9 2022-02-01
There is one NAV value for each Mutual Fund on each date.
I have to calculate the YTD and MTD of each Mutual Fund as of the current date.
YTD formula is
((NAV(end) - NAV(start)) / NAV(start)) * 100
where NAV(end) is the NAV on the current date and NAV(start) is the NAV on 1st January of that year.
MTD uses the same formula, except that NAV(start) is the NAV on the 1st of the given month and year.
I have to write a pyspark job to achieve it. Currently, I have the CSV data in a DataFrame.
The expected output for MTD on 3rd Feb 2022 is:
MutualFundName MTD
A -4.761904762
B 11.11111111
C 1.694915254
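As a quick sanity check of the formula, the expected MTD figures can be reproduced in plain Python from the sample rows above. This is only a sketch: the helper name pct_change and the hard-coded dictionary are illustrative assumptions, not part of the PySpark job.

```python
def pct_change(nav_start: float, nav_end: float) -> float:
    """Percentage change from nav_start to nav_end."""
    return (nav_end - nav_start) / nav_start * 100


# (NAV on 2022-02-01, NAV on 2022-02-03) per fund, copied from the sample CSV.
sample = {"A": (2.1, 2.0), "B": (2.7, 3.0), "C": (5.9, 6.0)}

# MTD as of 2022-02-03 for every fund in the sample.
mtd = {fund: pct_change(start, end) for fund, (start, end) in sample.items()}

for fund, value in mtd.items():
    print(fund, round(value, 4))
```

The same function gives YTD when nav_start is the NAV on 1st January of the year instead of the 1st of the month.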
CodePudding user response:
Filter the rows where Date equals current_date or the first day of the current month, then group by MutualFundName and apply your formula:
from pyspark.sql import functions as F

result = df.filter(
    "Date = current_date or Date = date_trunc('mm', current_date)"
).groupBy("MutualFundName").agg(
    ((F.max(F.struct("Date", "NAV"))["NAV"] - F.min(F.struct("Date", "NAV"))["NAV"]) /
     F.min(F.struct("Date", "NAV"))["NAV"] * 100
     ).alias("MTD")
)

result.show()
#+--------------+------------------+
#|MutualFundName|               MTD|
#+--------------+------------------+
#|             A|-4.761904761904765|
#|             B|11.111111111111104|
#|             C|1.6949152542372818|
#+--------------+------------------+
For Spark 3+, you can use the max_by and min_by functions instead of struct ordering in the aggregation:
result = df.filter(
    "Date = current_date or Date = date_trunc('mm', current_date)"
).groupBy("MutualFundName").agg(
    ((F.expr("max_by(NAV, Date)") - F.expr("min_by(NAV, Date)")) /
     F.expr("min_by(NAV, Date)") * 100
     ).alias("MTD")
)
