PySpark replace multiple words in string column based on values in array column

Time:01-22

I have a DataFrame with a string column containing text of varying length, and an array column in which each element is a struct holding a word, its index, and its start and end positions in the text column. I want to replace the words in the text column that are listed in the array.

It looks like this:

- id:integer
- text:string
- text_entity:array
  - element:struct
    - word:string
    - index:integer
    - start:integer
    - end:integer

An example text value could be:

"I talked with Christian today at Cafe Heimdal last Wednesday"

An example text_entity value could be:

[{"word": "Christian", "index":4, "start":14, "end":23}, {"word": "Heimdal", "index":8, "start":38, "end":45}]

I then want the words at the positions above to be replaced, so that the text becomes:

"I talked with (BLEEP) today at Cafe (BLEEP) last Wednesday"

My initial approach was to explode the array and then apply regexp_replace, but then the exploded rows have to be collected and merged back into a single text, which seems like a lot of operations. I would also like to avoid UDFs, as performance is quite important. regexp_replace has the further problem that it might match substrings, which would not be acceptable. Ideally, therefore, the index, start, or end should be used.
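
Before choosing between index, start, and end, it helps to pin down what each one counts. The plain-Python check below (no Spark involved) suggests that start and end are 0-based character offsets with an exclusive end, while index is a 1-based word position when the text is split on single spaces. This is inferred from the single example row only, so treat it as an assumption about the data rather than a guarantee; `check_entity` is a hypothetical helper name.

```python
text = "I talked with Christian today at Cafe Heimdal last Wednesday"
text_entity = [
    {"word": "Christian", "index": 4, "start": 14, "end": 23},
    {"word": "Heimdal", "index": 8, "start": 38, "end": 45},
]

# Word positions, assuming words are separated by exactly one space.
words = text.split(" ")


def check_entity(text, words, entity):
    """Return (offsets_match, index_match) for one entity (hypothetical helper)."""
    # start/end read as a 0-based, end-exclusive slice of the text.
    offsets_match = text[entity["start"]:entity["end"]] == entity["word"]
    # index read as a 1-based word position.
    index_match = words[entity["index"] - 1] == entity["word"]
    return offsets_match, index_match


results = [check_entity(text, words, e) for e in text_entity]
print(results)  # [(True, True), (True, True)]
```

Under a 0-based reading of index, position 4 would be "today" rather than "Christian", which matters for any solution that compares index against a 0-based array position.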

CodePudding user response:

Use the aggregate function on the text_entity array, with the text column split into words as the initial value, like this:

from pyspark.sql import SparkSession, functions as F

# Reuse the active session if one exists (e.g. in a notebook or shell).
spark = SparkSession.builder.getOrCreate()

jsonString = """{"id":1,"text":"I talked with Christian today at Cafe Heimdal last Wednesday","text_entity":[{"word":"Christian","index":4,"start":14,"end":23},{"word":"Heimdal","index":8,"start":38,"end":45}]}"""
df = spark.read.json(spark.sparkContext.parallelize([jsonString]))

df1 = df.withColumn(
    "text",
    F.array_join(
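        # transform's position i is 0-based, while index in the data is 1-based
        # ("Christian" is the 4th word), hence the "- 1".
        F.expr(r"""aggregate(
                  text_entity,
                  split(text, " "),
                  (acc, x) -> transform(acc, (y, i) -> IF(i = x.index - 1, '(BLEEP)', y))
           )"""),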
        " "
    )
)

df1.show(truncate=False)
#+---+----------------------------------------------------------+----------------------------------------------+
#|id |text                                                      |text_entity                                   |
#+---+----------------------------------------------------------+----------------------------------------------+
#|1  |I talked with (BLEEP) today at Cafe (BLEEP) last Wednesday|[{23, 4, 14, Christian}, {45, 8, 38, Heimdal}]|
#+---+----------------------------------------------------------+----------------------------------------------+
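
The answer above works on word positions, so it depends on the text being split on single spaces. Since the question mentions that start or end could be used instead, here is a minimal sketch of a character-offset variant that still avoids UDFs. It assumes start is a 0-based offset, end is exclusive, and the spans do not overlap; those conventions are inferred from the example row. The names `BLEEP_BY_OFFSET_EXPR`, `bleep_by_offset`, and `with_bleeped_text` are hypothetical. Spans are processed from the highest start to the lowest so that replacing one span does not shift the offsets of the spans still to be processed.

```python
# Spark SQL expression kept as a plain string, so this module imports without Spark.
# Assumptions: start is 0-based, end is exclusive, spans do not overlap.
BLEEP_BY_OFFSET_EXPR = """
coalesce(
    aggregate(
        -- sort spans by start, descending, so later spans are replaced first
        sort_array(
            transform(
                text_entity,
                x -> named_struct('start', cast(x.start as int), 'end', cast(x.end as int))
            ),
            false
        ),
        text,
        (acc, x) -> concat(substring(acc, 1, x.start), '(BLEEP)', substring(acc, x.end + 1))
    ),
    text  -- fall back to the original text if text_entity is null
)
"""


def bleep_by_offset(text, entities, mask="(BLEEP)"):
    """Plain-Python mirror of the expression, intended for checking expected results."""
    for e in sorted(entities, key=lambda e: e["start"], reverse=True):
        text = text[:e["start"]] + mask + text[e["end"]:]
    return text


def with_bleeped_text(df):
    """Apply the expression to a DataFrame that has text and text_entity columns."""
    from pyspark.sql import functions as F  # imported lazily; requires PySpark

    return df.withColumn("text", F.expr(BLEEP_BY_OFFSET_EXPR))


example = bleep_by_offset(
    "I talked with Christian today at Cafe Heimdal last Wednesday",
    [
        {"word": "Christian", "index": 4, "start": 14, "end": 23},
        {"word": "Heimdal", "index": 8, "start": 38, "end": 45},
    ],
)
print(example)  # I talked with (BLEEP) today at Cafe (BLEEP) last Wednesday
```

With the DataFrame from the answer, `with_bleeped_text(df).show(truncate=False)` would be the Spark-side call. Because only the listed offsets are touched, a word that appears several times is replaced only where an entity points at it.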

CodePudding user response:

I came up with this answer using regexp_replace. The problem with regexp_replace, however, is that it replaces all occurrences, which is not the intention: a word could appear multiple times in the text, and only some of the occurrences should be 'bleeped'.

df = (
    df.withColumn("temp_entities", F.expr("transform(text_entity, x -> x.word)"))
    .withColumn("temp_entities", F.array_distinct("temp_entities"))  # drop duplicate words
    .withColumn("regex_expression", F.concat_ws("|", "temp_entities"))
    # Wrap the alternation in word boundaries, e.g. \b(Christian|Heimdal)\b
    .withColumn("regex_expression", F.concat(F.lit("\\b("), F.col("regex_expression"), F.lit(")\\b")))
    # Only rewrite rows that have at least one entity
    .withColumn("text", F.when(F.size("text_entity") > 0, F.expr("regexp_replace(text, regex_expression, '(BLEEP)')")).otherwise(F.col("text")))
)

It removes duplicates and only applies regexp_replace if there is at least one entity. It is probably not the most elegant solution, and it will 'bleep' all occurrences of the word. Ideally the position should be used.
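
The limitation can be illustrated without Spark. The sketch below uses Python's re module as a stand-in for regexp_replace; Spark uses Java regular expressions, and the assumption here is that \b behaves the same way for this ASCII example. The example sentence and the `re.escape` step are additions for illustration and are not part of the answer's code.

```python
import re

text = "Christian talked with Christian today at Cafe Heimdal"
# Suppose only the second "Christian" (offsets 22 to 31) is listed as an entity.
entity_words = ["Christian"]

# Same shape as the answer's pattern: a word-bounded alternation.
# re.escape guards against words that contain regex metacharacters.
pattern = r"\b(" + "|".join(re.escape(w) for w in entity_words) + r")\b"

bleeped = re.sub(pattern, "(BLEEP)", text)
print(bleeped)  # (BLEEP) talked with (BLEEP) today at Cafe Heimdal

# The word boundaries do prevent substring matches.
untouched = re.sub(pattern, "(BLEEP)", "Christianity")
print(untouched)  # Christianity
```

Both occurrences are replaced even though only one was listed, which is why a position-based approach fits the question better.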
