I have a DataFrame with a string column containing text of varying length, and an array column in which each element is a struct holding a word together with its index, start position, and end position in the text column. I want to replace the words in the text column that are listed in the array.
The schema looks like this:
- id:integer
- text:string
- text_entity:array
  - element:struct
    - word:string
    - index:integer
    - start:integer
    - end:integer
text example could be:
"I talked with Christian today at Cafe Heimdal last Wednesday"
text_entity example could be:
[{"word": "Christian", "index":4, "start":14, "end":23}, {"word": "Heimdal", "index":8, "start":38, "end":45}]
I then want the words at the positions above to be replaced, so that the text becomes:
"I talked with (BLEEP) today at Cafe (BLEEP) last Wednesday"
My initial approach was to explode the array and then apply regexp_replace, but then the pieces of text have to be collected and merged again, which seems to take a lot of operations. I would also like to avoid UDFs, as performance is quite important. regexp_replace has the further problem that it might match substrings, which would not be acceptable. Ideally, therefore, the index, start, or end should be used.
CodePudding user response:
Use the aggregate function on the text_entity array, with the text column split into words as the initial value. The position argument of transform is 0-based, while the index values in the example are 1-based word positions, so the comparison adds 1:
from pyspark.sql import functions as F

# Assumes an existing SparkSession named `spark`.
json_string = """{"id":1,"text":"I talked with Christian today at Cafe Heimdal last Wednesday","text_entity":[{"word":"Christian","index":4,"start":14,"end":23},{"word":"Heimdal","index":8,"start":38,"end":45}]}"""
df = spark.read.json(spark.sparkContext.parallelize([json_string]))

# Fold over text_entity, starting from the text split into words; each step
# replaces the word whose 1-based position equals the entity's index.
df1 = df.withColumn(
    "text",
    F.array_join(
        F.expr(r"""aggregate(
            text_entity,
            split(text, " "),
            (acc, x) -> transform(acc, (y, i) -> IF(i + 1 = x.index, '(BLEEP)', y))
        )"""),
        " "
    )
)

df1.show(truncate=False)
#+---+----------------------------------------------------------+----------------------------------------------+
#|id |text                                                      |text_entity                                   |
#+---+----------------------------------------------------------+----------------------------------------------+
#|1  |I talked with (BLEEP) today at Cafe (BLEEP) last Wednesday|[{23, 4, 14, Christian}, {45, 8, 38, Heimdal}]|
#+---+----------------------------------------------------------+----------------------------------------------+
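One thing worth checking with this approach is the index convention. In the sample data, index 4 and 8 line up with "Christian" and "Heimdal" only when words are counted from 1, whereas array positions in a lambda are counted from 0. Below is a plain-Python sketch of the same token-level replacement, useful for sanity-checking the logic outside Spark. The helper name bleep_by_word_index is hypothetical, and the 1-based index is an assumption read off the example.

```python
def bleep_by_word_index(text, entities, mask="(BLEEP)"):
    """Replace whole tokens whose 1-based position appears in entities."""
    # Assumption: "index" counts words from 1, as the sample data suggests.
    targets = {e["index"] for e in entities}
    tokens = text.split(" ")
    # start=1 makes the loop counter 1-based, like the entity index.
    return " ".join(
        mask if i in targets else tok
        for i, tok in enumerate(tokens, start=1)
    )


text = "I talked with Christian today at Cafe Heimdal last Wednesday"
entities = [
    {"word": "Christian", "index": 4, "start": 14, "end": 23},
    {"word": "Heimdal", "index": 8, "start": 38, "end": 45},
]
print(bleep_by_word_index(text, entities))
```

Because the text is split on single spaces, the result depends on tokenization: punctuation attached to a word is replaced along with it, and the index must have been computed with the same splitting rule.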
CodePudding user response:
I came up with this answer using regexp_replace. The problem with regexp_replace, however, is that it replaces all occurrences, which is not the intention: a word can appear multiple times in the text, and only some of the occurrences should be 'bleeped'.
# Build one alternation pattern from the distinct entity words, then apply it.
df = df.withColumn("temp_entities", F.expr("transform(text_entity, (x, i) -> x.word)")) \
    .withColumn("temp_entities", F.array_distinct("temp_entities")) \
    .withColumn("regex_expression", F.concat_ws("|", "temp_entities")) \
    .withColumn("regex_expression", F.concat(F.lit("\\b("), F.col("regex_expression"), F.lit(")\\b"))) \
    .withColumn("text", F.when(F.size("text_entity") > 0, F.expr("regexp_replace(text, regex_expression, '(BLEEP)')")).otherwise(F.col("text")))
It removes duplicates and only applies regexp_replace when there is at least one entity. It is probably not the most elegant solution, and it still 'bleeps' every occurrence of a word. Ideally the position should be used instead.
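To illustrate what using the position would look like, here is a minimal plain-Python sketch that splices the mask in by character offsets, so only the targeted occurrence changes. The helper name bleep_by_offsets is hypothetical. It assumes start is 0-based and end is exclusive (which matches the sample data) and that spans do not overlap.

```python
def bleep_by_offsets(text, entities, mask="(BLEEP)"):
    """Replace text[start:end] for every entity, leaving other matches alone."""
    # Assumption: start is 0-based, end is exclusive, spans do not overlap.
    for e in sorted(entities, key=lambda e: e["start"], reverse=True):
        # Working right to left keeps earlier offsets valid after each splice.
        text = text[:e["start"]] + mask + text[e["end"]:]
    return text


# "Christian" appears twice; only the second occurrence is listed.
text = "Christian met Christian at Cafe Heimdal"
entities = [{"word": "Christian", "index": 3, "start": 14, "end": 23}]
print(bleep_by_offsets(text, entities))
```

In Spark, the same right-to-left fold could presumably be written without a UDF by running aggregate over the entities sorted by start in descending order, with concat and substring doing the splice. That variant is not tested here.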
