Select top N after aggregating by key and another field in pyspark RDD

Time:01-15

Assuming a data set like this:

(teamId, player, minutesPlayed)
(1, 'A', 33)
(1, 'A', 12)
(1, 'B', 5)
(2, 'C', 22)
(2, 'C', 15)
(2, 'C', 33)
(2, 'D', 0)
......

Each tuple records how many minutes a player played for a team in one game. I would like to aggregate the minutes per player per team and select the 10 players with the most minutes on each team, assuming a much larger data set of course. For example, if team 1 has 15 players, we want its top 10 by minutesPlayed.

The resulting data set would be:

(1, 'A', 350)
(1, 'B', 330)
#... rest 8 players of team 1
(2, 'C', 500)
(2, 'D', 330)
(2, 'E', 250)
#... rest 7 players of team 2
#... remaining teams, each with its 10 players with the most minutes

def map_players(data):
    teamId = data[0]
    player = data[1]
    minutesPlayed = data[2]

    # returning the combination of team and player as the key (assuming this is the way to do it)
    return ((teamId, player), minutesPlayed)

def reduce_players(p1, p2):
    # really not sure what to do here
    # p1 and p2 are just the minutes played (int) for the same (teamId, player) key
    return p1 + p2  # summing them gives that player's total

Result:

player_data_set.map(map_players).reduceByKey(reduce_players).collect() # take(10)?

I would like to do everything within the reduceByKey reducer and use only the .map method for mapping.

CodePudding user response:

If you want to stay with RDDs, you can do something like this:

  1. Reduce with (teamId, player) as the key to calculate the total minutes played by each player.
  2. Reduce again, this time with only teamId as the key, to build each team's list of (player, total minutes) pairs.
  3. Use flatMapValues to sort each list by total minutes in descending order and keep the first 10 entries.

Example:

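from operator import add

from pyspark.sql import SparkSession

# Reuses the active session if one already exists (e.g. in the pyspark shell)
spark = SparkSession.builder.getOrCreate()

rdd = spark.sparkContext.parallelize([
    (1, 'A', 33), (1, 'A', 12), (1, 'B', 5), (2, 'C', 22),
    (2, 'C', 15), (2, 'C', 33), (2, 'D', 0)
])

# 1. total minutes per (teamId, player)
# 2. re-key by teamId and concatenate the one-element lists of (player, total)
# 3. sort each team's list by total, descending, and keep the first 10
rdd1 = rdd.map(lambda x: ((x[0], x[1]), x[2])) \
    .reduceByKey(add) \
    .map(lambda x: (x[0][0], [(x[0][1], x[1])])) \
    .reduceByKey(add) \
    .flatMapValues(lambda x: sorted(x, key=lambda a: a[1], reverse=True)[:10])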

for p in rdd1.collect():
    print(p)
#(1, ('A', 45))
#(1, ('B', 5))
#(2, ('C', 70))
#(2, ('D', 0))
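
Since the question asks for named functions passed to .map and reduceByKey, here is a rough sketch of how the lambdas above could be written that way. It is a variation, not the code from this answer: the second reducer uses heapq.nlargest to trim each team's list to the top N while merging, instead of sorting once at the end. The function names, TOP_N, and the reduce_by_key helper are all made up for illustration; reduce_by_key is only a local stand-in for RDD.reduceByKey so the logic can be checked without a Spark session.

```python
from collections import defaultdict
from functools import reduce
import heapq

TOP_N = 10  # assumed cutoff, taken from the question


def to_player_key(record):
    # (teamId, player, minutesPlayed) -> ((teamId, player), minutesPlayed)
    team_id, player, minutes = record
    return ((team_id, player), minutes)


def sum_minutes(m1, m2):
    # reducer for step 1: both values belong to the same (teamId, player) key
    return m1 + m2


def to_team_key(pair):
    # ((teamId, player), total) -> (teamId, [(player, total)])
    (team_id, player), total = pair
    return (team_id, [(player, total)])


def merge_top_n(a, b):
    # reducer for step 2: merge two partial lists and keep only the
    # TOP_N largest totals, sorted in descending order
    return heapq.nlargest(TOP_N, a + b, key=lambda p: p[1])


def reduce_by_key(pairs, func):
    # hypothetical local stand-in for RDD.reduceByKey, for illustration only
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return [(key, reduce(func, values)) for key, values in grouped.items()]


records = [
    (1, 'A', 33), (1, 'A', 12), (1, 'B', 5), (2, 'C', 22),
    (2, 'C', 15), (2, 'C', 33), (2, 'D', 0),
]

totals = reduce_by_key(map(to_player_key, records), sum_minutes)
top_players = dict(reduce_by_key(map(to_team_key, totals), merge_top_n))
print(top_players)
```

On a real RDD the same functions should chain as rdd.map(to_player_key).reduceByKey(sum_minutes).map(to_team_key).reduceByKey(merge_top_n), which yields one (teamId, list) pair per team rather than one row per player. Trimming inside the reducer keeps each intermediate list at no more than TOP_N entries, which may matter for teams with many players.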

CodePudding user response:

If you convert your data into a PySpark DataFrame, you could do it like this:

from pyspark.sql import functions as F, Window
(
    df
    .groupBy('teamId', 'player')
    .agg(F.sum('minutesPlayed').alias('minutesPlayedTotal'))
    .withColumn('rank', F.row_number().over(Window.partitionBy('teamId').orderBy(F.desc('minutesPlayedTotal')))
    .where('rank <= 10')
    .show()
)