I'm trying to create a function that takes a column name and returns a Column after processing the data.
One piece of functionality I'm stuck on is the following:
If / is present in the input:
- Split the input on /
- Select the first and last elements after the split
- If the length of the first element is 3 or 10, process it; otherwise set the col value to null
- If the length of the last element is 7 or 10, process it; otherwise set the col value to null
If / is not present in the input:
- Take the first 10 characters of the input
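Before writing the Spark version, it can help to have a plain-Python reference for the rules above to test against. This is a sketch: the name `process_phone` is hypothetical, and "process" is assumed to mean concatenating the first and last elements, which is what the sample output suggests.

```python
from typing import Optional


def process_phone(value: str) -> Optional[str]:
    """Hypothetical reference implementation of the rules listed above.

    Assumption: "process" means concatenating the first and last elements.
    """
    if "/" not in value:
        return value[:10]  # no '/': keep the first 10 characters
    parts = value.split("/")
    first, last = parts[0], parts[-1]  # first and last elements after the split
    if len(first) not in (3, 10):  # first element must have length 3 or 10
        return None
    if len(last) not in (7, 10):  # last element must have length 7 or 10
        return None
    return first + last


for phone in ["1234567890", "123/2345/1234567", "123/1234567", "123/12345", "1234/1234567"]:
    print(phone, process_phone(phone))
```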
Below is my function so far. A solution that works directly on the DataFrame would also help.
from pyspark.sql import functions as F

def phone_number_processor(col):
    if isinstance(col, str):
        col = F.col(col)
    remove_unnecessary_chars = "[^0-9/]"
    col = F.regexp_replace(col, remove_unnecessary_chars, '')
    col = F.when(F.length(col) < 10, '').otherwise(col)  # ignore if length is less than 10
    ...
    # if input has '/', then
    # ?????
    # if input doesn't have '/' then
    col = F.substring(col, 1, 10)  # get first 10 chars
    ...
    return col
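The cleanup step keeps only digits and "/". Python's `re` module treats the character class `[^0-9/]` the same way as the Java regex behind `regexp_replace`, so a quick plain-Python check of what gets removed could look like this (the `clean` helper and the input strings are made up for illustration):

```python
import re

# Same pattern as remove_unnecessary_chars: drop everything that is not a digit or '/'.
REMOVE_UNNECESSARY_CHARS = r"[^0-9/]"


def clean(value: str) -> str:
    """Hypothetical helper mirroring the regexp_replace cleanup step."""
    return re.sub(REMOVE_UNNECESSARY_CHARS, "", value)


print(clean("(123) 456-7890"))   # 1234567890
print(clean("+1 123/456 7890"))  # 1123/4567890
```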
Sample output:
df.withColumn('PROCESSED_PHONE', phone_number_processor('PHONE')).show()
+----------------+---------------+
|           PHONE|PROCESSED_PHONE|
+----------------+---------------+
|      1234567890|     1234567890|  #-> as is
|123/2345/1234567|     1231234567|  #-> first and last elements after splitting on '/'
|     123/1234567|     1231234567|  #-> same as above
|       123/12345|           null|  #-> since the length of the last element after the split is != 7
|    1234/1234567|           null|  #-> since the length of the first element after the split is != 3
+----------------+---------------+
PS: I've tried Spark functions such as contains and split, but I haven't been able to get the result I want. I've been working on this for quite some time, so any input or suggestions are appreciated.
Answer:
There is no need to define a UDF here; you can do the same thing using only Spark's built-in functions. Split the PHONE column on "/", then apply when expressions to the first and last elements of the resulting array to get the desired output:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([("1234567890",), ("123/2345/1234567",), ("123/1234567",), ("123/12345",), ("1234/1234567",)], ["PHONE"])

df1 = df.withColumn("split", F.split("PHONE", "/")) \
    .withColumn("first_part", F.element_at("split", 1)) \
    .withColumn("last_part", F.element_at("split", -1)) \
    .withColumn(
        "PROCESSED_PHONE",
        F.when(
            F.size("split") == 1,             # no '/' in the input
            F.substring("first_part", 1, 10)  # first 10 chars (positions are 1-based)
        ).otherwise(
            F.concat(  # NULL if either part has the wrong length
                F.when(F.length("first_part") == 3, F.col("first_part")),
                F.when(F.length("last_part") == 7, F.col("last_part"))
            )
        )
    ).drop("first_part", "last_part", "split")

df1.show()
#+----------------+---------------+
#|           PHONE|PROCESSED_PHONE|
#+----------------+---------------+
#|      1234567890|     1234567890|
#|123/2345/1234567|     1231234567|
#|     123/1234567|     1231234567|
#|       123/12345|           null|
#|    1234/1234567|           null|
#+----------------+---------------+
Answer:
Based on @blackbishop's answer, I wrapped this in a function, in case you are interested:
from pyspark.sql import functions as F

def phone_number_validator(col):
    if isinstance(col, str):
        col = F.col(col)
    remove_unnecessary_chars = "[^0-9/]"
    col = F.regexp_replace(col, remove_unnecessary_chars, '')
    col = F.when(F.length(col) < 10, '').otherwise(col)  # ignore if length is less than 10
    split_col = F.split(col, '/')
    first_part = F.element_at(split_col, 1)
    last_part = F.element_at(split_col, -1)
    col = F.when(
        # each comparison needs its own parentheses: & binds more tightly than == and >=
        (F.size(split_col) == 1) & (F.length(col) >= 10),
        F.substring(col, 1, 10)  # get first 10 chars
    ).otherwise(
        F.concat(
            F.when(F.length(first_part) == 3, first_part),
            F.when(F.length(last_part) == 7, last_part),
        )  # concat() returns NULL if any argument is NULL
    )
    return col
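When combining comparisons with & in PySpark, each comparison needs its own parentheses. In Python, & binds more tightly than == and >=, so a condition written as F.size(split_col) == 1 & F.length(col) >= 10 is parsed as a chained comparison. Python then calls bool() on a Column, which PySpark rejects with a "Cannot convert column into bool" error. The parsing difference can be seen without Spark, using the standard ast module and plain integers (size and length are stand-in names):

```python
import ast

# & binds more tightly than == and >=, so the unparenthesized condition
# is parsed as the chained comparison: size == (1 & length) >= 10
unparenthesized = ast.parse("size == 1 & length >= 10", mode="eval").body
parenthesized = ast.parse("(size == 1) & (length >= 10)", mode="eval").body

print(type(unparenthesized).__name__)  # Compare: a chained comparison
print(type(parenthesized).__name__)    # BinOp: the intended bitwise AND

# With plain ints the difference also shows up in the result.
size, length = 1, 12
print(size == 1 & length >= 10)      # 1 == (1 & 12) >= 10 -> 1 == 0 -> False
print((size == 1) & (length >= 10))  # True & True -> True
```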
Usage:
df = df.withColumn('PROCESSED_PHONE', phone_number_validator('PHONE'))
