I'm going to do a query with pyspark to filter row who contains at least one word in array. For example, the dataframe is:
"content" "other"
My father is big. ...
My mother is beautiful. ...
I'm going to travel. ...
I have an array:
array=["mother","father"]
And the output must be this:
"content" "other"
My father is big. ...
My mother is beautiful. ...
A simple filter for word in array.
CodePudding user response:
I think this solution works. Let me know what you think.
import pyspark.sql.functions as f
phrases = ['bc', 'ij']
df = spark.createDataFrame([
('abcd',),
('efgh',),
('ijkl',)
], ['col1'])
(df
.withColumn('phrases', f.array([f.lit(element) for element in phrases]))
.where(f.expr('exists(phrases, element -> col1 like concat("%", element, "%"))'))
.drop('phrases')
.show()
)
output
----
|col1|
----
|abcd|
|ijkl|
----
CodePudding user response:
Had the same thoughts as @ARCrow but using instr.
lst=["mother","father"]
DataFrame
data= [
(1,"My father is big."),
(2, "My mother is beautiful"),
(3,"I'm going to travel.")
]
df=spark.createDataFrame(data, ("id",'content'))
Solution
df=(df
.withColumn('phrases', f.array([f.lit(element) for element in lst]))
.where(f.expr('exists(phrases, element -> instr (content, element)>=1)'))
.drop('phrases')
)
df.show()
Outcome
--- --------------------
| id| content|
--- --------------------
| 1| My father is big.|
| 2|My mother is beau...|
--- --------------------
CodePudding user response:
Taking some the same configuration as @wwnde,
data= [
(1,"My father is big."),
(2, "My mother is beautiful"),
(3,"I'm going to travel.")
]
df=spark.createDataFrame(data, ("id",'content'))
Solution
words = ["father", "mother"]
conditions = " or ".join([f"content like '%{word}%'" for word in words])
(
df
.filter(F.expr(conditions))
.show(truncate=False)
)
--- ----------------------
|id |content |
--- ----------------------
|1 |My father is big. |
|2 |My mother is beautiful|
--- ----------------------
CodePudding user response:
We made the Fugue project to port native Python or Pandas code to Spark or Dask. This lets you can keep the logic very readable by expressing it in native Python. Fugue can then port it to Spark for you with one function call.
First, we setup,
import pandas as pd
array=["mother","father"]
df = pd.DataFrame({"sentence": ["My father is big.", "My mother is beautiful.", "I'm going to travel. "]})
and then we can create a native Python function to express the logic:
from typing import List, Dict, Any, Iterable
def myfilter(df: List[Dict[str,Any]]) -> Iterable[Dict[str, Any]]:
for row in df:
for value in array:
if value in row["sentence"]:
yield row
and then test it on Pandas:
from fugue import transform
transform(df, myfilter, schema="*")
Because of works on Pandas, we can execute it on Spark by specifying the engine:
import fugue_spark
transform(df, myfilter, schema="*", engine="spark").show()
--- --------------------
| id| sentence|
--- --------------------
| 0| My father is big.|
| 1|My mother is beau...|
--- --------------------
Note we need .show() because Spark evaluates lazily. Schema is also a Spark requirement so Fugue interprets the "*" as all columns in = all columns out.
The fugue transform function can take both Pandas DataFrame inputs and Spark DataFrame inputs.
Edit:
You can replace the myfilter function above with a Pandas implementation like this:
def myfilter(df: pd.DataFrame) -> pd.DataFrame:
res = df.loc[df["sentence"].str.contains("|".join(array))]
return res
and Fugue will be able to port it to Spark the same way. Fugue knows how to adjust to the type hints and this will be faster than the native Python implementation because it takes advantage of Pandas being vectorized.
