Spark: How to combine 2 sorted RDDs so that order is preserved after union?

I have 2 sorted RDDs:

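// Keep the first n elements of each pair RDD, in key order.
val rdd_a = some_pair_rdd.sortByKey()
                    .zipWithIndex()
                    .filter { case (_, idx) => idx < n }
                    .map { case (pair, _) => pair }
val rdd_b = another_pair_rdd.sortByKey()
                    .zipWithIndex()
                    .filter { case (_, idx) => idx < n }
                    .map { case (pair, _) => pair }
// Append rdd_b to rdd_a.
val all_rdd = rdd_a.union(rdd_b)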

In all_rdd, I see that the order is not necessarily maintained as I'd imagined (all elements of rdd_a first, followed by all elements of rdd_b). Is my assumption about the contract of union incorrect, and if so, what should I use to append multiple sorted RDDs into a single RDD?

Answers


I'm fairly new to Spark so I could be wrong, but from what I understand union is a narrow transformation. That is, no shuffle takes place: each executor combines only its local blocks of RDD a with its local blocks of RDD b, and the result is passed back to the driver only when an action such as collect is called.

As an example, let's say that you have 2 executors and 2 RDDs.

RDD_A = ["a","b","c","d","e","f"]

and

RDD_B = ["1","2","3","4","5","6"]

Let Executor 1 contain the first half of both RDDs and Executor 2 contain the second half of both RDDs. When they perform the union on their local blocks, it would look something like:

Union_executor1 = ["a","b","c","1","2","3"]

and

Union_executor2 = ["d","e","f","4","5","6"]

So when the executors pass their parts back to the driver, you would have ["a","b","c","1","2","3","d","e","f","4","5","6"].
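A quick way to check this picture is to look at how the elements are actually laid out across partitions after the union. The sketch below is only an experiment under stated assumptions: it runs in local mode, the names sc, rddA, rddB, unioned and layout are made up for illustration, and it uses glom() to turn every partition into an array so the layout can be printed. It can be pasted into spark-shell or run as a script.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: a local master with 2 threads; in spark-shell the existing context is reused.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("union-layout"))

// Two RDDs with 2 partitions each, mirroring the example above.
val rddA = sc.parallelize(Seq("a", "b", "c", "d", "e", "f"), numSlices = 2)
val rddB = sc.parallelize(Seq("1", "2", "3", "4", "5", "6"), numSlices = 2)

val unioned = rddA.union(rddB)

// glom() collects the contents of each partition into one array,
// so this shows which elements ended up in which partition.
val layout = unioned.glom().collect().map(_.toList).toList
layout.zipWithIndex.foreach { case (part, i) => println(s"partition $i: $part") }

// collect() returns the partitions in partition order.
println(unioned.collect().mkString(","))
```

In a local run like this one, the union should report four partitions, with the two partitions of rddA listed before the two partitions of rddB, which is not the interleaved result sketched above. Inputs that share the same partitioner are handled by a different code path in Spark, so the layout is worth checking on the real data rather than assuming it.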

Again, I'm new to Spark and I could be wrong. I'm just sharing based on my understanding of how it works with RDDs. Hopefully we can both learn something from this.


You can't. Spark does not have a merge sort, because you can't make assumptions about the way that the RDDs are actually stored on the nodes. If you want things in sort order after you take the union, you need to sort again.
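A minimal sketch of "sort again", assuming small in-memory pair RDDs and a local master (the names sc, rddA, rddB, merged and appended are hypothetical). It shows two variants: a plain sortByKey() after the union for a globally sorted result, and a variant that tags each element with the index of its source RDD and sorts on (source, key), which gives the "all of rdd_a, then all of rdd_b" order the question asks for.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: a local master with 2 threads; in spark-shell the existing context is reused.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("union-then-sort"))

val rddA = sc.parallelize(Seq(3 -> "c", 1 -> "a", 2 -> "b")).sortByKey()
val rddB = sc.parallelize(Seq(2 -> "y", 1 -> "x", 3 -> "z")).sortByKey()

// Variant 1: one globally sorted RDD. This costs another shuffle.
val merged = rddA.union(rddB).sortByKey()

// Variant 2: keep every element of rddA ahead of every element of rddB.
// Tag each record with its source index, sort on the composite key
// (source, key), then drop the tag again.
val appended = rddA.map { case (k, v) => ((0, k), v) }
  .union(rddB.map { case (k, v) => ((1, k), v) })
  .sortByKey()
  .map { case ((_, k), v) => (k, v) }

println(merged.keys.collect().mkString(","))
println(appended.collect().mkString(","))
```

The second variant relies on the explicit sort rather than on how union happens to arrange partitions, so the order holds regardless of where the data lives. The relative order of records with equal keys in merged is not guaranteed.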

