Hrm, so you're saying records for the same GroupByKey are ending up in different partitions when you do a secondary sort? That sounds like a bug in the SparkPartitioner we're using. I wonder if it's the same bug that was fixed here:
https://issues.apache.org/jira/browse/CRUNCH-556

On Wed, Dec 9, 2015 at 6:05 PM, Andrey Gusev <[email protected]> wrote:

> Hello crunch!
>
> I am running into problems with partitioning of groups with secondary sort
> running on SparkPipeline.
>
> What I am observing is that records belonging to a single group may be
> split across two or more calls to apply DoFn. This could be a gap in my
> understanding of Spark execution model wrt to locality - and if so, can
> *all* the records belonging to a groupBy key be forced to a single call?
>
> Roughly speaking the code looks like this:
>
> PTableType<GroupByKey, Pair<SortKey, Info>> pType =
>     tableOf(Writables.writables(GroupByKey.class),
>         Writables.pairs(Writables.writables(SortKey.class),
>             Writables.writables(Info.class)));
>
> // note that dataset has been explicitly sharded by numPartitions
> PTable< GroupByKey, Pair< SortKey, Info >> infos = dataset.parallelDo(...,
>     pType);
>
> PTable< SortKey, Info > mergedInfos =
>     SecondarySort.sortAndApply(infos, mergeInfos(...),
>         mergeType, numPartitions);
>
> static class GroupByKey implements Writable {
>
>   public int treeId;
>   public int nodeId;
>   ...
> }
>
> I can confirm that records come in sorted and grouped but I am also
> observing that a single group may be executed on at different nodes. More
> concretely lets say group belonging to treeId=0, nodeId=0 has 100 records,
> the first 30 may show up on node1, and the remaining on node2 (in both
> cases sorted). Informally it does look like it basically ensures that each
> node is scheduled to process the same number of records. It's especially
> evident with 2 partition where exactly one group is split.
>
> The semantics of the code (at least for now) require all the values to
> come in with a single group. Can that be forced?
>
> env: spark 1.5 and crunch 0.11.0
>
> Any thoughts would be appreciated!
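
For reference, here is a minimal sketch of the invariant a secondary sort relies on: the partition has to be chosen from the group key alone, never from the (group key, sort key) pair, or a single group can be spread over several partitions. This is plain Java and is not Crunch's SparkPartitioner. GroupKey, CompositeKey and the two partition functions are hypothetical stand-ins made up for illustration, and I'm not claiming this is the exact defect behind CRUNCH-556.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

class GroupPartitionSketch {

    // Hypothetical stand-in for the GroupByKey in the thread (plain Java, no Writable).
    static final class GroupKey {
        final int treeId;
        final int nodeId;

        GroupKey(int treeId, int nodeId) {
            this.treeId = treeId;
            this.nodeId = nodeId;
        }

        // Value-based equals/hashCode, so equal keys always hash alike.
        @Override
        public boolean equals(Object o) {
            if (!(o instanceof GroupKey)) return false;
            GroupKey other = (GroupKey) o;
            return treeId == other.treeId && nodeId == other.nodeId;
        }

        @Override
        public int hashCode() {
            return Objects.hash(treeId, nodeId);
        }
    }

    // Hypothetical composite key: the group key plus the value used for ordering.
    static final class CompositeKey {
        final GroupKey group;
        final int sortKey;

        CompositeKey(GroupKey group, int sortKey) {
            this.group = group;
            this.sortKey = sortKey;
        }
    }

    // Partition on the group key only: every record of a group gets the same partition.
    static int partitionByGroup(CompositeKey key, int numPartitions) {
        return (key.group.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    // Partition on the whole composite key: records of one group can scatter.
    static int partitionByComposite(CompositeKey key, int numPartitions) {
        return (Objects.hash(key.group, key.sortKey) & Integer.MAX_VALUE) % numPartitions;
    }

    // For each group, collect the set of partitions its records were assigned to.
    static Map<GroupKey, Set<Integer>> partitionsPerGroup(
            List<CompositeKey> keys, int numPartitions, boolean groupOnly) {
        Map<GroupKey, Set<Integer>> seen = new HashMap<>();
        for (CompositeKey key : keys) {
            int p = groupOnly
                    ? partitionByGroup(key, numPartitions)
                    : partitionByComposite(key, numPartitions);
            seen.computeIfAbsent(key.group, g -> new HashSet<>()).add(p);
        }
        return seen;
    }

    // Three groups (treeId=0, nodeId=0..2) with 100 records each.
    static List<CompositeKey> sampleKeys() {
        List<CompositeKey> keys = new ArrayList<>();
        for (int nodeId = 0; nodeId < 3; nodeId++) {
            GroupKey group = new GroupKey(0, nodeId);
            for (int sortKey = 0; sortKey < 100; sortKey++) {
                keys.add(new CompositeKey(group, sortKey));
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        List<CompositeKey> keys = sampleKeys();
        for (boolean groupOnly : new boolean[] {true, false}) {
            int maxPartitions = 0;
            for (Set<Integer> parts : partitionsPerGroup(keys, 2, groupOnly).values()) {
                maxPartitions = Math.max(maxPartitions, parts.size());
            }
            System.out.println((groupOnly ? "group-only" : "composite")
                    + " partitioning: max partitions per group = " + maxPartitions);
        }
    }
}
```

With 2 partitions, the group-only variant keeps each group in exactly one partition, while the composite variant puts every group in both. A quick way to check the real job for the same symptom is to log the partition index next to treeId/nodeId inside the DoFn and see whether any group shows up under more than one index.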
