Home > Net >  Spark join two dataframes on best .startswith match
Spark join two dataframes on best .startswith match

Time:02-01

Two dataframes with phone numbers with ids that could only be matched via regex, e.g.

idPrefix other columns
420
42055
420551
phoneNumber other columns
420551666
421709560

I would need to join these dataframes on the best match of idPrefix to the phoneNumber, matching the longest starting prefix possible, if there is one. E.g. if there were any option to join on longest idPrefix for phoneNumber.startswith(idPrefix), that would be great.

e.g. result would look like enter image description here

I tried a few UDFs with regex instead of a df.join(), but there does not seem to be a way to have a dataframe as an input to UDF.

CodePudding user response:

Join on startsWith condition then use row_number to keep longest idPrefix per phoneNumber:

import org.apache.spark.sql.expressions.Window

val df1 = Seq(("420"), ("42055"), ("420551")).toDF("idPrefix")
val df2 = Seq(("420551666"), ("421709560")).toDF("phoneNumber")

val df = (df2
  .join(df1, col("phoneNumber").startsWith(col("idPrefix")), "left")
  .withColumn(
    "rn",
    row_number().over(
      Window.partitionBy("phoneNumber").orderBy(length(col("idPrefix")).desc)
    )
  )
  .filter("rn = 1")
  .drop("rn")
)

df.show
// ----------- -------- 
//|phoneNumber|idPrefix|
// ----------- -------- 
//|  421709560|    null|
//|  420551666|  420551|
// ----------- -------- 

CodePudding user response:

It's true that you can use directly the startsWith in the join, but this will require that one of the tables can be broadcasted, if this case can't be achieved it will become a join with a cartesian product complexity making it very inefficient. If both dataframes are very large, you can try to make a join first by prefixes and then filter by the startsWith.

The code will look something like this:

val df1 = Seq(("420"), ("42055"), ("420551")).toDF("idPrefix")
val df2 = Seq(("420551666"), ("421709560")).toDF("phoneNumber")

val prefixSize = df1.select(min(length(col("idPrefix")))).as[Int].head()
//takes the length of the smallest prefix, or if its static set it hardcoded

val df = df2
  .join(df1, substring(col("idPrefix"), 0, prefixSize) === substring(col("phoneNumber"), 0, prefixSize), "left")
  .filter(col("phoneNumber").startsWith(col("idPrefix")) || col("idPrefix").isNull) //join by the prefix and filter all false elements
  .groupBy("phoneNumber").agg(max(struct(length(col("idPrefix").as("size")), col("idPrefix"))).as("idPrefix"))
  .withColumn("idPrefix", col("idPrefix.idPrefix")) //take the maximum length for the phone number in a single aggregation

df.show
// ----------- -------- 
//|phoneNumber|idPrefix|
// ----------- -------- 
//|  420551666|  420551|
//|  421709560|    null|
// ----------- -------- 
  •  Tags:  
  • Related