I have the following code:
from pyspark.sql.functions import col, count, when
from functools import reduce
df = spark.createDataFrame([ (1,""), (2,None),(3,"c"),(4,"d") ], ['id','name'])
filter1 = col("name").isNull()
filter2 = col("name") == ""
dfresult = df.filter(filter1 | filter2).select(col("id"), when(filter1, "name is null").when(filter2, "name is empty").alias("new_col"))
dfresult.show()
+---+-------------+
| id|      new_col|
+---+-------------+
|  1|name is empty|
|  2| name is null|
+---+-------------+
With N filters, I thought about building the conditions from a list:
filters = []
filters.append({ "item": filter1, "msg":"name is null"})
filters.append({ "item": filter2, "msg":"name is empty"})
dynamic_filter = reduce(
lambda x,y: x | y,
[s['item'] for s in filters]
)
df2 = df.filter(dynamic_filter).select(col("id"), when(filter1, "name is null").when(filter2, "name is empty").alias("new_col"))
df2.show()
How can I build the new_col column dynamically as well, instead of hardcoding the chain of when calls?
Answer:
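Simply use functools.reduce, as you already did for the filter expression. Pass the functions module F itself as the initial value: the first step calls F.when(...), which returns a Column, and every later step calls that Column's .when(...) method, so the same lambda builds the whole chain. The conditions are evaluated in list order, so when several match, the first one in filters determines the message: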
from functools import reduce
from pyspark.sql import functions as F
new_col = reduce(
lambda acc, x: acc.when(x["item"], F.lit(x["msg"])),
filters,
F
)
df2 = df.filter(dynamic_filter).select(F.col("id"), new_col.alias("new_col"))
df2.show()
# +---+-------------+
# | id|      new_col|
# +---+-------------+
# |  1|name is empty|
# |  2| name is null|
# +---+-------------+
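To see why seeding reduce with the module works, here is a pure-Python analogue that runs without Spark. It is a hypothetical illustration, not PySpark's implementation: Case is a made-up stand-in for a Column built by chained when calls, and F_like is a made-up stand-in for pyspark.sql.functions that only provides a module-level when. The reduce call is the same one used in the answer, and the last lines show that the first matching condition wins when conditions overlap.

```python
from functools import reduce
from types import SimpleNamespace


class Case:
    """Hypothetical stand-in for a Column built by chained when() calls.

    Stores (predicate, message) pairs in order and, like SQL CASE WHEN,
    returns the message of the first predicate that holds, else None.
    """

    def __init__(self, branches):
        self.branches = branches

    def when(self, predicate, message):
        # Like Column.when: return a new expression with one more branch.
        return Case(self.branches + [(predicate, message)])

    def evaluate(self, row):
        for predicate, message in self.branches:
            if predicate(row):
                return message
        return None  # no branch matched and there is no otherwise()


# Hypothetical stand-in for the F module: its when() starts a new chain.
F_like = SimpleNamespace(when=lambda predicate, message: Case([(predicate, message)]))

filters = [
    {"item": lambda row: row["name"] is None, "msg": "name is null"},
    {"item": lambda row: row["name"] == "", "msg": "name is empty"},
]

# First step calls F_like.when(...); later steps call Case.when(...).
new_col = reduce(lambda acc, x: acc.when(x["item"], x["msg"]), filters, F_like)

rows = [{"id": 1, "name": ""}, {"id": 2, "name": None}, {"id": 3, "name": "c"}]
labels = [(row["id"], new_col.evaluate(row)) for row in rows]
print(labels)  # [(1, 'name is empty'), (2, 'name is null'), (3, None)]

# Overlapping conditions: "" is both empty and short, but the empty branch
# comes first in the list, so its message wins.
short = {"item": lambda row: row["name"] is not None and len(row["name"]) <= 1,
         "msg": "name is short"}
first_wins = reduce(lambda acc, x: acc.when(x["item"], x["msg"]), filters + [short], F_like)
print(first_wins.evaluate({"id": 1, "name": ""}))   # name is empty
print(first_wins.evaluate({"id": 3, "name": "c"}))  # name is short
```

In the real PySpark version, rows that match no condition get null in new_col unless you append .otherwise(...) to the reduced expression; in the answer above those rows are already removed by dynamic_filter.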
