How to filter a PySpark DataFrame when multiple dynamic criteria need to be applied

Time:01-27

I want to filter the DataFrame df based on the per-ID date ranges in the DfCriteria DataFrame.

from datetime import date
from pyspark.sql import Row

df = spark.createDataFrame([
    Row(ID="A01", Date=date(2021, 1, 1), SalesUSD=324),
    Row(ID="A01", Date=date(2021, 1, 2), SalesUSD=567),
    Row(ID="A01", Date=date(2021, 1, 3), SalesUSD=645),
    Row(ID="A01", Date=date(2021, 1, 4), SalesUSD=834),
    Row(ID="A02", Date=date(2021, 1, 1), SalesUSD=284),
    Row(ID="A02", Date=date(2021, 1, 2), SalesUSD=453),
    Row(ID="A02", Date=date(2021, 1, 3), SalesUSD=132),
    Row(ID="A04", Date=date(2021, 1, 4), SalesUSD=254)
])
|ID |  Date      |  SalesUSD |
|---|------------|-----------|
|A01|  2021-01-01|     324   |
|A01|  2021-01-02|     567   |
|A01|  2021-01-03|     645   |
|A01|  2021-01-04|     834   |
|A02|  2021-01-01|     284   |
|A02|  2021-01-02|     453   |
|A02|  2021-01-03|     132   |
|A04|  2021-01-04|     254   |

DfCriteria:

DfCriteria = spark.createDataFrame([
    Row(ID="A01", StartDate=date(2021, 1, 1), EndDate=date(2021, 1, 2)),
    Row(ID="A02", StartDate=date(2021, 1, 2), EndDate=date(2021, 1, 4))
])
|ID |StartDate  | EndDate    |
|---|-----------|------------|
|A01| 2021-01-01| 2021-01-02 |
|A02| 2021-01-02| 2021-01-04 |

Expected Output

|ID |  Date      |  SalesUSD |
|---|------------|-----------|
|A01|  2021-01-01|     324   |
|A01|  2021-01-02|     567   |
|A02|  2021-01-02|     453   |
|A02|  2021-01-03|     132   |
|A04|  2021-01-04|     254   |
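
The expected output implies a specific rule: for an ID that has a row in DfCriteria, keep only the rows whose Date lies inside the inclusive StartDate to EndDate range; an ID with no criteria row (A04 here) is kept unfiltered. A minimal plain-Python sketch of that rule (standard library only, no Spark; the "keep IDs without criteria" behaviour is an assumption read off the expected table, and it assumes at most one range per ID):

```python
from datetime import date

# Rows from the question: (ID, Date, SalesUSD)
rows = [
    ("A01", date(2021, 1, 1), 324), ("A01", date(2021, 1, 2), 567),
    ("A01", date(2021, 1, 3), 645), ("A01", date(2021, 1, 4), 834),
    ("A02", date(2021, 1, 1), 284), ("A02", date(2021, 1, 2), 453),
    ("A02", date(2021, 1, 3), 132), ("A04", date(2021, 1, 4), 254),
]
# Criteria from the question: ID -> (StartDate, EndDate); assumes one range per ID
criteria = {
    "A01": (date(2021, 1, 1), date(2021, 1, 2)),
    "A02": (date(2021, 1, 2), date(2021, 1, 4)),
}

def keep(row):
    """Keep a row if its ID has no criteria, or its Date is inside the inclusive range."""
    rng = criteria.get(row[0])
    if rng is None:
        return True  # assumption: IDs without criteria (e.g. A04) pass unfiltered
    start, end = rng
    return start <= row[1] <= end

expected = [r for r in rows if keep(r)]
for r in expected:
    print(r[0], r[1].isoformat(), r[2])
```

In PySpark terms this rule would roughly correspond to a left join of df with DfCriteria on ID, followed by a filter such as F.col('StartDate').isNull() | F.col('Date').between(F.col('StartDate'), F.col('EndDate')); treat that as an untested sketch.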

Answer:

Assuming you want to keep only the events whose Date falls between StartDate and EndDate (inclusive):

from pyspark.sql import functions as F
df.join(DfCriteria, on='ID', how='left').where(F.col('Date').between(F.col('StartDate'), F.col('EndDate'))).select(df.columns)
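
To see why a left join followed by a where on Date between StartDate and EndDate drops A04 (even though the question's expected output keeps it), here is a plain-Python model of those semantics. It uses only the standard library and mimics Spark's NULL handling in a simplified way; it is not Spark's implementation.

```python
from datetime import date

# Rows from the question: (ID, Date, SalesUSD)
rows = [
    ("A01", date(2021, 1, 1), 324), ("A01", date(2021, 1, 2), 567),
    ("A01", date(2021, 1, 3), 645), ("A01", date(2021, 1, 4), 834),
    ("A02", date(2021, 1, 1), 284), ("A02", date(2021, 1, 2), 453),
    ("A02", date(2021, 1, 3), 132), ("A04", date(2021, 1, 4), 254),
]
# Criteria rows from the question: (ID, StartDate, EndDate)
criteria = [
    ("A01", date(2021, 1, 1), date(2021, 1, 2)),
    ("A02", date(2021, 1, 2), date(2021, 1, 4)),
]

def left_join(left, right):
    """Model of a left join on ID: unmatched left rows get None (NULL) bounds."""
    out = []
    for row in left:
        bounds = [(start, end) for rid, start, end in right if rid == row[0]]
        for start, end in bounds or [(None, None)]:
            out.append(row + (start, end))
    return out

def between(value, low, high):
    """Simplified SQL BETWEEN: any NULL operand makes the result unknown (None)."""
    if value is None or low is None or high is None:
        return None
    return low <= value <= high

joined = left_join(rows, criteria)
# WHERE keeps only rows whose predicate is exactly True; None is dropped like False.
filtered = [r for r in joined if between(r[1], r[3], r[4]) is True]
for r in filtered:
    print(r[0], r[1].isoformat(), r[2])
```

Because an unmatched row only carries NULL bounds, the where clause removes it, so in this query the left join effectively behaves like an inner join.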

Answer:

You can use a left_semi join:

from pyspark.sql import functions as F

df = spark.createDataFrame([
    ("A01", "2021-01-01", 324), ("A01", "2021-01-02", 567), ("A01", "2021-01-03", 645),
    ("A01", "2021-01-04", 834), ("A02", "2021-01-01", 284), ("A02", "2021-01-02", 453),
    ("A02", "2021-01-03", 132), ("A04", "2021-01-04", 254)], ["ID", "Date", "SalesUSD"])

DfCriteria = spark.createDataFrame([
    ("A01", "2021-01-01", "2021-01-02"), ("A02", "2021-01-02", "2021-01-04")
], ["ID", "StartDate", "EndDate"])

result = df.join(
    DfCriteria,
    (df["ID"] == DfCriteria["ID"]) & F.col("Date").between(F.col("StartDate"), F.col("EndDate")),
    'left_semi'
)

result.show()

# +---+----------+--------+
# | ID|      Date|SalesUSD|
# +---+----------+--------+
# |A01|2021-01-01|     324|
# |A01|2021-01-02|     567|
# |A02|2021-01-02|     453|
# |A02|2021-01-03|     132|
# +---+----------+--------+
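
A left semi join differs from an inner join in two ways: it returns only df's columns, and each df row appears at most once no matter how many DfCriteria rows match it. The plain-Python model below (standard library only; it mimics the semantics, not Spark's execution) shows this using a hypothetical extra A01 range that overlaps the first one and is not part of the question.

```python
# Rows from the question, with ISO date strings as in the answer above: (ID, Date, SalesUSD)
rows = [
    ("A01", "2021-01-01", 324), ("A01", "2021-01-02", 567), ("A01", "2021-01-03", 645),
    ("A01", "2021-01-04", 834), ("A02", "2021-01-01", 284), ("A02", "2021-01-02", 453),
    ("A02", "2021-01-03", 132), ("A04", "2021-01-04", 254),
]
criteria = [("A01", "2021-01-01", "2021-01-02"), ("A02", "2021-01-02", "2021-01-04")]
# Hypothetical extra A01 range overlapping the first one (NOT in the question)
overlapping = criteria + [("A01", "2021-01-02", "2021-01-03")]

def matches(row, crit):
    """Join condition: same ID and Date within [StartDate, EndDate], inclusive.
    ISO yyyy-mm-dd strings sort the same way as the dates they represent."""
    rid, start, end = crit
    return row[0] == rid and start <= row[1] <= end

def left_semi(left, right):
    """Keep each left row once if ANY right row matches; output only left columns."""
    return [row for row in left if any(matches(row, c) for c in right)]

def inner(left, right):
    """Inner join: one output row per matching (left, right) pair, all columns."""
    return [row + c[1:] for row in left for c in right if matches(row, c)]

semi = left_semi(rows, criteria)
semi_overlap = left_semi(rows, overlapping)
inner_overlap = inner(rows, overlapping)
print(len(semi), len(semi_overlap), len(inner_overlap))  # prints: 4 5 6
```

With the question's data both joins keep the same df rows; the difference (A01 2021-01-02 appearing twice in the inner join) only shows up when ranges for the same ID overlap.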