I have a use case where, if at least one record for a particular file has the value err_present in column err, I would like to mark the rest of the records for that same file as bad_file in the DataFrame.
Input DataFrame:
+-----------+---------+
|err        |file_name|
+-----------+---------+
|err_present|f1       |
|           |f1       |
|           |f1       |
|           |f2       |
|           |f2       |
+-----------+---------+
In the DataFrame above, file f1 has a row with err_present in the err column, so I want to mark the other rows for f1 as bad_file in the final DataFrame. Rows for files without any err_present row (f2 here) should be null.
Desired output DataFrame:
+-----------+---------+
|err_present|file_name|
+-----------+---------+
|err_present|       f1|
|   bad_file|       f1|
|   bad_file|       f1|
|       null|       f2|
|       null|       f2|
+-----------+---------+
Example DataFrame:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('err_present', 'f1'), ('', 'f1'), ('', 'f1'),
                            ('', 'f2'), ('', 'f2')],
                           ['err', 'file_name'])
Answer:
from pyspark.sql.functions import broadcast, col, lit, when

# Isolate the files that have an err_present row (distinct, so the join cannot
# duplicate rows when a file has several err_present rows) and tag them with bad_file
s = (df.where(col('err') == 'err_present')
       .select('file_name').distinct()
       .withColumn('newerr', lit('bad_file')))
(df.join(broadcast(s), how='left', on='file_name')  # broadcast the small df to avoid a shuffle
   .withColumn('err', when(col('err') == 'err_present', col('err')).otherwise(col('newerr')))  # keep err_present, else take newerr
   .drop('newerr')  # drop the helper column
).show()
+---------+-----------+
|file_name|        err|
+---------+-----------+
|       f1|err_present|
|       f1|   bad_file|
|       f1|   bad_file|
|       f2|       null|
|       f2|       null|
+---------+-----------+
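To see why the left join yields bad_file for f1 and null for f2, here is a plain-Python sketch of the same idea with no Spark involved. It assumes the rows are a list of (err, file_name) tuples, and the function name mark_bad_files_join is hypothetical. A small lookup of bad files plays the role of the broadcast DataFrame s, and a missing key yields None, just as an unmatched row in a left join yields null.

```python
def mark_bad_files_join(rows):
    """rows: list of (err, file_name) tuples; returns (file_name, err) tuples."""
    # Plays the role of `s`: one entry per file with at least one err_present row.
    # Dict keys are unique, which mirrors the .distinct() on file_name.
    lookup = {file_name: 'bad_file' for err, file_name in rows if err == 'err_present'}
    result = []
    for err, file_name in rows:
        # Left-join semantics: files absent from the lookup get None (null).
        newerr = lookup.get(file_name)
        # Equivalent of when(err == 'err_present', err).otherwise(newerr)
        result.append((file_name, err if err == 'err_present' else newerr))
    return result


rows = [('err_present', 'f1'), ('', 'f1'), ('', 'f1'), ('', 'f2'), ('', 'f2')]
for row in mark_bad_files_join(rows):
    print(row)
```

Broadcasting fits here because the lookup only holds one row per bad file, which is usually far smaller than the full DataFrame.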
Answer:
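You can partition by file_name and add a column, err_present_in_group, that is true when at least one row in that partition has err equal to "err_present":
from pyspark.sql import Window
from pyspark.sql import functions as F

# max over a boolean is true if any row in the file_name partition has err == 'err_present'
df.select("*",
          F.max(F.col('err') == 'err_present')
           .over(Window.partitionBy('file_name')).alias('err_present_in_group')
).show()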
+-----------+---------+--------------------+
|        err|file_name|err_present_in_group|
+-----------+---------+--------------------+
|err_present|       f1|                true|
|           |       f1|                true|
|           |       f1|                true|
|           |       f2|               false|
|           |       f2|               false|
+-----------+---------+--------------------+
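The window step takes, within each file_name partition, the maximum of a boolean; because False < True, that maximum is true exactly when at least one row in the partition matches. A plain-Python sketch of the same calculation, assuming rows are (err, file_name) tuples (the helper name add_err_present_flag is hypothetical):

```python
from collections import defaultdict


def add_err_present_flag(rows):
    """rows: list of (err, file_name) tuples; returns (err, file_name, flag) tuples."""
    # Gather one boolean per row, grouped by file_name (the window partition).
    groups = defaultdict(list)
    for err, file_name in rows:
        groups[file_name].append(err == 'err_present')
    # max() over booleans is True if any element is True, because False < True.
    flag = {file_name: max(values) for file_name, values in groups.items()}
    # As with a window function, every row keeps its columns and gets the group result.
    return [(err, file_name, flag[file_name]) for err, file_name in rows]


rows = [('err_present', 'f1'), ('', 'f1'), ('', 'f1'), ('', 'f2'), ('', 'f2')]
for row in add_err_present_flag(rows):
    print(row)
```

Unlike a groupBy, a window function keeps every input row, which is why each f1 row carries the same true flag.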
The final err_present column can then be derived by applying conditions on the err and err_present_in_group columns:
df.select("*",
          F.max(F.col('err') == 'err_present')
           .over(Window.partitionBy('file_name')).alias('err_present_in_group')
).withColumn(
    'err_present',
    F.when(
        F.col('err_present_in_group') & (F.col('err') == 'err_present'),
        F.lit('err_present')  # the row that carries the error keeps it
    ).when(
        F.col('err_present_in_group') & (F.col('err') == ''),
        F.lit('bad_file')  # the other rows of the same file
    ).otherwise(None)  # files with no err_present row stay null
).select(
    'err_present', 'file_name'
).show()
