How to calculate the counts of each distinct value in column for all the columns in a pyspark dataframe? This is my input dataframe:
spark.table("table1").show()
------- --------- ------- --------
|col1 | col2 | col3 | col4 |
------- --------- ------- --------
|aa | ss | sss | jjj |
|bb | 123 | 1203 | uuu |
|null | 123 | null | zzz |
|null | 123 | 1203 | 6543 |
------- --------- -------- --------
I need the final output data frame some thing like this:
----------- ------------- ------- ------- ------------
|table_name | Column_name | Value | count | percentage |
----------- ------------- ------- ------- ------------
|table1 | col1 | aa | 1 | |
|table1 | col1 | bb | 1 | |
|table1 | col1 | null | 2 | |
|table1 | col2 | ss | 1 | |
|table1 | col2 | 123 | 3 | |
|table1 | col3 | sss | 1 | |
|table1 | col3 | 1203 | 2 | |
|table1 | col3 | null | 1 | |
|table1 | col4 | jjj | 1 | |
|table1 | col4 | uuu | 1 | |
|table1 | col4 | zzz | 1 | |
|table1 | col4 | 6543 | 1 | |
----------- ------------- ------- ------- ------------
I have Python logic for calculating the percentage. Same needs to be implemented in Pyspark
percentages.append(excel['enum'][col].value_counts()[value]/excel['enum'][col].shape[0] * 100)
CodePudding user response:
You can group by each column to count different values then union all the intermediary dataframes. Or use counting over window partitioned by each column then unpivot to get the desired output. Here's an example:
from pyspark.sql import functions as F, Window
df = spark.createDataFrame([
("aa", "ss", "sss", "jjj"), ("bb", "123", "1203", "uuu"),
(None, "123", None, "zzz"), (None, "123", "1203", "6543")
], ["col1", "col2", "col3", "col4"])
result = df.select(*[
F.struct(
F.col(c).alias("Value"),
F.count(F.coalesce(F.col(c), F.lit("null"))).over(Window.partitionBy(c)).alias("count")
).alias(c) for c in df.columns
]).agg(*[
F.collect_set(c).alias(c) for c in df.columns
]).selectExpr(
f"stack({len(df.columns)}," ','.join(chain(*[(c, f"'{c}'") for c in df.columns])) ")"
).selectExpr(
"col1 as Column_name", "inline(col0)"
).withColumn(
"percentage",
F.round(F.col("count") / df.count() * 100, 2)
)
result.show()
# ----------- ----- ----- ----------
# |Column_name|Value|count|percentage|
# ----------- ----- ----- ----------
# |col1 |null |2 |50.0 |
# |col1 |bb |1 |25.0 |
# |col1 |aa |1 |25.0 |
# |col2 |123 |3 |75.0 |
# |col2 |ss |1 |25.0 |
# |col3 |null |1 |25.0 |
# |col3 |1203 |2 |50.0 |
# |col3 |sss |1 |25.0 |
# |col4 |jjj |1 |25.0 |
# |col4 |uuu |1 |25.0 |
# |col4 |zzz |1 |25.0 |
# |col4 |6543 |1 |25.0 |
# ----------- ----- ----- ----------
