Suppose I have the following dataset:
# dataset_left
#+--------------+--------------+--------------+
#|      A       |      B       |      C       |
#+--------------+--------------+--------------+
#| some_value_1 | some_value_3 | some_value_5 |
#+--------------+--------------+--------------+
#| some_value_2 | some_value_4 | some_value_6 |
#+--------------+--------------+--------------+
I also have another dataset like the following:
# dataset_rules
#+--------------+--------------+--------------+
#|      A       |      B       |  result_col  |
#+--------------+--------------+--------------+
#| null         | some_value_3 | result_1     |
#+--------------+--------------+--------------+
#| some_value_2 | null         | result_2     |
#+--------------+--------------+--------------+
My goal is to join the two datasets with the following rule:
In dataset_rules, a null value in column A or column B is a wildcard that matches any value in dataset_left.
The join should therefore only use the non-null columns of each dataset_rules row as conditions.
So for the 1st row in dataset_rules, only column B is used as a condition, and for the 2nd row, only column A is used.
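To pin down the matching rule before writing any Spark code, here is a plain-Python sketch of the intended semantics. It is only an illustration: the helper names `rule_matches` and `apply_rules` are hypothetical, rows are modeled as dicts, and `None` stands in for null. A rule matches a row when each of its non-null key columns equals the row's value, and a row that matches no rule keeps a null result, as in a left join.

```python
from typing import Dict, List, Optional, Sequence

# Hypothetical row model: column name -> value, with None playing the role of null
Row = Dict[str, Optional[str]]


def rule_matches(left_row: Row, rule_row: Row, keys: Sequence[str]) -> bool:
    """True when every non-null key in the rule equals the left row's value.

    A None key in the rule is a wildcard and is skipped.
    """
    return all(rule_row[k] is None or rule_row[k] == left_row[k] for k in keys)


def apply_rules(left_rows: List[Row], rule_rows: List[Row], keys: Sequence[str],
                result_col: str = "result_col") -> List[Row]:
    """Left-join style: one output row per matching rule, or one row with None."""
    out: List[Row] = []
    for left in left_rows:
        matches = [r[result_col] for r in rule_rows if rule_matches(left, r, keys)]
        for result in matches or [None]:
            out.append({**left, result_col: result})
    return out


dataset_left = [
    {"A": "some_value_1", "B": "some_value_3", "C": "some_value_5"},
    {"A": "some_value_2", "B": "some_value_4", "C": "some_value_6"},
]
dataset_rules = [
    {"A": None, "B": "some_value_3", "result_col": "result_1"},
    {"A": "some_value_2", "B": None, "result_col": "result_2"},
]

for row in apply_rules(dataset_left, dataset_rules, keys=["A", "B"]):
    print(row)
```

Because the key columns are passed in as a list, a new rule is just a new row in `dataset_rules`; nothing about individual rules is hard-coded.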
The desired result is:
# dataset_result
#+--------------+--------------+--------------+--------------+
#|      A       |      B       |      C       |  result_col  |
#+--------------+--------------+--------------+--------------+
#| some_value_1 | some_value_3 | some_value_5 | result_1     |
#+--------------+--------------+--------------+--------------+
#| some_value_2 | some_value_4 | some_value_6 | result_2     |
#+--------------+--------------+--------------+--------------+
I want to avoid hard-coding the rules from dataset_rules, so that new rules are easy to add and the code stays maintainable.
Answer:
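You can express the wildcard in the join condition with a `coalesce` (or an equivalent `when`) expression, like this:
from pyspark.sql import functions as F

# When the rule value is null, coalesce falls back to the left value, so the
# comparison holds for any non-null left value: null acts as a wildcard
join_cond = (
    (F.coalesce(dataset_rules["A"], dataset_left["A"]) == dataset_left["A"])
    & (F.coalesce(dataset_rules["B"], dataset_left["B"]) == dataset_left["B"])
)

# A left join keeps every row of dataset_left and adds the matching rule's result_col
result = dataset_left.join(dataset_rules, join_cond, "left").select(
    dataset_left["*"],
    dataset_rules["result_col"]
)
result.show()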
#+------------+------------+------------+----------+
#|           A|           B|           C|result_col|
#+------------+------------+------------+----------+
#|some_value_1|some_value_3|some_value_5|  result_1|
#|some_value_2|some_value_4|some_value_6|  result_2|
#+------------+------------+------------+----------+
