Assume a data set like this:
(teamId, player, minutesPlayed)
(1, 'A', 33)
(1, 'A', 12)
(1, 'B', 5)
(2, 'C', 22)
(2, 'C', 15)
(2, 'C', 33)
(2, 'D', 0)
......
Each tuple records how many minutes a player played for a team in one game.
I would like to aggregate the data per player per team and select the top 10 players with the most minutes for each team, on a much larger data set than this one, of course.
For example, if team 1 has 15 players, we want the top 10 of them by total minutesPlayed.
The resulting data set would be:
(1, 'A', 350)
(1, 'B', 330)
#... remaining 8 players of team 1
(2, 'C', 500)
(2, 'D', 330)
(2, 'E', 250)
#... remaining 7 players of team 2
#... remaining teams, each with its 10 players with the most minutes
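To pin down what "top 10 per team" means independently of Spark, here is a minimal plain-Python sketch run on the sample tuples above. The function name `top_players_per_team` and its parameter `n` are hypothetical; ties are broken by whichever player `heapq.nlargest` happens to keep.

```python
import heapq
from collections import Counter, defaultdict


def top_players_per_team(rows, n=10):
    """Sum minutes per (teamId, player), then keep the n largest totals per team."""
    totals = Counter()
    for team_id, player, minutes in rows:
        totals[(team_id, player)] += minutes

    # Group the per-player totals by team.
    by_team = defaultdict(list)
    for (team_id, player), total in totals.items():
        by_team[team_id].append((player, total))

    # For each team, keep the n (player, total) pairs with the most minutes,
    # returned in descending order of minutes.
    return {
        team_id: heapq.nlargest(n, pairs, key=lambda p: p[1])
        for team_id, pairs in by_team.items()
    }


rows = [(1, 'A', 33), (1, 'A', 12), (1, 'B', 5),
        (2, 'C', 22), (2, 'C', 15), (2, 'C', 33), (2, 'D', 0)]
print(top_players_per_team(rows))
# {1: [('A', 45), ('B', 5)], 2: [('C', 70), ('D', 0)]}
```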
What I have so far:
def map_players(data):
    teamId = data[0]
    player = data[1]
    minutesPlayed = data[2]
    # Key by the combination of team and player (assuming this is the way to do it)
    return ((teamId, player), minutesPlayed)

def reduce_players(p1, p2):
    # Really not sure what to do here; p1 and p2 are just the minutes played (ints)
    return p1 + p2  # sums minutes per (teamId, player), but doesn't pick the top 10
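A pitfall worth checking in `map_players`: a trailing comma, as in `teamId = data[0],`, turns the right-hand side into a one-element tuple, so the key would become `((1,), ('A',))` instead of `(1, 'A')`. The sketch below is a corrected version of the two functions that runs without Spark; tuple unpacking sidesteps the comma problem, and the summing reducer is an assumption about the intended first step.

```python
def map_players(data):
    # Tuple unpacking avoids the trailing-comma pitfall entirely.
    teamId, player, minutesPlayed = data
    # Key by (teamId, player) so reduceByKey can sum minutes per player per team.
    return ((teamId, player), minutesPlayed)


def reduce_players(p1, p2):
    # Both arguments are minute counts for the same (teamId, player) key.
    return p1 + p2


# The pitfall: a trailing comma builds a one-element tuple, not an int.
row = (1, 'A', 33)
team_with_comma = row[0],
print(team_with_comma)         # (1,)
print(map_players(row))        # ((1, 'A'), 33)
print(reduce_players(33, 12))  # 45
```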
Calling it like this:
player_data_set.map(map_players).reduceByKey(reduce_players).collect()  # take(10)?
should produce the result shown above.
I would like to do everything inside the reduceByKey reducer and use only the .map method for mapping.
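If the goal is a single reduceByKey plus map calls, one option (an alternative sketch, not the asker's code or either answer's method) is to key each row by teamId alone and carry a small {player: minutes} dict as the value. The reducer merges two dicts by summing minutes, which is associative and commutative as reduceByKey requires, and a final map keeps the top 10. To show the logic without a cluster, the sketch emulates reduceByKey locally with a hypothetical `reduce_by_key` helper; `to_team_kv`, `merge_minutes` and `top_n` are likewise illustrative names.

```python
from collections import defaultdict
from functools import reduce


def to_team_kv(row):
    teamId, player, minutesPlayed = row
    # Key by team only; the value maps player -> minutes for this one game.
    return (teamId, {player: minutesPlayed})


def merge_minutes(d1, d2):
    # Sum minutes per player across two partial results (associative and commutative).
    merged = dict(d1)
    for player, minutes in d2.items():
        merged[player] = merged.get(player, 0) + minutes
    return merged


def top_n(kv, n=10):
    teamId, minutes_by_player = kv
    ranked = sorted(minutes_by_player.items(), key=lambda p: p[1], reverse=True)
    return (teamId, ranked[:n])


def reduce_by_key(pairs, func):
    # Hypothetical local stand-in for RDD.reduceByKey: group values by key, fold with func.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return [(key, reduce(func, values)) for key, values in groups.items()]


rows = [(1, 'A', 33), (1, 'A', 12), (1, 'B', 5),
        (2, 'C', 22), (2, 'C', 15), (2, 'C', 33), (2, 'D', 0)]
result = [top_n(kv) for kv in reduce_by_key(map(to_team_kv, rows), merge_minutes)]
print(result)  # [(1, [('A', 45), ('B', 5)]), (2, [('C', 70), ('D', 0)])]
```

On an RDD the same functions would chain roughly as `rdd.map(to_team_kv).reduceByKey(merge_minutes).map(top_n)`. The trade-off is that each team's whole player dict is shuffled and held in memory, which is fine for rosters of tens of players.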
Answer:
If you want to use RDDs, you can do something like this:
- reduce using (teamId, player) as the key to calculate the total minutes played by each player
- reduce again, this time using only teamId as the key, to get the list of players with their total minutes played for each team
- flatMap the values: sort each team's list of (player, minutes) in descending order of minutes and take the first 10
Example:
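from operator import add
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

rdd = spark.sparkContext.parallelize([
    (1, 'A', 33), (1, 'A', 12), (1, 'B', 5), (2, 'C', 22),
    (2, 'C', 15), (2, 'C', 33), (2, 'D', 0)
])

rdd1 = (
    rdd.map(lambda x: ((x[0], x[1]), x[2]))        # key by (teamId, player)
    .reduceByKey(add)                              # total minutes per player per team
    .map(lambda x: (x[0][0], [(x[0][1], x[1])]))   # re-key by teamId; value is [(player, total)]
    .reduceByKey(add)                              # concatenate the lists for each team
    .flatMapValues(lambda x: sorted(x, key=lambda a: a[1], reverse=True)[:10])  # top 10 per team
)

for p in rdd1.collect():
    print(p)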
#(1, ('A', 45))
#(1, ('B', 5))
#(2, ('C', 70))
#(2, ('D', 0))
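In the second reduceByKey(add), `add` concatenates the per-team lists, so each team's full player list is built before sorting. A variation (not part of the answer above) is to truncate to the 10 largest at every merge: the top 10 of a union is always contained in the union of each side's top 10, so the result is unchanged while intermediate lists stay at most 10 long. The sketch checks this locally with `heapq.nlargest`; `merge_top10` and the generated player data are hypothetical, and the values are distinct so tie order does not matter.

```python
import heapq
from functools import reduce
from operator import add

N = 10


def merge_top10(left, right):
    # Keep only the N (player, minutes) pairs with the most minutes, in descending order.
    return heapq.nlargest(N, left + right, key=lambda p: p[1])


# Hypothetical per-player totals for one team (15 players, distinct minutes),
# each wrapped in a one-element list as the answer's .map step does.
lists = [[('P%d' % i, i * 7 % 23)] for i in range(15)]

full = sorted(reduce(add, lists), key=lambda p: p[1], reverse=True)[:N]
bounded = reduce(merge_top10, lists)
print(bounded == full)  # True
```

With that reducer, the second `.reduceByKey(add)` could become `.reduceByKey(merge_top10)` and the existing flatMapValues step still works unchanged. When minutes tie, which tied player survives may differ between runs.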
Answer:
If you convert your data into a PySpark DataFrame, you could do it like this:
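from pyspark.sql import functions as F, Window

# df is assumed to have the columns teamId, player and minutesPlayed
w = Window.partitionBy('teamId').orderBy(F.desc('minutesPlayedTotal'))
(
    df
    .groupBy('teamId', 'player')
    .agg(F.sum('minutesPlayed').alias('minutesPlayedTotal'))  # total minutes per player per team
    .withColumn('rank', F.row_number().over(w))               # 1 = most minutes within each team
    .where('rank <= 10')
    .show()
)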
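`F.row_number()` numbers rows 1, 2, 3, ... within each teamId partition even when totals tie, so `rank <= 10` returns at most 10 players per team, with ties broken arbitrarily; `F.rank()` would give tied players the same number and could return more than 10. The plain-Python sketch below illustrates the difference on made-up totals (hypothetical data, not from the question) with a top-2 cutoff; `row_number` and `rank` here are local imitations, not the Spark functions.

```python
def row_number(totals):
    # Consecutive numbers 1..n in descending order of minutes; ties get distinct numbers.
    ordered = sorted(totals, key=lambda p: p[1], reverse=True)
    return [(player, minutes, i + 1) for i, (player, minutes) in enumerate(ordered)]


def rank(totals):
    # Same ordering, but tied minutes share a rank and the next rank skips ahead (1, 2, 2, 4).
    ordered = sorted(totals, key=lambda p: p[1], reverse=True)
    result = []
    for i, (player, minutes) in enumerate(ordered):
        if i > 0 and minutes == ordered[i - 1][1]:
            r = result[-1][2]
        else:
            r = i + 1
        result.append((player, minutes, r))
    return result


team = [('A', 50), ('B', 40), ('C', 40), ('D', 10)]  # hypothetical totals for one team
print([p for p, _, r in row_number(team) if r <= 2])  # ['A', 'B']
print([p for p, _, r in rank(team) if r <= 2])        # ['A', 'B', 'C']
```

If deterministic results matter, adding a tie-breaker column to the window ordering, e.g. `orderBy(F.desc('minutesPlayedTotal'), 'player')`, makes row_number pick the same players on every run.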
