I asked a similar question earlier today. In short: I need to do record linkage for two large datasets (1.6M and 6M). I was going to use Spark, thinking that the Cartesian product I was warned about would not be such a big problem. But it is: it hurt performance so badly that the linkage process didn't finish in 7 hours.
Is there another library/framework/tool that does this more efficiently? Or is there a way to improve the performance of the solution below?
The code I ended up with:
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{col, concat_ws, substring, to_date}
import java.time.Duration

object App {
  // First n characters of a string column
  def left(col: Column, n: Int) = {
    assert(n > 0)
    substring(col, 1, n)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("MatchingApp")
      .getOrCreate()
    import spark.implicits._

    val a = spark.read
      .format("csv")
      .option("header", true)
      .option("delimiter", ";")
      .load("/home/helveticau/workstuff/a.csv")
      .withColumn("FULL_NAME", concat_ws(" ", col("FIRST_NAME"), col("LAST_NAME")))
      .withColumn("BIRTH_DATE", to_date(col("BIRTH_DATE"), "yyyy-MM-dd"))

    val b = spark.read
      .format("csv")
      .option("header", true)
      .option("delimiter", ";")
      .load("/home/helveticau/workstuff/b.txt")
      .withColumn("FULL_NAME", concat_ws(" ", col("FIRST_NAME"), col("LAST_NAME")))
      .withColumn("BIRTH_DATE", to_date(col("BIRTH_DATE"), "dd.MM.yyyy"))

    // Names must match AND (birth dates are equal OR streets share a 3-character prefix)
    // @formatter:off
    val condition = a
      .col("FULL_NAME").contains(b.col("FIRST_NAME"))
      .and(a.col("FULL_NAME").contains(b.col("LAST_NAME")))
      .and(a.col("BIRTH_DATE").equalTo(b.col("BIRTH_DATE"))
        .or(a.col("STREET").startsWith(left(b.col("STR"), 3))))
    // @formatter:on

    val startMillis = System.currentTimeMillis()
    val res = a.join(b, condition, "left_outer")
    val count = res
      .filter(col("B_ID").isNotNull)
      .count()
    println(s"Count: $count")
    val executionTime = Duration.ofMillis(System.currentTimeMillis() - startMillis)
    println(s"Execution time: ${executionTime.toMinutes}m")
  }
}
Probably the condition is too complicated, but it must be that way.
Answer:
You can improve the performance of your current solution by slightly changing the logic of how you perform the linkage:
- First, perform an inner join of dataframes a and b on the columns that you know match exactly. In your case, these seem to be the LAST_NAME and FIRST_NAME columns.
- Then, filter the resulting dataframe with your more complex conditions. In your case, the birth dates are equal or the street prefix matches.
- Finally, if you also need to keep the records that were not linked, perform a right join with the a dataframe.
Your code could be rewritten as follows:
import org.apache.spark.sql.functions.{col, substring, to_date}
import org.apache.spark.sql.SparkSession
import java.time.Duration

object App {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("MatchingApp")
      .getOrCreate()

    val a = spark.read
      .format("csv")
      .option("header", true)
      .option("delimiter", ";")
      .load("/home/helveticau/workstuff/a.csv")
      .withColumn("BIRTH_DATE", to_date(col("BIRTH_DATE"), "yyyy-MM-dd"))

    val b = spark.read
      .format("csv")
      .option("header", true)
      .option("delimiter", ";")
      .load("/home/helveticau/workstuff/b.txt")
      .withColumn("BIRTH_DATE", to_date(col("BIRTH_DATE"), "dd.MM.yyyy"))

    // Complex part of the match, applied only to rows whose names already agree
    val condition = a.col("BIRTH_DATE").equalTo(b.col("BIRTH_DATE"))
      .or(a.col("STREET").startsWith(substring(b.col("STR"), 1, 3)))

    val startMillis = System.currentTimeMillis()
    // Inner equality join on the name columns, then filter with the complex condition
    val res = a.join(b, Seq("LAST_NAME", "FIRST_NAME"))
      .filter(condition)
      // The next two lines are optional: skip them if you only want to keep the linked records (non-null B_ID)
      .select("B_ID", "A_ID")
      .join(a, Seq("A_ID"), "right_outer")
    val count = res
      .filter(col("B_ID").isNotNull)
      .count()
    println(s"Count: $count")
    val executionTime = Duration.ofMillis(System.currentTimeMillis() - startMillis)
    println(s"Execution time: ${executionTime.toMinutes}m")
  }
}
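This way you avoid the Cartesian product, at the price of two joins instead of one. The inner join on LAST_NAME and FIRST_NAME is an equality join, which Spark can execute as a hash or sort-merge join instead of comparing every row of a with every row of b.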
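If you want to check which join strategy Spark actually picks, you can compare the physical plans of both variants. The following is only a minimal sketch under assumptions: the object name JoinPlanCheck, the helper planOf and the two tiny in-memory dataframes are made up for illustration and are not part of your code. An equality join usually shows up in the plan as SortMergeJoin or BroadcastHashJoin, while a join without any equality condition usually shows up as BroadcastNestedLoopJoin or CartesianProduct; the exact text depends on your Spark version and configuration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object JoinPlanCheck {
  // Hypothetical helper: returns the physical plan of a dataframe as text
  def planOf(df: DataFrame): String = df.queryExecution.executedPlan.toString

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("JoinPlanCheck")
      .getOrCreate()
    import spark.implicits._

    // Made-up sample rows, only used to obtain a plan
    val a = Seq((10, "John", "Doe"), (11, "Rebecca", "Davis"))
      .toDF("A_ID", "FIRST_NAME", "LAST_NAME")
    val b = Seq((29, "John", "Doe"), (28, "Rebecca", "Davis"))
      .toDF("B_ID", "FIRST_NAME", "LAST_NAME")

    // Join condition without any equality: every pair of rows has to be compared
    val nonEqui = a.join(b, a.col("FIRST_NAME").contains(b.col("FIRST_NAME")), "left_outer")
    // Equality join on the name columns: rows can be matched by key
    val equi = a.join(b, Seq("LAST_NAME", "FIRST_NAME"))

    println(planOf(nonEqui))
    println(planOf(equi))

    spark.stop()
  }
}
```

Calling explain() on a dataframe prints the same information directly. On your real data, the plan of the rewritten job is the one to look at, since the chosen strategy also depends on the size of the inputs.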
Example
With file a.csv containing the following data:
"A_ID";"FIRST_NAME";"LAST_NAME";"BIRTH_DATE";"STREET"
10;John;Doe;1965-10-21;Johnson Road
11;Rebecca;Davis;1977-02-27;Lincoln Road
12;Samantha;Johns;1954-03-31;Main Street
13;Roger;Penrose;1987-12-25;Oxford Street
14;Robert;Smith;1981-08-26;Canergie Road
15;Britney;Stark;1983-09-27;Alshire Road
And file b.txt containing the following data:
"B_ID";"FIRST_NAME";"LAST_NAME";"BIRTH_DATE";"STR"
29;John;Doe;21.10.1965;Johnson Road
28;Rebecca;Davis;28.03.1986;Lincoln Road
27;Shirley;Iron;30.01.1956;Oak Street
26;Roger;Penrose;25.12.1987;York Street
25;Robert;Dayton;26.08.1956;Canergie Road
24;Britney;Stark;22.06.1962;Algon Road
the res dataframe will be:
+----+----+----------+---------+----------+-------------+
|A_ID|B_ID|FIRST_NAME|LAST_NAME|BIRTH_DATE|STREET       |
+----+----+----------+---------+----------+-------------+
|10  |29  |John      |Doe      |1965-10-21|Johnson Road |
|11  |28  |Rebecca   |Davis    |1977-02-27|Lincoln Road |
|12  |null|Samantha  |Johns    |1954-03-31|Main Street  |
|13  |26  |Roger     |Penrose  |1987-12-25|Oxford Street|
|14  |null|Robert    |Smith    |1981-08-26|Canergie Road|
|15  |null|Britney   |Stark    |1983-09-27|Alshire Road |
+----+----+----------+---------+----------+-------------+
Note: if your FIRST_NAME and LAST_NAME columns are not exactly the same, you can try to make them match with Spark's built-in functions, for instance:
- trim to remove spaces at the start and end of the string
- lower to transform the column to lower case (and thus ignore case in the comparison)
What is really important is to have the maximum number of columns that match exactly.
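The normalization described in the note could look like the sketch below. It is only an illustration under assumptions: the object name NormalizeKeys, the helper normalize and the sample rows are hypothetical, and the helper overwrites the original column values. If you still need the raw names, write the normalized values to separate key columns instead.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lower, trim}

object NormalizeKeys {
  // Hypothetical helper: trims and lower-cases each listed column, replacing its values
  def normalize(df: DataFrame, columns: Seq[String]): DataFrame =
    columns.foldLeft(df)((acc, c) => acc.withColumn(c, lower(trim(col(c)))))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("NormalizeKeys")
      .getOrCreate()
    import spark.implicits._

    // Made-up rows whose names differ only in case and surrounding spaces
    val a = Seq((10, " John", "DOE ")).toDF("A_ID", "FIRST_NAME", "LAST_NAME")
    val b = Seq((29, "john", "Doe")).toDF("B_ID", "FIRST_NAME", "LAST_NAME")

    val keys = Seq("LAST_NAME", "FIRST_NAME")
    // After normalization the equality join can link the two records
    val linked = normalize(a, keys).join(normalize(b, keys), keys)
    linked.show(false)

    spark.stop()
  }
}
```

Without the normalize calls, the same join would return no rows for this sample, because " John" and "john" are different strings.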
