I need to build a SELECT query that fills the nulls in one dataframe, df1, with values from another dataframe, df2.
Wherever df1.value is null, it should take the df2 value whose df2.date equals df1.prev_date.
My current code is:
df1.createOrReplaceTempView("table1")
df2.createOrReplaceTempView("table2")
q = "select NVL(b.value, (select a.value from table2 a where a.date = b.prev_date)) as value from table1 b"
df_out = spark.sql(q)
But this fails because Spark requires the correlated scalar subquery to be aggregated. Error:
pyspark.sql.utils.AnalysisException: Correlated scalar subqueries must be aggregated: Filter (cast(date#269 as date) = outer(prev_date#1500))...
df1:
+----------+-----+----------+
|      date|value| prev_date|
+----------+-----+----------+
|23/11/2021|0.141|24/11/2021|
|24/11/2021| 0.17|23/11/2021|
|25/11/2021| null|24/11/2021|
|26/11/2021|0.135|25/11/2021|
+----------+-----+----------+
df2:
+----------+-----+
|      date|value|
+----------+-----+
|23/11/2021| 1.00|
|24/11/2021| 2.00|
|25/11/2021| 3.00|
|26/11/2021| 4.00|
+----------+-----+
Expected df_out:
+----------+-----+----------+
|      date|value| prev_date|
+----------+-----+----------+
|23/11/2021|0.141|24/11/2021|
|24/11/2021| 0.17|23/11/2021|
|25/11/2021| 2.00|24/11/2021|
|26/11/2021|0.135|25/11/2021|
+----------+-----+----------+
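Stated outside Spark, the rule is a keyed lookup: keep df1's value when it is present, otherwise take the df2 value whose date equals the row's prev_date. Below is a minimal plain-Python sketch (no Spark involved) over the sample rows above; the names df1_rows, df2_by_date and fill_from_prev_date are illustrative only and not part of the question's code.

```python
# Sample rows from df1: (date, value, prev_date); None stands for null
df1_rows = [
    ("23/11/2021", 0.141, "24/11/2021"),
    ("24/11/2021", 0.17, "23/11/2021"),
    ("25/11/2021", None, "24/11/2021"),
    ("26/11/2021", 0.135, "25/11/2021"),
]

# df2 keyed by date, so looking up prev_date is a dict access
df2_by_date = {
    "23/11/2021": 1.00,
    "24/11/2021": 2.00,
    "25/11/2021": 3.00,
    "26/11/2021": 4.00,
}


def fill_from_prev_date(rows, lookup):
    """Keep each non-null value; replace nulls with lookup[prev_date] (None if prev_date is missing)."""
    out = []
    for date, value, prev_date in rows:
        filled = value if value is not None else lookup.get(prev_date)
        out.append((date, filled, prev_date))
    return out


for row in fill_from_prev_date(df1_rows, df2_by_date):
    print(row)
```

Only the third row changes, picking up 2.0 from df2's 24/11/2021 row, which matches the expected df_out.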
Answer:
A left join followed by coalesce should do the job:
from pyspark.sql import functions as F

result = df1.join(df2, df1["prev_date"] == df2["date"], "left").select(
    df1["date"],
    F.coalesce(df1["value"], df2["value"]).alias("value"),  # df1's value if not null, else df2's value at prev_date
    df1["prev_date"],
)
result.show()
#+----------+-----+----------+
#|      date|value| prev_date|
#+----------+-----+----------+
#|24/11/2021| 0.17|23/11/2021|
#|23/11/2021|0.141|24/11/2021|
#|25/11/2021|  2.0|24/11/2021|
#|26/11/2021|0.135|25/11/2021|
#+----------+-----+----------+
Equivalent Spark SQL query (using the table1 and table2 temp views registered in the question):
result = spark.sql("""
SELECT t1.date,
coalesce(t1.value, t2.value) as value,
t1.prev_date
FROM table1 t1
LEFT JOIN table2 t2
ON t1.prev_date = t2.date
""")
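As a sanity check that this join query and a repaired version of the question's scalar subquery return the same rows, the sketch below runs both on the sample data. It uses SQLite from Python's standard library purely as a stand-in engine, so it verifies the query logic, not Spark's behaviour. The second query wraps the subquery's column in max(); this is assumed to be one way to satisfy Spark's "Correlated scalar subqueries must be aggregated" rule when at most one df2 row matches each prev_date (worth verifying on your Spark version). coalesce replaces NVL because SQLite has no NVL, and ORDER BY is added only to make the comparison deterministic.

```python
import sqlite3

# In-memory SQLite database standing in for Spark (not Spark itself)
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE table1 (date TEXT, value REAL, prev_date TEXT);
CREATE TABLE table2 (date TEXT, value REAL);
INSERT INTO table1 VALUES
  ('23/11/2021', 0.141, '24/11/2021'),
  ('24/11/2021', 0.17,  '23/11/2021'),
  ('25/11/2021', NULL,  '24/11/2021'),
  ('26/11/2021', 0.135, '25/11/2021');
INSERT INTO table2 VALUES
  ('23/11/2021', 1.0), ('24/11/2021', 2.0),
  ('25/11/2021', 3.0), ('26/11/2021', 4.0);
""")

# The LEFT JOIN + coalesce query from the answer
join_query = """
SELECT t1.date, coalesce(t1.value, t2.value) AS value, t1.prev_date
FROM table1 t1
LEFT JOIN table2 t2 ON t1.prev_date = t2.date
ORDER BY t1.date
"""

# The question's correlated subquery, with its column wrapped in an aggregate (max)
subquery_query = """
SELECT b.date,
       coalesce(b.value, (SELECT max(a.value) FROM table2 a WHERE a.date = b.prev_date)) AS value,
       b.prev_date
FROM table1 b
ORDER BY b.date
"""

join_rows = conn.execute(join_query).fetchall()
subquery_rows = conn.execute(subquery_query).fetchall()
print(join_rows)
print(join_rows == subquery_rows)
```

The join is usually the simpler choice; the aggregated subquery is closer to the question's original shape.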
Answer:
from pyspark.sql import Window, functions as F
# Order chronologically; ordering the raw dd/MM/yyyy strings would be lexicographic
w = Window.orderBy(F.to_date("date", "dd/MM/yyyy"))
new = (df1.join(df2.withColumnRenamed("value", "value_2"), on=["date"], how="left")  # join the two dfs, renaming df2's value
       .withColumn("value", F.coalesce(F.col("value"), F.lag("value_2").over(w)))  # fill nulls with the previous row's value_2
       .drop("value_2"))
new.show()
+----------+-----+----------+
|      date|value| prev_date|
+----------+-----+----------+
|23/11/2021|0.141|24/11/2021|
|24/11/2021| 0.17|23/11/2021|
|25/11/2021|  2.0|24/11/2021|
|26/11/2021|0.135|25/11/2021|
+----------+-----+----------+
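The lag-based answer relies on two assumptions worth checking: the window must order rows chronologically, and each row's prev_date must be the date of the immediately preceding row. The sketch below illustrates both in plain Python (no Spark). The month-boundary dates are hypothetical, chosen to show that ordering dd/MM/yyyy strings as text breaks once the data spans more than one month; the (date, prev_date) pairs are df1's rows from the question, and lag_matches_prev_date is an illustrative helper, not a Spark function.

```python
from datetime import datetime


def parse(s):
    """Parse a dd/MM/yyyy string into a datetime."""
    return datetime.strptime(s, "%d/%m/%Y")


# Hypothetical dates spanning a month boundary
dates = ["30/11/2021", "01/12/2021", "29/11/2021"]
as_text = sorted(dates)              # character-by-character, day first
as_dates = sorted(dates, key=parse)  # chronological, like ordering by to_date(...)
print(as_text)   # ['01/12/2021', '29/11/2021', '30/11/2021']
print(as_dates)  # ['29/11/2021', '30/11/2021', '01/12/2021']

# (date, prev_date) pairs from df1 in the question
question_rows = [
    ("23/11/2021", "24/11/2021"),
    ("24/11/2021", "23/11/2021"),
    ("25/11/2021", "24/11/2021"),
    ("26/11/2021", "25/11/2021"),
]


def lag_matches_prev_date(rows):
    """For rows in date order, report whether the previous row's date equals prev_date."""
    ordered = sorted(rows, key=lambda r: parse(r[0]))
    previous_dates = [None] + [date for date, _ in ordered[:-1]]
    return [prev == prev_date for prev, (_, prev_date) in zip(previous_dates, ordered)]


print(lag_matches_prev_date(question_rows))  # [False, True, True, True]
```

On the sample data, lag only disagrees with prev_date on the first row, whose value is not null, so the output matches; with other data, the join on prev_date from the first answer does not depend on this assumption.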
