pyspark sql select from other table when is null with condition

Time:02-01

I need to build a select query that fills the nulls in a dataframe df1 with values from another dataframe df2.
If value is null in df1, the query should take the value from the df2 row where df2.date = df1.prev_date.

My current code is:

df1.createOrReplaceTempView("table1")
df2.createOrReplaceTempView("table2")
q="select NVL(b.value, select a.value from table2 a where a.date=b.prev_date_def) as value from table1 b"
df_out = spark.sql(q)

But this code fails because the subquery needs an aggregate function.
Error:

pyspark.sql.utils.AnalysisException: Correlated scalar subqueries must be aggregated: Filter (cast(date#269 as date) = outer(prev_date#1500))...

df1

   +------------+-------+------------+
   |    date    |  value|  prev_date |
   +------------+-------+------------+
 1 | 23/11/2021 |  0.141| 24/11/2021 |
 2 | 24/11/2021 |   0.17| 23/11/2021 |
 3 | 25/11/2021 |   null| 24/11/2021 |
 4 | 26/11/2021 |  0.135| 25/11/2021 |

df2

   +------------+-------+
   |    date    |  value|
   +------------+-------+
 1 | 23/11/2021 |   1.00|
 2 | 24/11/2021 |   2.00|
 3 | 25/11/2021 |   3.00|
 4 | 26/11/2021 |   4.00|

expected df_out

    +------------+-------+------------+
    |    date    |  value|  prev_date |
    +------------+-------+------------+
  1 | 23/11/2021 |  0.141| 24/11/2021 |
  2 | 24/11/2021 |   0.17| 23/11/2021 |
  3 | 25/11/2021 |   2.00| 24/11/2021 |
  4 | 26/11/2021 |  0.135| 25/11/2021 |

CodePudding user response:

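A left join combined with coalesce should do the job:

from pyspark.sql import functions as F

# Look up df2 by df1.prev_date and use its value only where df1.value is null
result = df1.join(df2, df1["prev_date"] == df2["date"], "left").select(
    df1["date"],
    F.coalesce(df1["value"], df2["value"]).alias("value"),
    df1["prev_date"]
)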

result.show()

#+----------+-----+----------+
#|      date|value| prev_date|
#+----------+-----+----------+
#|24/11/2021| 0.17|23/11/2021|
#|23/11/2021|0.141|24/11/2021|
#|25/11/2021|  2.0|24/11/2021|
#|26/11/2021|0.135|25/11/2021|
#+----------+-----+----------+

Equivalent Spark SQL query (using the temp views table1 and table2 registered in the question):

result = spark.sql("""
SELECT t1.date,
       coalesce(t1.value, t2.value) as value,
       t1.prev_date
FROM   table1 t1
LEFT JOIN table2 t2
ON    t1.prev_date = t2.date
""")

CodePudding user response:

from pyspark.sql import Window
from pyspark.sql.functions import coalesce, col, lag

w = Window.orderBy(col("date"))  # no partitioning: all rows are ordered in one partition
new = (
    df1.join(df2.withColumnRenamed("value", "value_2"), on=["date"], how="left")  # join on date, renaming df2's value
    .withColumn("value", coalesce(col("value"), lag("value_2").over(w)))  # fill nulls with the previous row's value_2
    .drop("value_2")
)
new.show()

+----------+-----+----------+
|      date|value| prev_date|
+----------+-----+----------+
|23/11/2021|0.141|24/11/2021|
|24/11/2021| 0.17|23/11/2021|
|25/11/2021|  2.0|24/11/2021|
|26/11/2021|0.135|25/11/2021|
+----------+-----+----------+