Home > Blockchain >  create median and average column out of array column in pyspark
create median and average column out of array column in pyspark

Time:02-03

I have the following spark dataframe:

import pandas as pd
foo = pd.DataFrame({'id': [1,2,3,4], 'col': [[1,2,None, None], [1,1,2,2], [1,1,1,2], [1,None,None,None]]})

foo_dfs = spark.createDataFrame(foo)
foo_dfs.show()
 --- ------------ 
| id|         col|
 --- ------------ 
|  1|    [1, 2,,]|
|  2|[1, 1, 2, 2]|
|  3|[1, 1, 1, 2]|
|  4|      [1,,,]|
 --- ------------ 

I am creating the min_col and max_col by doing the following:

from pyspark.sql import functions as f
foo_dfs = foo_dfs.withColumn('min_col', f.array_min('col'))
foo_dfs = foo_dfs.withColumn('max_col', f.array_max('col'))

which outputs:

 --- ------------ ------- ------- 
| id|         col|min_col|max_col|
 --- ------------ ------- ------- 
|  1|    [1, 2,,]|      1|      2|
|  2|[1, 1, 2, 2]|      1|      2|
|  3|[1, 1, 1, 2]|      1|      2|
|  4|      [1,,,]|      1|      1|
 --- ------------ ------- ------- 

I would like to also add the mean_col and median_col next to the max_col. How can I do that ? As far as I know there is no f.array_mean or f.array_median function

The output dataframe should look like:

 --- ------------ ------- ------- -------- ---------- 
| id|         col|min_col|max_col|mean_col|median_col|
 --- ------------ ------- ------- -------- ---------- 
|  1|    [1, 2,,]|      1|      2|     1.5|       1.5|
|  2|[1, 1, 2, 2]|      1|      2|     1.5|       1.5|
|  3|[1, 1, 1, 2]|      1|      2|    1.25|       1.0|
|  4|      [1,,,]|      1|      1|     1.0|       1.0|
 --- ------------ ------- ------- -------- ---------- 

So the mean_col and median_col calculation should ignore the None values

I tried this:

 import numpy as np
 array_median = f.udf(lambda x: float(np.nanmedian(x)), FloatType())
 array_mean = f.udf(lambda x: float(np.nanmean(x)), FloatType())
 foo_dfs.withColumn('median_col', array_median('col'))
 foo_dfs.withColumn('mean_col', array_mean('col'))

But it didn't work, I am getting the following error:

TypeError: ufunc 'isnan' not supported for the input types, and the inputs could not be safely coerced to any supported types according to the casting rule ''safe''

Any ideas?

CodePudding user response:

Use aggregate function for the mean_col and when along with array_sort to get the median_col. But first, you need to filter null values from the array using filter function:

from pyspark.sql import functions as F

foo_dfs = (foo_dfs.withColumn('col', F.array_sort(F.expr('filter(col, x -> x is not null)')))
           .withColumn('size', F.size('col'))
           .withColumn('min_col', F.array_min('col'))
           .withColumn('max_col', F.array_max('col'))
           .withColumn('mean_col', F.expr('aggregate(col, 0L, (acc,x) -> acc x, acc -> acc /size)'))
           .withColumn('median_col', F.when(F.col('size') % 2 == 0,
                                            (F.expr('col[int(size/2)]')   F.expr('col[int(size/2)-1]'))/2
                                            ).otherwise(F.expr('col[int(size/2)]'))
                       )
           ).drop("size")

foo_dfs.show()
# --- ------------ ------- ------- -------- ---------- 
#| id|         col|min_col|max_col|mean_col|median_col|
# --- ------------ ------- ------- -------- ---------- 
#|  1|      [1, 2]|      1|      2|     1.5|       1.5|
#|  2|[1, 1, 2, 2]|      1|      2|     1.5|       1.5|
#|  3|[1, 1, 1, 2]|      1|      2|    1.25|       1.0|
#|  4|         [1]|      1|      1|     1.0|       1.0|
# --- ------------ ------- ------- -------- ---------- 

Some explanations:

  • mean_col: aggregate functions sums all the elements of the array then apply a finish lambda function which divides the resulting sum by the size of the array.
  • median_col: sort the array and check its size: if size%2 = 0 then addition the elements at indexes size/2 and size/2 -1 and divide by 2. Otherwise (size%2 != 0) the median correspond to element of index size/2.
  •  Tags:  
  • Related