I have a SQL query with a join and multiple WHERE conditions that I am trying to translate to PySpark:
UPDATE COMPANY1
INNER JOIN COMPANY2
ON COMPANY1.C1_PROFIT = COMPANY2.C2_PROFIT
SET COMPANY2.C2_TARGET = "1"
WHERE (((COMPANY2.C2_TARGET) Is Null)
AND ((COMPANY1.C1_SALES) Is Null)
AND ((COMPANY2.C2_PROFIT) Is Not Null));
The PySpark code I am trying to run (df_1 is COMPANY2 and df_2 is COMPANY1):
join = ((df_1.C2_PROFIT == df_2.C1_PROFIT) & \
        (df_1.C2_TARGET=='') & \
        (df_2.C1_SALES=='') & \
        (df_1.C2_PROFIT!=''))
df_1 = (df_1.alias('a')
    .join(df_2.alias('b'), join, 'left')
    .select(
        *[c for c in df_1.columns if c != 'C2_TARGET'],
        F.expr("nvl2(b.C1_PROFIT, '1', a.C2_TARGET) C2_TARGET")
    )
)
But I am still getting null values in the column C2_TARGET.
For reference: C1_PROFIT contains no nulls, while C2_PROFIT contains both nulls and values.
Example inputs:
+-----------+----------+
| C1_PROFIT | C1_SALES |
+-----------+----------+
| 5637      | Positive |
| 7464      |          |
| 43645     |          |
| 64657     | Growth P |
+-----------+----------+
+-----------+-----------+
| C2_PROFIT | C2_TARGET |
+-----------+-----------+
|           |           |
| 7464      |           |
| 43645     |           |
| 64657     |           |
+-----------+-----------+
Expected result:
CodePudding user response:
Based on the query, this looks like a CASE WHEN operation that assigns the value "1" wherever the conditions are met. It can be replicated with when().otherwise().
Assuming company1 and company2 are the two dataframes:
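from pyspark.sql import functions as func

# Note: an inner join keeps only the rows that match on profit.
final_sdf = company1. \
    join(company2, company1.c1_profit == company2.c2_profit, 'inner'). \
    withColumn('c2_target',
               # set "1" only where all three conditions from the WHERE clause hold
               func.when(company2.c2_target.isNull() &
                         company1.c1_sales.isNull() &
                         company2.c2_profit.isNotNull(), func.lit("1")
                         ).
               otherwise(company2.c2_target)  # otherwise keep the existing value
               )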
CodePudding user response:
In this answer, you have an example of how to do
UPDATE A INNER JOIN B
...
SET A...
In your case, you SET B...:
UPDATE A INNER JOIN B
...
SET B...
You have correctly switched the order of your dataframes.
The problem is that '' is not the same as null: a comparison such as C2_TARGET == '' never matches a null cell. You must use .isNull() and .isNotNull() in your conditions.
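To see why comparing with '' finds nothing, here is a minimal pure-Python sketch of SQL's three-valued logic, which Spark column comparisons also follow. The helpers sql_eq and passes_filter are hypothetical names made up for this illustration; they are not PySpark functions.

```python
from typing import Optional


def sql_eq(a, b) -> Optional[bool]:
    """SQL-style equality: any comparison involving NULL (None) yields NULL."""
    if a is None or b is None:
        return None
    return a == b


def passes_filter(cond: Optional[bool]) -> bool:
    """A join or WHERE condition keeps a row only when it is exactly TRUE."""
    return cond is True


c2_target = None  # a NULL cell, as in the example data

# Comparing a NULL cell with '' gives NULL, so the row is filtered out.
eq_empty = passes_filter(sql_eq(c2_target, ''))
# An explicit NULL check is what the SQL "Is Null" actually asks for.
is_null = passes_filter(c2_target is None)

print(eq_empty, is_null)  # False True
```

In PySpark the explicit check corresponds to .isNull(), which returns true or false and never null.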
Example inputs:
from pyspark.sql import functions as F
df_1 = spark.createDataFrame(
    [(5637, 'Positive'),
     (7464, None),
     (43645, None),
     (64657, 'Growth P')],
    ['C1_PROFIT', 'C1_SALES'])
df_2 = spark.createDataFrame(
    [(None, None),
     (7464, None),
     (43645, None),
     (64657, None)],
    'C2_PROFIT int, C2_TARGET string')
Script:
join_on = (df_1.C1_PROFIT == df_2.C2_PROFIT) & \
          df_2.C2_TARGET.isNull() & \
          df_1.C1_SALES.isNull() & \
          df_2.C2_PROFIT.isNotNull()
df = (df_2.alias('a')
    .join(df_1.alias('b'), join_on, 'left')
    .select(
        *[c for c in df_2.columns if c != 'C2_TARGET'],
        F.expr("nvl2(b.C1_PROFIT, '1', a.C2_TARGET) C2_TARGET")
    )
)
df.show()
# +---------+---------+
# |C2_PROFIT|C2_TARGET|
# +---------+---------+
# |     null|     null|
# |     7464|        1|
# |    64657|     null|
# |    43645|        1|
# +---------+---------+
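As a cross-check that does not need a Spark session, the same UPDATE rule can be applied to the example rows as plain Python tuples. This is only a sketch: update_targets is a hypothetical helper written for this illustration, and None stands in for SQL NULL.

```python
# Example rows from the question; None stands for SQL NULL.
company1 = [(5637, 'Positive'), (7464, None), (43645, None), (64657, 'Growth P')]
company2 = [(None, None), (7464, None), (43645, None), (64657, None)]


def update_targets(c1_rows, c2_rows):
    """Return COMPANY2 rows with C2_TARGET set to '1' where the UPDATE applies."""
    # C1_PROFIT values whose C1_SALES is NULL: the only COMPANY1 rows that qualify.
    qualifying_profits = {profit for profit, sales in c1_rows if sales is None}
    result = []
    for profit, target in c2_rows:
        matches = (
            target is None                    # C2_TARGET Is Null
            and profit is not None            # C2_PROFIT Is Not Null
            and profit in qualifying_profits  # join on profit + C1_SALES Is Null
        )
        result.append((profit, '1' if matches else target))
    return result


updated = update_targets(company1, company2)
for row in updated:
    print(row)
# (None, None)
# (7464, '1')
# (43645, '1')
# (64657, None)
```

Apart from row order, this agrees with the df.show() output: only the rows with profit 7464 and 43645 get "1", because 64657 has a non-null C1_SALES and the first COMPANY2 row has a null profit.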

