Hello all,

I have the following Spark (pseudo)code:

rdd = input.mapPartitionsWithIndex(...)   // input: the source RDD
           .mapPartitionsToPair(...)
           .groupByKey()
           .sortByKey(comparator)
           .partitionBy(myPartitioner)
           .mapPartitionsWithIndex(...)
           .mapPartitionsToPair(f)
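For concreteness, here is a sketch of the routing behaviour described below (myPartitioner sends every record to partition 0). The names are illustrative; in the real job this logic would live in a subclass of org.apache.spark.Partitioner, but it is shown here as plain Java so the routing can be seen in isolation:

```java
// Illustrative stand-in for myPartitioner (hypothetical names).
// In the actual job this would extend org.apache.spark.Partitioner
// and override numPartitions() and getPartition(Object key).
public class SingletonPartitionerSketch {

    // Two partitions exist (matching the two input splits) ...
    static int numPartitions() {
        return 2;
    }

    // ... but every key is routed to partition 0.
    static int getPartition(Object key) {
        return 0;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"a", "b", "c"}) {
            System.out.println(key + " -> partition " + getPartition(key));
        }
    }
}
```

Given this partitioner, the expectation is that only one partition (and hence one task) holds any data after the partitionBy step.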

The input data has 2 input splits (running on YARN, Hadoop 2.6.0).
myPartitioner assigns all records to partition 0, which is correct, so my
intuition is that the f passed to the last transformation
(mapPartitionsToPair) would run sequentially inside a single YARN
container. However, the YARN logs show that both containers are
processing records from the same partition, and *sometimes* the overall
job fails (because the code in f expects the records in a certain
order): container 1 receives the records as expected, whereas container
2 receives only a subset of them, for a reason I cannot explain, and f
fails.

The overall behavior is that the job sometimes succeeds and sometimes
fails, apparently due to inconsistent propagation of the sorted records
to the YARN containers.


If any of this makes any sense to you, please let me know what I am missing.



Best,
Marius
