I'm converting some SAS code to PySpark and learning as I go. I'm trying to convert the below code, such that it creates two tables (NEED_VAR1 and NEED_VAR2).
%MACRO split(VAR=);
PROC SUMMARY DATA=HAVE NWAY MISSING;
CLASS &VAR. &VAR._DESC;
VAR col1 col2 col3;
OUTPUT OUT=NEED_&VAR.(DROP=_FREQ_ _TYPE_) SUM=;
RUN;
%MEND;
%SPLIT(VAR=VAR1);
%SPLIT(VAR=VAR2);
What I have so far is below, but it's not doing the trick. I want to feed 'VAR1' into each of the '{}' placeholders to produce NEED_VAR1, and then do the same for VAR2. The two groupby variable pairs should be VAR1/VAR2 and VAR1_DESC/VAR2_DESC. Any advice?
import pyspark.sql.functions as F
VAR = [['VAR1','VAR2'],['VAR1','VAR2'],['VAR1','VAR2']]
for start,continue,end in VAR:
    NEED_'{}' = HAVE.groupBy('{}',"'{}'_DESC") \
        .agg(F.sum('col1').alias('col1'), \
             F.sum('col2').alias('col2'), \
             F.sum('col3').alias('col3'), \
        .format(start,continue,end)
NEED_VAR1.show()
NEED_VAR2.show()
Update: some sample data:
import pandas as pd
df = {'VAR1':['14200', '38110', '02120', '15831'],
'VAR1_DESC':['Drug1', 'Drug2', 'Drug3', 'Drug4'],
'VAR2':['200', '110', '120', '831'],
'VAR2_DESC':['Drug1_2', 'Drug2_2', 'Drug3_2', 'Drug4_2'],
'col1':[297.62, 340.67, 12.45, 1209.87],
'col2':[200.32, 210.37, 19.39, 1800.85],
'col3':[1294.65, 322.90, 193.45, 14.59]
}
HAVE = pd.DataFrame(df)
print(HAVE)
Answer:
Python does not let you build variable names by string concatenation: a line like NEED_'{}' = ... is a syntax error.
If you only have these two groups to handle, you can create the individual DataFrames directly without a loop. If there are other combinations, mention them in the comments and I'll update the answer accordingly.
# HAVE must be a Spark DataFrame here (groupBy/agg with F.sum are PySpark, not pandas)
NEED_VAR1 = HAVE.groupBy(["VAR1", "VAR2"]) \
    .agg(F.sum('col1').alias('col1'),
         F.sum('col2').alias('col2'),
         F.sum('col3').alias('col3'))

NEED_VAR2 = HAVE.groupBy(["VAR1_DESC", "VAR2_DESC"]) \
    .agg(F.sum('col1').alias('col1'),
         F.sum('col2').alias('col2'),
         F.sum('col3').alias('col3'))
Dynamic Approach
One approach is to create a split_map that holds all the required groups (levels of aggregation).
Each group's result is then stored in a container (a list), so the results can be used individually or combined into a single DataFrame via union.
NOTE: union has some caveats. It matches columns by position rather than by name, so the code below renames each group's key columns to the same generic names (Group Key - 1, Group Key - 2) before the union.
Data Preparation
+-----+---------+----+---------+-------+-------+-------+
| VAR1|VAR1_DESC|VAR2|VAR2_DESC|   col1|   col2|   col3|
+-----+---------+----+---------+-------+-------+-------+
|14200|    Drug1| 200|  Drug1_2| 297.62| 200.32|1294.65|
|38110|    Drug2| 110|  Drug2_2| 340.67| 210.37|  322.9|
|02120|    Drug3| 120|  Drug3_2|  12.45|  19.39| 193.45|
|15831|    Drug4| 831|  Drug4_2|1209.87|1800.85|  14.59|
+-----+---------+----+---------+-------+-------+-------+
Split Map & Data Union
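from functools import reduce
import pyspark.sql.functions as F

def concat_spark_dataframe(in_lst):
    # Stack the per-group results; unionAll behaves like union and matches columns by position
    return reduce(lambda x, y: x.unionAll(y), in_lst)

split_map = {
    'group_1': ['VAR1', 'VAR2'],
    'group_2': ['VAR1_DESC', 'VAR2_DESC'],
}

res = []
agg_cols = ['col1', 'col2', 'col3']
agg_func = [F.sum(F.col(c)).alias(c) for c in agg_cols]

for key in split_map:
    # Common names for the key columns, so every group's result has the same schema
    imm_column_map = ['Group Key - 1', 'Group Key - 2']
    group_col_lst = split_map[key]
    final_col_lst = group_col_lst + agg_cols

    # sparkDF is the Spark DataFrame from the Data Preparation step
    immDF = sparkDF.select(*final_col_lst)
    # Rename the group's key columns, e.g. VAR1 -> 'Group Key - 1', VAR2 -> 'Group Key - 2'
    immDF = reduce(lambda x, idx: x.withColumnRenamed(group_col_lst[idx],
                                                      imm_column_map[idx]),
                   range(len(imm_column_map)),
                   immDF)

    # Append (not overwrite) this group's aggregate, tagged with its group name and columns
    res += [
        immDF.groupBy(imm_column_map)
             .agg(*agg_func)
             .withColumn('group-key', F.lit(key))
             .withColumn('group-value', F.lit('-'.join(group_col_lst)))
    ]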
finalDF = concat_spark_dataframe(res)
finalDF.show()
+-------------+-------------+-------+-------+-------+---------+-------------------+
|Group Key - 1|Group Key - 2|   col1|   col2|   col3|group-key|        group-value|
+-------------+-------------+-------+-------+-------+---------+-------------------+
|        14200|          200| 297.62| 200.32|1294.65|  group_1|          VAR1-VAR2|
|        15831|          831|1209.87|1800.85|  14.59|  group_1|          VAR1-VAR2|
|        02120|          120|  12.45|  19.39| 193.45|  group_1|          VAR1-VAR2|
|        38110|          110| 340.67| 210.37|  322.9|  group_1|          VAR1-VAR2|
|        Drug2|      Drug2_2| 340.67| 210.37|  322.9|  group_2|VAR1_DESC-VAR2_DESC|
|        Drug1|      Drug1_2| 297.62| 200.32|1294.65|  group_2|VAR1_DESC-VAR2_DESC|
|        Drug3|      Drug3_2|  12.45|  19.39| 193.45|  group_2|VAR1_DESC-VAR2_DESC|
|        Drug4|      Drug4_2|1209.87|1800.85|  14.59|  group_2|VAR1_DESC-VAR2_DESC|
+-------------+-------------+-------+-------+-------+---------+-------------------+
Individual Splits
NEED_VAR1 = finalDF.filter(F.col('group-key') == 'group_1')
NEED_VAR2 = finalDF.filter(F.col('group-key') == 'group_2')
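The solution is dynamic: it can accommodate any number of groups, since adding a group only means adding an entry to split_map, as long as each group has the same number of key columns as imm_column_map (two here).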
