How to union two tables and remove duplicates with conditions using Spark

Time:01-05

Let's say we have two tables, A and B, both with the columns country, name, and age. I want to build a union of A and B, called result, that contains each unique (country, name) pair exactly once. When a pair appears in both tables, the row from A should be kept.

For example, assume this is table A:

val column = Seq("country","name", "age")
val A = Seq(("New York","Smith","10"),
      ("Washington","Rose","5"),
      ("Mexico","David","1"))
val aDF = A.toDF(column:_*)
aDF.show(false)

+----------+-----+---+
|country   |name |age|
+----------+-----+---+
|New York  |Smith|10 |
|Washington|Rose |5  |
|Mexico    |David|1  |
+----------+-----+---+

This is table B:

val B = Seq(("New York","Smith","5"),
    ("Florida","Smith","5"),
    ("Washington","Jef","5"),
    ("Russia","Boris","12"))

val bDF = B.toDF(column:_*)
bDF.show(false)

+----------+-----+---+
|country   |name |age|
+----------+-----+---+
|New York  |Smith|5  |  // Should not be included in the result table
|Florida   |Smith|5  |
|Washington|Jef  |5  |
|Russia    |Boris|12 |
+----------+-----+---+

The result table should be:

+----------+-----+---+
|country   |name |age|
+----------+-----+---+
|New York  |Smith|10 |  // <New York, Smith> contained in A and B - we take from A
|Washington|Rose |5  |  // Contained in A
|Mexico    |David|1  |  // Contained in A
|Florida   |Smith|5  |  // Contained in B
|Washington|Jef  |5  |  // Contained in B
|Russia    |Boris|12 |  // Contained in B
+----------+-----+---+

How can I do that with Spark?

CodePudding user response:

You can add a new column, source, with the literal values 1 and 2 to aDF and bDF respectively before the union. Then partition by (country, name), order each partition by source, and keep only the first row using the row_number window function. Because A is tagged 1, its row sorts first whenever a pair exists in both tables:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, row_number}

val w = Window.partitionBy("country", "name").orderBy("source")

val result = aDF.withColumn("source", lit(1))
  .union(bDF.withColumn("source", lit(2)))
  .withColumn("rn", row_number().over(w))
  .filter("rn = 1")
  .drop("rn", "source")

result.show

//+----------+-----+---+
//|   country| name|age|
//+----------+-----+---+
//|   Florida|Smith|  5|
//|Washington| Rose|  5|
//|    Mexico|David|  1|
//|Washington|  Jef|  5|
//|    Russia|Boris| 12|
//|  New York|Smith| 10|
//+----------+-----+---+
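To make the partition-and-pick rule easier to see, here is a minimal sketch of the same logic on plain Scala collections, with no Spark involved. The Person case class and the names tagged and merged are hypothetical, introduced only for this illustration. It models what the window does, not how Spark executes it.

```scala
// Hypothetical row type for illustration; the original uses untyped DataFrames.
case class Person(country: String, name: String, age: String)

val a = Seq(
  Person("New York", "Smith", "10"),
  Person("Washington", "Rose", "5"),
  Person("Mexico", "David", "1"))

val b = Seq(
  Person("New York", "Smith", "5"),
  Person("Florida", "Smith", "5"),
  Person("Washington", "Jef", "5"),
  Person("Russia", "Boris", "12"))

// Tag every row with its source (1 = A, 2 = B), mirroring lit(1) and lit(2).
val tagged = a.map(p => (1, p)) ++ b.map(p => (2, p))

// Group by (country, name) and keep the row with the smallest source tag,
// which is what "row_number() over (partition ... order by source) = 1" selects.
val merged = tagged
  .groupBy { case (_, p) => (p.country, p.name) }
  .values
  .map(rows => rows.minBy(_._1)._2)
  .toSet
```

For the sample data, merged holds six rows, and the New York / Smith entry is the one from A.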

CodePudding user response:

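A full outer join might be more efficient than a union followed by a window function. Note that it only gives the same result when each (country, name) pair appears at most once in each table, and that coalesce falls back to B's age whenever A's age is null: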

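import org.apache.spark.sql.functions.{coalesce, col}

// The join keys appear once in the output; age comes from A when present, otherwise from B.
aDF.join(bDF, Seq("country", "name"), "full")
  .select(col("country"), col("name"), coalesce(aDF("age"), bDF("age")).as("age"))
  .show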
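Whether the join really is cheaper depends on the data and the Spark version, so it is worth checking rather than assuming. The script below is a self-contained sketch that builds the sample tables, runs both approaches, checks that they return the same rows, and prints each physical plan with explain(). The local[*] master, the app name, and the names viaWindow, viaJoin, windowRows, and joinRows are assumptions made for this example. It is written to be pasted into spark-shell or run as a Scala script with Spark on the classpath.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, col, lit, row_number}

// Local session for experimentation only (assumed settings); in spark-shell
// getOrCreate() simply returns the session that already exists.
val spark = SparkSession.builder().master("local[*]").appName("union-prefer-a").getOrCreate()
import spark.implicits._

val columns = Seq("country", "name", "age")
val aDF = Seq(
  ("New York", "Smith", "10"),
  ("Washington", "Rose", "5"),
  ("Mexico", "David", "1")).toDF(columns: _*)
val bDF = Seq(
  ("New York", "Smith", "5"),
  ("Florida", "Smith", "5"),
  ("Washington", "Jef", "5"),
  ("Russia", "Boris", "12")).toDF(columns: _*)

// Approach 1: union with a source tag, then keep the first row per key.
val w = Window.partitionBy("country", "name").orderBy("source")
val viaWindow = aDF.withColumn("source", lit(1))
  .union(bDF.withColumn("source", lit(2)))
  .withColumn("rn", row_number().over(w))
  .filter(col("rn") === 1)
  .drop("rn", "source")

// Approach 2: full outer join on the key, preferring A's age.
val viaJoin = aDF.join(bDF, Seq("country", "name"), "full")
  .select(col("country"), col("name"), coalesce(aDF("age"), bDF("age")).as("age"))

// Compare as sets, because neither approach guarantees row order.
val windowRows = viaWindow.collect().map(r => (r.getString(0), r.getString(1), r.getString(2))).toSet
val joinRows = viaJoin.collect().map(r => (r.getString(0), r.getString(1), r.getString(2))).toSet
assert(windowRows == joinRows)

// Print the physical plans to compare the shuffles and sorts each approach needs.
viaWindow.explain()
viaJoin.explain()
```

On real tables, the plans and timings on representative data are a better guide than the toy example.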