Home > Mobile >  pyspark dataframe not maintaining order after dropping a column
pyspark dataframe not maintaining order after dropping a column

Time:02-04

I create a dataframe:

df = spark.createDataFrame(pd.DataFrame({'a':range(12),'c':range(12)})).repartition(8)

its contents are :

df.show()
 --- --- 
|  a|  c|
 --- --- 
|  0|  0|
|  1|  1|
|  3|  3|
|  5|  5|
|  6|  6|
|  8|  8|
|  9|  9|
| 10| 10|
|  2|  2|
|  4|  4|
|  7|  7|
| 11| 11|
 --- --- 

But, If I drop a column, the remaining column gets permuted

df.drop('c').show()
 --- 
|  a|
 --- 
|  0|
|  2|
|  3|
|  5|
|  6|
|  7|
|  9|
| 11|
|  1|
|  4|
|  8|
| 10|
 --- 

Please help me understand what is happening here?

CodePudding user response:

I wanted to add my answer since I felt I could explain this slightly differently.

The repartition results in a RoundRobinPartition. It basically redistributes the data in round-robin fashion.

Since you are evaluating the dataframe again, it recomputes the partition after the drop.

You can see this by running a few commands in addition to what you have shown.

df = spark.createDataFrame(pd.DataFrame({'a':range(12),'c':range(12)})).repartition(8)

df.explain()
# == Physical Plan ==
# Exchange RoundRobinPartitioning(8)
#  - Scan ExistingRDD[a#14L,c#15L]

print("Partitions structure: {}".format(df.rdd.glom().collect()))
# Partitions structure: [[], [], [], [], [], [], [Row(a=0, c=0), Row(a=1, c=1), Row(a=3, c=3), Row(a=5, c=5), Row(a=6, c=6), Row(a=8, c=8), Row(a=9, c=9), Row(a=10, c=10)], [Row(a=2, c=2), Row(a=4, c=4), Row(a=7, c=7), Row(a=11, c=11)]]

temp = df.drop("c")

temp.explain()
# == Physical Plan ==
# Exchange RoundRobinPartitioning(8)
#  - *(1) Project [a#14L]
#    - Scan ExistingRDD[a#14L,c#15L]


print("Partitions structure: {}".format(temp.rdd.glom().collect()))
# Partitions structure: [[], [], [], [], [], [], [Row(a=0), Row(a=2), Row(a=3), Row(a=5), Row(a=6), Row(a=7), Row(a=9), Row(a=11)], [Row(a=1), Row(a=4), Row(a=8), Row(a=10)]]

In the above code, the explain() shows the RoundRobinPartitioning taking place. The use of glom shows the redistribution of data across partitions.

In the original dataframe, the partitions are in the order that you see the results of show().

In the second dataframe above, you can see that the data has shuffled across the last two partitions, resulting in it not being in the same order. This is because when re-evaluating the dataframe the repartition runs again.

CodePudding user response:

What Spark does is splitting the dataframe among the nodes, running the calculations on the nodes, than merging again the result. The way data is divided into parts depends on many factors, like the load on the cluster and on single nodes.

For this reason, by default, no ordering is guranteed. If you want a specific order, you have to use the sort() function, like this:

df.sort('a').show()

Will print

 --- --- 
|  a|  c|
 --- --- 
|  0|  0|
|  1|  1|
|  2|  2|
|  3|  3|
|  4|  4|
|  5|  5|
|  6|  6|
|  7|  7|
|  8|  8|
|  9|  9|
| 10| 10|
| 11| 11|
 --- --- 

Also, after removing a column, you will have to sort again:

df.drop('c').sort('a').show()

Will print:

 --- 
|  a|
 --- 
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
 --- 

CodePudding user response:

Repartition shuffles your data across nodes. What you are experiencing is expected. If you need to maintain order, you must sort by relevant column when calling the df

Lets try

df = spark.createDataFrame(pd.DataFrame({'a':range(12),'c':range(12)})).repartition(8)

and

df.select(*).show()

outcome

 --- --- 
|  a|  c|
 --- --- 
|  0|  0|
|  1|  1|
|  3|  3|
|  5|  5|
|  6|  6|
|  8|  8|
|  9|  9|
| 10| 10|
|  2|  2|
|  4|  4|
|  7|  7|
| 11| 11|
 --- --- 

You realise the data is shuffled and resembles your earlier output.

You can control this using coalesce. coalesce does do a shuffle, instead new partitions claim current partitions

  df = spark.createDataFrame(pd.DataFrame({'a':range(12),'c':range(12)})).coalesce(8)

You notice df.drop('c').show() maintains the order

Following Your Comments

Repartion is never written to disk unlike partionby. It is held in memory. Repartion shuffles. Under the hood, the last mapper in DAG is used at the reshuffle stage.

the mapper relies on the shuffleExchangeExec which at the very start defines the partitioner that will be used to distribute the data in the partitions. In the case of repartition, naturally, round robin is used. Round robin is a random distributor. being a random distributor, (re)sorting of the data happens.

So what's happening in this case? When you call df.drop(), reparation is redone. Roundrobin partition is again reapplied causing distortion.

  •  Tags:  
  • Related