Trouble with Groupby loop in Python/PySpark

Time:02-05

I'm converting some SAS code to PySpark and learning as I go. I'm trying to convert the code below so that it creates two tables (NEED_VAR1 and NEED_VAR2).

%MACRO split(VAR=);
PROC SUMMARY DATA=HAVE NWAY MISSING;
CLASS  &VAR. &VAR._DESC;
VAR col1 col2 col3;
OUTPUT OUT=NEED_&VAR.(DROP=_FREQ_ _TYPE_) SUM=;
RUN;
%MEND;
%SPLIT(VAR=VAR1);
%SPLIT(VAR=VAR2);
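For readers less familiar with SAS: NWAY keeps only the full combination of the CLASS variables, MISSING keeps missing class values as a group of their own, and DROP=_FREQ_ _TYPE_ removes the count and type columns that PROC SUMMARY adds. Below is a rough pandas sketch of what a single %SPLIT call computes. The small input frame, including its None row, is made up purely to illustrate the MISSING behaviour, and the function name split simply mirrors the macro.

```python
import pandas as pd

# Hypothetical input; the None row illustrates the MISSING option.
have = pd.DataFrame({
    "VAR1": ["14200", "14200", None],
    "VAR1_DESC": ["Drug1", "Drug1", None],
    "col1": [1.0, 2.0, 5.0],
    "col2": [10.0, 20.0, 50.0],
    "col3": [100.0, 200.0, 500.0],
})


def split(df, var):
    """Rough equivalent of %SPLIT(VAR=...): PROC SUMMARY NWAY MISSING ... SUM=."""
    class_vars = [var, f"{var}_DESC"]            # CLASS &VAR. &VAR._DESC
    return (
        df.groupby(class_vars, dropna=False)      # NWAY: all class vars; MISSING: keep NaN keys
          [["col1", "col2", "col3"]]              # VAR col1 col2 col3
          .sum()                                  # SUM=
          .reset_index()                          # class vars as columns, no _FREQ_/_TYPE_
    )


need_var1 = split(have, "VAR1")
print(need_var1)
```

Without dropna=False, pandas would silently drop the missing group, which corresponds to running PROC SUMMARY without the MISSING option.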

This is what I have so far, which is not doing the trick. I want to feed 'VAR1' into each '{}' to produce NEED_VAR1, and then do the same for VAR2. The two groupby variable pairs should be VAR1/VAR2 and VAR1_DESC/VAR2_DESC. Any advice?

import pyspark.sql.functions as F

VAR = [['VAR1','VAR2'],['VAR1','VAR2'],['VAR1','VAR2']]

for start,continue,end in VAR:
  NEED_'{}' = HAVE.groupBy('{}',"'{}'_DESC") \
                  .agg(F.sum('col1').alias('col1'), \
                       F.sum('col2').alias('col2'), \
                       F.sum('col3').alias('col3'), \
                  .format(start,continue,end)
NEED_VAR1.show()
NEED_VAR2.show()

*** Update - some sample code:

import pandas as pd
 
df = {'VAR1':['14200', '38110', '02120', '15831'],
        'VAR1_DESC':['Drug1', 'Drug2', 'Drug3', 'Drug4'],
        'VAR2':['200', '110', '120', '831'],
        'VAR2_DESC':['Drug1_2', 'Drug2_2', 'Drug3_2', 'Drug4_2'],
        'col1':[297.62, 340.67, 12.45, 1209.87],
        'col2':[200.32, 210.37, 19.39, 1800.85],
        'col3':[1294.65, 322.90, 193.45, 14.59]
        }
 
HAVE = pd.DataFrame(df)
print(HAVE)
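With this sample data, the two macro calls can be mirrored by looping over the variable names and storing each result in a dictionary keyed by the table name, instead of trying to build variable names dynamically. This is a pandas sketch that assumes the intended grouping is the macro's CLASS &VAR. &VAR._DESC (VAR1 with VAR1_DESC, VAR2 with VAR2_DESC). The dictionary name need is hypothetical.

```python
import pandas as pd

HAVE = pd.DataFrame({
    'VAR1': ['14200', '38110', '02120', '15831'],
    'VAR1_DESC': ['Drug1', 'Drug2', 'Drug3', 'Drug4'],
    'VAR2': ['200', '110', '120', '831'],
    'VAR2_DESC': ['Drug1_2', 'Drug2_2', 'Drug3_2', 'Drug4_2'],
    'col1': [297.62, 340.67, 12.45, 1209.87],
    'col2': [200.32, 210.37, 19.39, 1800.85],
    'col3': [1294.65, 322.90, 193.45, 14.59],
})

agg_cols = ['col1', 'col2', 'col3']

# need['NEED_VAR1'] plays the role of the SAS output table NEED_VAR1, and so on.
need = {}
for var in ['VAR1', 'VAR2']:                      # %SPLIT(VAR=VAR1); %SPLIT(VAR=VAR2);
    need[f'NEED_{var}'] = (
        HAVE.groupby([var, f'{var}_DESC'], dropna=False)[agg_cols]
            .sum()
            .reset_index()
    )

print(need['NEED_VAR1'])
print(need['NEED_VAR2'])
```

In PySpark the loop body would become HAVE.groupBy(var, f'{var}_DESC').agg(*[F.sum(c).alias(c) for c in agg_cols]) on a Spark DataFrame. Spark's groupBy already keeps null keys as their own group, much like the MISSING option.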

Answer:

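Creating variable names through string concatenation (as in NEED_'{}' = ...) is not valid Python syntax. In addition, continue cannot be used as a loop variable because it is a reserved keyword.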
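Both problems are caught by the parser before any Spark code runs. A quick check with the built-in compile() function makes this visible. The helper name parses is only for illustration.

```python
def parses(src):
    """Return True if src is syntactically valid Python, False otherwise."""
    try:
        compile(src, "<snippet>", "exec")
        return True
    except SyntaxError:
        return False


# Lines adapted from the question: neither can be parsed.
print(parses("for start, continue, end in VAR: pass"))  # False: continue is a keyword
print(parses("NEED_'{}' = 1"))                            # False: not a valid assignment target

# A dictionary keyed by a formatted string is the usual substitute for dynamic names.
print(parses("need = {}\nneed['NEED_{}'.format('VAR1')] = 1"))  # True
```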

If you only have two groups to handle, you can create the individual DataFrames directly without a loop. If there are other combinations, mention them in the comments and I'll update the answer accordingly.

# HAVE is assumed to be a Spark DataFrame here (e.g. spark.createDataFrame(HAVE))
NEED_VAR1 = HAVE.groupBy(["VAR1", "VAR2"]) \
    .agg(F.sum('col1').alias('col1'),
         F.sum('col2').alias('col2'),
         F.sum('col3').alias('col3'))

NEED_VAR2 = HAVE.groupBy(["VAR1_DESC", "VAR2_DESC"]) \
    .agg(F.sum('col1').alias('col1'),
         F.sum('col2').alias('col2'),
         F.sum('col3').alias('col3'))

Dynamic Approach

One approach is to create a split_map that contains all the required groups (levels of aggregation).

The result for each group is then stored in a container (a list), whose elements can be used individually or merged into a single DataFrame via union.

NOTE - union has some caveats: it matches columns by position, so every group's key columns must share the same names and order. The code below takes care of this by renaming them first.
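To see why the renaming matters, note that the per-group results have different key column names (VAR1/VAR2 vs VAR1_DESC/VAR2_DESC). Spark's union and unionAll match columns by position, so the combined result would silently carry the first frame's column names, while unionByName would fail on the mismatched names. The pandas sketch below is only an analogy. Because pandas concat aligns by name, the mismatch shows up as extra NaN-filled columns. It uses one row per group taken from the sample data, and the helper with_generic_keys is hypothetical.

```python
import pandas as pd

# One aggregated row per group, shaped like the per-group results in the answer.
g1 = pd.DataFrame({'VAR1': ['14200'], 'VAR2': ['200'], 'col1': [297.62]})
g2 = pd.DataFrame({'VAR1_DESC': ['Drug1'], 'VAR2_DESC': ['Drug1_2'], 'col1': [297.62]})

# Without renaming, name-based alignment spreads the keys over four columns.
naive = pd.concat([g1, g2], ignore_index=True)
print(naive.shape)  # (2, 5)

# Renaming the group keys to shared generic names lines everything up.
key_names = ['Group Key - 1', 'Group Key - 2']


def with_generic_keys(df, group_cols):
    """Rename the given group columns to the shared generic key names."""
    return df.rename(columns=dict(zip(group_cols, key_names)))


aligned = pd.concat(
    [with_generic_keys(g1, ['VAR1', 'VAR2']),
     with_generic_keys(g2, ['VAR1_DESC', 'VAR2_DESC'])],
    ignore_index=True,
)
print(aligned.columns.tolist())  # ['Group Key - 1', 'Group Key - 2', 'col1']
```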

Data Preparation

+-----+---------+----+---------+-------+-------+-------+
| VAR1|VAR1_DESC|VAR2|VAR2_DESC|   col1|   col2|   col3|
+-----+---------+----+---------+-------+-------+-------+
|14200|    Drug1| 200|  Drug1_2| 297.62| 200.32|1294.65|
|38110|    Drug2| 110|  Drug2_2| 340.67| 210.37|  322.9|
|02120|    Drug3| 120|  Drug3_2|  12.45|  19.39| 193.45|
|15831|    Drug4| 831|  Drug4_2|1209.87|1800.85|  14.59|
+-----+---------+----+---------+-------+-------+-------+

Split Map & Data Union

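from functools import reduce

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Spark version of the pandas HAVE from the question
sparkDF = spark.createDataFrame(HAVE)


def concat_spark_dataframe(in_lst):
    # Fold the list pairwise: union(union(df1, df2), df3), ...
    # union (unionAll is an alias) matches columns by position, not by name
    return reduce(lambda x, y: x.union(y), in_lst)


split_map = {
    'group_1': ['VAR1', 'VAR2'],
    'group_2': ['VAR1_DESC', 'VAR2_DESC'],
}

res = []

agg_cols = ['col1', 'col2', 'col3']

agg_func = [F.sum(F.col(c)).alias(c) for c in agg_cols]

# Generic key names so that every group's result has the same schema
imm_column_map = ['Group Key - 1', 'Group Key - 2']

for key in split_map:

    group_col_lst = split_map[key]

    final_col_lst = group_col_lst + agg_cols

    immDF = sparkDF.select(*final_col_lst)

    # Rename the group columns to the generic key names, one at a time
    immDF = reduce(lambda x, idx: x.withColumnRenamed(group_col_lst[idx],
                                                      imm_column_map[idx]),
                   range(len(imm_column_map)),
                   immDF)

    res.append(
        immDF.groupBy(imm_column_map)
             .agg(*agg_func)
             .withColumn('group-key', F.lit(key))
             .withColumn('group-value', F.lit('-'.join(group_col_lst)))
    )

finalDF = concat_spark_dataframe(res)

finalDF.show()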

+-------------+-------------+-------+-------+-------+---------+-------------------+
|Group Key - 1|Group Key - 2|   col1|   col2|   col3|group-key|        group-value|
+-------------+-------------+-------+-------+-------+---------+-------------------+
|        14200|          200| 297.62| 200.32|1294.65|  group_1|          VAR1-VAR2|
|        15831|          831|1209.87|1800.85|  14.59|  group_1|          VAR1-VAR2|
|        02120|          120|  12.45|  19.39| 193.45|  group_1|          VAR1-VAR2|
|        38110|          110| 340.67| 210.37|  322.9|  group_1|          VAR1-VAR2|
|        Drug2|      Drug2_2| 340.67| 210.37|  322.9|  group_2|VAR1_DESC-VAR2_DESC|
|        Drug1|      Drug1_2| 297.62| 200.32|1294.65|  group_2|VAR1_DESC-VAR2_DESC|
|        Drug3|      Drug3_2|  12.45|  19.39| 193.45|  group_2|VAR1_DESC-VAR2_DESC|
|        Drug4|      Drug4_2|1209.87|1800.85|  14.59|  group_2|VAR1_DESC-VAR2_DESC|
+-------------+-------------+-------+-------+-------+---------+-------------------+
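The concat_spark_dataframe helper relies on functools.reduce to fold the list of per-group results pairwise, so with three groups it evaluates union(union(df1, df2), df3). The same folding can be seen with plain Python lists standing in for DataFrames. This is only an illustration: the list concatenation inside the hypothetical union function plays the role of Spark's union, and the third group is made up.

```python
from functools import reduce

# Stand-ins for the per-group aggregated DataFrames collected in res.
res = [['group_1 rows'], ['group_2 rows'], ['group_3 rows']]

calls = []


def union(x, y):
    """Record each pairwise call, then concatenate like UNION ALL."""
    calls.append((x, y))
    return x + y


combined = reduce(union, res)
print(combined)    # ['group_1 rows', 'group_2 rows', 'group_3 rows']
print(len(calls))  # 2: one call fewer than there are inputs
```

Because reduce is called without an initial value, it raises TypeError on an empty list, so the helper assumes split_map has at least one entry.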

Individual Splits

NEED_VAR1 = finalDF.filter(F.col('group-key') == 'group_1')
NEED_VAR2 = finalDF.filter(F.col('group-key') == 'group_2')

The solution is dynamic: it can accommodate any number of groups and splits by adding entries to split_map.
