I have the following spark dataframe:
import pandas as pd
foo = pd.DataFrame({'id': [1,2,3,4], 'col': [[1,2,None, None], [1,1,2,2], [1,1,1,2], [1,None,None,None]]})
foo_dfs = spark.createDataFrame(foo)
foo_dfs.show()
+---+------------+
| id|         col|
+---+------------+
|  1|    [1, 2,,]|
|  2|[1, 1, 2, 2]|
|  3|[1, 1, 1, 2]|
|  4|      [1,,,]|
+---+------------+
I am creating the min_col and max_col by doing the following:
from pyspark.sql import functions as f
foo_dfs = foo_dfs.withColumn('min_col', f.array_min('col'))
foo_dfs = foo_dfs.withColumn('max_col', f.array_max('col'))
which outputs:
+---+------------+-------+-------+
| id|         col|min_col|max_col|
+---+------------+-------+-------+
|  1|    [1, 2,,]|      1|      2|
|  2|[1, 1, 2, 2]|      1|      2|
|  3|[1, 1, 1, 2]|      1|      2|
|  4|      [1,,,]|      1|      1|
+---+------------+-------+-------+
I would also like to add mean_col and median_col next to max_col. How can I do that? As far as I know, there is no f.array_mean or f.array_median function.
The output dataframe should look like:
+---+------------+-------+-------+--------+----------+
| id|         col|min_col|max_col|mean_col|median_col|
+---+------------+-------+-------+--------+----------+
|  1|    [1, 2,,]|      1|      2|     1.5|       1.5|
|  2|[1, 1, 2, 2]|      1|      2|     1.5|       1.5|
|  3|[1, 1, 1, 2]|      1|      2|    1.25|       1.0|
|  4|      [1,,,]|      1|      1|     1.0|       1.0|
+---+------------+-------+-------+--------+----------+
So the mean_col and median_col calculations should ignore the None values.
I tried this:
import numpy as np
from pyspark.sql.types import FloatType
array_median = f.udf(lambda x: float(np.nanmedian(x)), FloatType())
array_mean = f.udf(lambda x: float(np.nanmean(x)), FloatType())
foo_dfs.withColumn('median_col', array_median('col'))
foo_dfs.withColumn('mean_col', array_mean('col'))
But it didn't work. I get the following error:
TypeError: ufunc 'isnan' not supported for the input types, and the inputs could not be safely coerced to any supported types according to the casting rule ''safe''
Any ideas?
Answer:
Use the aggregate function for mean_col, and when along with array_sort for median_col. But first, filter the null values out of the array using the filter function:
from pyspark.sql import functions as F

foo_dfs = (
    # Drop nulls, then sort so the median can be read by index
    foo_dfs.withColumn('col', F.array_sort(F.expr('filter(col, x -> x is not null)')))
    .withColumn('size', F.size('col'))
    .withColumn('min_col', F.array_min('col'))
    .withColumn('max_col', F.array_max('col'))
    # Sum the elements, then divide the sum by the array size
    .withColumn('mean_col', F.expr('aggregate(col, 0L, (acc, x) -> acc + x, acc -> acc / size)'))
    # Even size: average the two middle elements; odd size: take the middle one
    .withColumn('median_col', F.when(F.col('size') % 2 == 0,
                                     (F.expr('col[int(size/2)]') + F.expr('col[int(size/2)-1]')) / 2
                                     ).otherwise(F.expr('col[int(size/2)]'))
                )
).drop("size")
foo_dfs.show()
#+---+------------+-------+-------+--------+----------+
#| id|         col|min_col|max_col|mean_col|median_col|
#+---+------------+-------+-------+--------+----------+
#|  1|      [1, 2]|      1|      2|     1.5|       1.5|
#|  2|[1, 1, 2, 2]|      1|      2|     1.5|       1.5|
#|  3|[1, 1, 1, 2]|      1|      2|    1.25|       1.0|
#|  4|         [1]|      1|      1|     1.0|       1.0|
#+---+------------+-------+-------+--------+----------+
Some explanations:
mean_col: the aggregate function sums all the elements of the array, then applies a finish lambda function that divides the resulting sum by the size of the array.
median_col: sort the array and check its size. If size % 2 = 0, add the elements at indexes size/2 and size/2 - 1 and divide by 2. Otherwise (size % 2 != 0), the median is the element at index size/2.
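The mean and median logic described above can be checked outside Spark. The following is a minimal plain-Python sketch of the same steps, not the Spark code itself: the helper names array_mean and array_median are hypothetical, and the sketch assumes every array has at least one non-null element.

```python
from functools import reduce


def array_mean(values):
    """Mean of the non-null elements (hypothetical helper)."""
    # Mirrors filter(col, x -> x is not null)
    kept = [x for x in values if x is not None]
    # Mirrors aggregate(col, 0L, (acc, x) -> acc + x, ...)
    total = reduce(lambda acc, x: acc + x, kept, 0)
    # Mirrors the finish lambda: acc -> acc / size
    return total / len(kept)


def array_median(values):
    """Median of the non-null elements (hypothetical helper)."""
    # Filter nulls and sort, as array_sort(filter(...)) does
    kept = sorted(x for x in values if x is not None)
    size = len(kept)
    mid = size // 2
    if size % 2 == 0:
        # Even size: average the two middle elements
        return (kept[mid] + kept[mid - 1]) / 2
    # Odd size: the middle element is the median
    return float(kept[mid])


# Same rows as the example DataFrame
rows = [[1, 2, None, None], [1, 1, 2, 2], [1, 1, 1, 2], [1, None, None, None]]
means = [array_mean(r) for r in rows]
medians = [array_median(r) for r in rows]
print(means)    # [1.5, 1.5, 1.25, 1.0]
print(medians)  # [1.5, 1.5, 1.0, 1.0]
```

The results match the mean_col and median_col values in the expected output table.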
