I have two dataframes, one with id prefixes and one with phone numbers, that can only be matched by a regex-style prefix comparison, e.g.
| idPrefix | other columns |
|---|---|
| 420 | |
| 42055 | |
| 420551 | |

| phoneNumber | other columns |
|---|---|
| 420551666 | |
| 421709560 | |
I need to join these dataframes on the best match of idPrefix to phoneNumber, i.e. the longest idPrefix that the phone number starts with, if there is one. If there were a way to join on the longest idPrefix for which phoneNumber.startswith(idPrefix) holds, that would be great.
I tried a few UDFs with regex instead of a df.join(), but there does not seem to be a way to pass a dataframe as an input to a UDF.
CodePudding user response:
Join on a startsWith condition, then use row_number to keep the longest idPrefix per phoneNumber:
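import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, length, row_number}
// toDF needs spark.implicits._ (already in scope in spark-shell)
val df1 = Seq(("420"), ("42055"), ("420551")).toDF("idPrefix")
val df2 = Seq(("420551666"), ("421709560")).toDF("phoneNumber")
val df = (df2
  // non-equi left join: phone numbers without a matching prefix are kept
  .join(df1, col("phoneNumber").startsWith(col("idPrefix")), "left")
  .withColumn(
    "rn",
    // rank the matching prefixes of each phone number, longest first
    row_number().over(
      Window.partitionBy("phoneNumber").orderBy(length(col("idPrefix")).desc)
    )
  )
  .filter("rn = 1") // keep only the longest prefix
  .drop("rn")
)
df.show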
// +-----------+--------+
// |phoneNumber|idPrefix|
// +-----------+--------+
// |  421709560|    null|
// |  420551666|  420551|
// +-----------+--------+
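To make the expected result of this join explicit, the same longest-prefix rule can be written with plain Scala collections. This is only a minimal sketch for checking small inputs (for example in a unit test), not a replacement for the Spark join; `longestPrefix` is a hypothetical helper name, not part of Spark.

```scala
// Hypothetical helper (not a Spark API): returns the longest prefix
// that `phone` starts with, or None when no prefix matches.
def longestPrefix(phone: String, prefixes: Seq[String]): Option[String] =
  prefixes
    .filter(p => phone.startsWith(p))
    .reduceOption((a, b) => if (b.length > a.length) b else a)

val prefixes = Seq("420", "42055", "420551")
val phones = Seq("420551666", "421709560")

// Mirrors the left join: every phone number is kept, unmatched ones get None.
val matched = phones.map(p => p -> longestPrefix(p, prefixes))
matched.foreach(println)
// (420551666,Some(420551))
// (421709560,None)
```

The `None` case corresponds to the `null` idPrefix that the left join produces for 421709560.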
CodePudding user response:
It's true that you can use startsWith directly in the join, but that is a non-equi join, so it only performs well when one of the tables is small enough to be broadcast. If neither table can be broadcast, Spark has to compare the rows pairwise, with the complexity of a cartesian product, which is very inefficient. If both dataframes are very large, you can first join on a fixed-length prefix, which is an ordinary equi-join, and use startsWith only to discard the false matches among rows that share that key.
The code will look something like this:
import org.apache.spark.sql.functions.{col, length, max, min, struct, substring}
val df1 = Seq(("420"), ("42055"), ("420551")).toDF("idPrefix")
val df2 = Seq(("420551666"), ("421709560")).toDF("phoneNumber")
// length of the shortest prefix; hardcode it if it is known to be static
val prefixSize = df1.select(min(length(col("idPrefix")))).as[Int].head()
val df = df2
  // equi-join on the first prefixSize characters (substring is 1-based) and keep only real prefix matches;
  // with startsWith inside the join condition, phone numbers without any match survive the left join
  .join(
    df1,
    substring(col("idPrefix"), 1, prefixSize) === substring(col("phoneNumber"), 1, prefixSize) &&
      col("phoneNumber").startsWith(col("idPrefix")),
    "left")
  // take the longest prefix for each phone number in a single aggregation
  .groupBy("phoneNumber")
  .agg(max(struct(length(col("idPrefix")).as("size"), col("idPrefix"))).as("idPrefix"))
  .withColumn("idPrefix", col("idPrefix.idPrefix"))
df.show
// +-----------+--------+
// |phoneNumber|idPrefix|
// +-----------+--------+
// |  420551666|  420551|
// |  421709560|    null|
// +-----------+--------+
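Why joining on a fixed-length key helps can be seen in a small plain-Scala sketch: the prefixes are bucketed by their first `keyLen` characters, so each phone number is compared only against the prefixes in its own bucket instead of against all of them. This illustrates the idea and is not how Spark implements the join. The names `keyLen`, `buckets` and `bestPrefix`, as well as the extra prefix "4310" and phone number "431599999", are made up for the example.

```scala
// "4310" and "431599999" are made-up additions to the sample data: they share
// the 3-character key "431", but the phone does not start with the prefix.
val prefixes = Seq("420", "42055", "420551", "4310")
val phones = Seq("420551666", "421709560", "431599999")

// Length of the shortest prefix, used as the fixed-size join key.
val keyLen = prefixes.map(_.length).min

// "Equi-join" side: bucket the prefixes by their first keyLen characters.
val buckets: Map[String, Seq[String]] = prefixes.groupBy(_.take(keyLen))

// For each phone, look only at its own bucket, keep the prefixes it really
// starts with, and pick the longest one (None if nothing matches).
def bestPrefix(phone: String): Option[String] =
  buckets
    .getOrElse(phone.take(keyLen), Seq.empty)
    .filter(p => phone.startsWith(p))
    .sortBy(p => -p.length)
    .headOption

val result = phones.map(p => p -> bestPrefix(p))
result.foreach(println)
```

The third phone number shows why a key match alone is not enough: it lands in the "431" bucket, fails the startsWith check, and must still appear in the result without a prefix.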

