Yeah, so that appears to be the symptom. I tried both 0.13 and 0.14 (just built from source), and the problem is present in both cases.
With 2 partitions and, say, 100 total groups, I see the first partition start at group 0 and run through group 47, sort field index 344. Roughly, the log line is:

groupId: 47, sortFieldIndex: 344

The next partition, executing on a different node, starts at:

groupId: 47, sortFieldIndex: 345

So the group is broken up.

I do see that PartitionedMapOutputFunction is used in PGroupedTableImpl (spark), so it looks like CRUNCH-556 <https://issues.apache.org/jira/browse/CRUNCH-556> isn't sufficient.

On Wed, Dec 9, 2015 at 8:56 PM, Josh Wills <[email protected]> wrote:

> Hrm-- so you're saying records for the same GroupByKey are ending up in
> different partitions when you're doing a secondary sort? Sounds like a bug
> in the SparkPartitioner we're using-- I wonder if it was the same bug that
> was fixed here?
>
> https://issues.apache.org/jira/browse/CRUNCH-556
>
> On Wed, Dec 9, 2015 at 6:05 PM, Andrey Gusev <[email protected]>
> wrote:
>
>> Hello crunch!
>>
>> I am running into problems with partitioning of groups with secondary
>> sort running on SparkPipeline.
>>
>> What I am observing is that records belonging to a single group may be
>> split across two or more calls to apply DoFn. This could be a gap in my
>> understanding of Spark execution model wrt locality - and if so, can
>> *all* the records belonging to a groupBy key be forced to a single call?
>>
>> Roughly speaking the code looks like this:
>>
>> PTableType<GroupByKey, Pair<SortKey, Info>> pType =
>>     tableOf(Writables.writables(GroupByKey.class),
>>         Writables.pairs(Writables.writables(SortKey.class),
>>             Writables.writables(Info.class)));
>>
>> // note that dataset has been explicitly sharded by numPartitions
>> PTable<GroupByKey, Pair<SortKey, Info>> infos =
>>     dataset.parallelDo(..., pType);
>>
>> PTable<SortKey, Info> mergedInfos =
>>     SecondarySort.sortAndApply(infos, mergeInfos(...),
>>         mergeType, numPartitions);
>>
>> static class GroupByKey implements Writable {
>>
>>   public int treeId;
>>   public int nodeId;
>>   ...
>> }
>>
>> I can confirm that records come in sorted and grouped but I am also
>> observing that a single group may be executed on different nodes. More
>> concretely let's say group belonging to treeId=0, nodeId=0 has 100 records,
>> the first 30 may show up on node1, and the remaining on node2 (in both
>> cases sorted). Informally it does look like it basically ensures that each
>> node is scheduled to process the same number of records. It's especially
>> evident with 2 partitions where exactly one group is split.
>>
>> The semantics of the code (at least for now) require all the values to
>> come in with a single group. Can that be forced?
>>
>> env: spark 1.5 and crunch 0.11.0
>>
>> Any thoughts would be appreciated!
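To make the symptom concrete, here is a toy, self-contained Java sketch. It is not Crunch or Spark code, and every name in it (GroupPartitionSketch, byGroupKey, byGlobalPosition, countSplitGroups) is made up for illustration. It only models the two behaviours being contrasted: choosing a partition from the group key alone, versus cutting the globally sorted record stream into equal-sized ranges. The second is an assumption that matches what I see in the logs; I have not confirmed that this is what the partitioner actually does.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

class GroupPartitionSketch {

    /** Partition derived from the group key only, so one group maps to one partition. */
    static int byGroupKey(int treeId, int nodeId, int numPartitions) {
        return Math.floorMod(Objects.hash(treeId, nodeId), numPartitions);
    }

    /** Partition derived from the record's position in the global sort order (equal-sized ranges). */
    static int byGlobalPosition(int position, int totalRecords, int numPartitions) {
        return (int) ((long) position * numPartitions / totalRecords);
    }

    /** Counts groups whose records land in more than one partition. */
    static int countSplitGroups(int groups, int recordsPerGroup,
                                int numPartitions, boolean useGroupKey) {
        Map<Integer, Set<Integer>> partitionsPerGroup = new HashMap<>();
        int total = groups * recordsPerGroup;
        int position = 0; // index of the record in (group, sort key) order
        for (int g = 0; g < groups; g++) {
            for (int r = 0; r < recordsPerGroup; r++, position++) {
                int p = useGroupKey
                        ? byGroupKey(0, g, numPartitions)
                        : byGlobalPosition(position, total, numPartitions);
                partitionsPerGroup.computeIfAbsent(g, k -> new HashSet<>()).add(p);
            }
        }
        int split = 0;
        for (Set<Integer> seen : partitionsPerGroup.values()) {
            if (seen.size() > 1) {
                split++;
            }
        }
        return split;
    }

    public static void main(String[] args) {
        // 101 groups of 7 records over 2 partitions: the midpoint falls inside a group.
        System.out.println("split groups, by group key: " + countSplitGroups(101, 7, 2, true));
        System.out.println("split groups, by position:  " + countSplitGroups(101, 7, 2, false));
    }
}
```

With these numbers the key-based scheme splits no groups, while the position-based scheme splits exactly the one group that straddles the midpoint, which is the same shape as the groupId 47 case above. What I need from sortAndApply is the first behaviour.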
