Let's say we have two tables, A and B, both with the columns country, name, and age. I want to build a table called result from the union of A and B that contains every unique (country, name) pair, preferring the row from A whenever a pair appears in both tables.
For example, assume this is table A:
val column = Seq("country","name", "age")
val A = Seq(("New York","Smith","10"),
("Washington","Rose","5"),
("Mexico","David","1"))
val aDF = A.toDF(column:_*)
aDF.show(false)
+----------+-----+---+
|country   |name |age|
+----------+-----+---+
|New York  |Smith|10 |
|Washington|Rose |5  |
|Mexico    |David|1  |
+----------+-----+---+
This is table B:
val B = Seq(("New York","Smith","5"),
("Florida","Smith","5"),
("Washington","Jef","5"),
("Russia","Boris","12"))
val bDF = B.toDF(column:_*)
bDF.show(false)
+----------+-----+---+
|country   |name |age|
+----------+-----+---+
|New York  |Smith|5  | // Should not be included in the result table
|Florida   |Smith|5  |
|Washington|Jef  |5  |
|Russia    |Boris|12 |
+----------+-----+---+
And the result table should be:
+----------+-----+---+
|country   |name |age|
+----------+-----+---+
|New York  |Smith|10 | // <New York, Smith> contained in A and B - we take from A
|Washington|Rose |5  | // Contained in A
|Mexico    |David|1  | // Contained in A
|Florida   |Smith|5  | // Contained in B
|Washington|Jef  |5  | // Contained in B
|Russia    |Boris|12 | // Contained in B
+----------+-----+---+
How can I do that with Spark?
CodePudding user response:
You can add a new column, source, holding the literal value 1 in aDF and 2 in bDF before the union. Then number the rows within each (country, name) partition with the row_number window function, ordered by source, and keep only the first row of each partition, so a row from A always wins over its duplicate from B:
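import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, row_number}

// Within each (country, name) group, rows from A (source = 1) sort before rows from B (source = 2)
val w = Window.partitionBy("country", "name").orderBy("source")
val result = aDF.withColumn("source", lit(1))
  .union(bDF.withColumn("source", lit(2)))
  .withColumn("rn", row_number().over(w))
  .filter("rn = 1") // keep only the preferred row of each group
  .drop("rn", "source")
result.show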
// +----------+-----+---+
// |   country| name|age|
// +----------+-----+---+
// |   Florida|Smith|  5|
// |Washington| Rose|  5|
// |    Mexico|David|  1|
// |Washington|  Jef|  5|
// |    Russia|Boris| 12|
// |  New York|Smith| 10|
// +----------+-----+---+
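If the window logic feels opaque, the same "tag each row with its source, then keep the lowest-tagged row per key" idea can be sketched on plain Scala collections, without Spark. The names Person, PreferFirstSource and preferA are made up for this illustration, and this is only meant to show the rule, not how Spark executes it:

```scala
// Hypothetical row type for the illustration; not part of the original question.
final case class Person(country: String, name: String, age: String)

object PreferFirstSource {
  // Returns one row per (country, name), taking the row from `a` when both have it.
  def preferA(a: Seq[Person], b: Seq[Person]): Seq[Person] = {
    // Tag every row with its source: 1 for A, 2 for B (like lit(1) / lit(2)).
    val tagged = a.map(p => (1, p)) ++ b.map(p => (2, p))
    tagged
      .groupBy { case (_, p) => (p.country, p.name) }            // like partitionBy("country", "name")
      .values
      .map(rows => rows.minBy { case (source, _) => source }._2) // like orderBy("source") + rn = 1
      .toSeq
  }

  def main(args: Array[String]): Unit = {
    val a = Seq(
      Person("New York", "Smith", "10"),
      Person("Washington", "Rose", "5"),
      Person("Mexico", "David", "1"))
    val b = Seq(
      Person("New York", "Smith", "5"),
      Person("Florida", "Smith", "5"),
      Person("Washington", "Jef", "5"),
      Person("Russia", "Boris", "12"))
    // Row order is not guaranteed, just as with the DataFrame version.
    preferA(a, b).foreach(println)
  }
}
```

Here <New York, Smith> shows up once, with age 10 from A, and the other five pairs pass through unchanged.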
CodePudding user response:
A full outer join might be more efficient than a union followed by a window, as long as each (country, name) pair appears at most once in each table:
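import org.apache.spark.sql.functions.{coalesce, col}

aDF.join(bDF, Seq("country", "name"), "full")
  // Take age from A when present, otherwise from B; selecting explicitly leaves a single age column
  .select(col("country"), col("name"), coalesce(aDF("age"), bDF("age")).as("age"))
  .show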
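To try the full-join idea outside spark-shell, here is a minimal self-contained sketch on the sample data from the question. It assumes a local SparkSession; the object name FullJoinPreferA and the app name are placeholders I picked, not anything from the question. Note that coalesce also falls back to B's age when A has the pair but its age is null, which may or may not be what you want:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, col}

// Hypothetical wrapper object, only so the snippet runs on its own.
object FullJoinPreferA {
  def main(args: Array[String]): Unit = {
    // Assumption: a local session is fine for a quick check.
    val spark = SparkSession.builder().master("local[*]").appName("prefer-a").getOrCreate()
    import spark.implicits._

    val columns = Seq("country", "name", "age")
    val aDF = Seq(
      ("New York", "Smith", "10"),
      ("Washington", "Rose", "5"),
      ("Mexico", "David", "1")).toDF(columns: _*)
    val bDF = Seq(
      ("New York", "Smith", "5"),
      ("Florida", "Smith", "5"),
      ("Washington", "Jef", "5"),
      ("Russia", "Boris", "12")).toDF(columns: _*)

    // The full outer join keeps keys found in either table;
    // coalesce picks A's age first and B's only when A has none.
    val result = aDF.join(bDF, Seq("country", "name"), "full")
      .select(col("country"), col("name"), coalesce(aDF("age"), bDF("age")).as("age"))

    // Compare as a set, since the row order of a join is not guaranteed.
    val rows = result.as[(String, String, String)].collect().toSet
    assert(rows == Set(
      ("New York", "Smith", "10"),
      ("Washington", "Rose", "5"),
      ("Mexico", "David", "1"),
      ("Florida", "Smith", "5"),
      ("Washington", "Jef", "5"),
      ("Russia", "Boris", "12")))

    result.show(false)
    spark.stop()
  }
}
```

Whether this actually beats the window version depends on your data sizes and partitioning, so it is worth comparing the plans with explain() on real data.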
