Hi Nathan,

Your code looks fine to me. The single-use iterator stuff introduced by CRUNCH-192 was intended to prevent iterating over a group's values multiple times within a reduce operation (when running in MapReduce mode).
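
To make the constraint concrete, here is a minimal sketch of a single-use Iterable. The names OneShotIterable, sum and secondPassFails are hypothetical and chosen for illustration; this is not the actual Crunch SingleUseIterable source, and only the exception message is taken from the stack trace in this thread.

```java
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical stand-in for illustration only; not the Crunch implementation.
class OneShotIterableDemo {

  // Wraps another Iterable and allows iterator() to be called exactly once.
  static final class OneShotIterable<T> implements Iterable<T> {
    private final Iterable<T> delegate;
    private boolean consumed = false;

    OneShotIterable(Iterable<T> delegate) {
      this.delegate = delegate;
    }

    @Override
    public Iterator<T> iterator() {
      if (consumed) {
        throw new IllegalStateException(
            "iterator() can only be called once on this Iterable");
      }
      consumed = true;
      return delegate.iterator();
    }
  }

  // Sums the values, which takes one full pass over the Iterable.
  static long sum(Iterable<Long> values) {
    long total = 0;
    for (long v : values) {
      total += v;
    }
    return total;
  }

  // Returns true if a second pass over the same Iterable is rejected.
  static boolean secondPassFails(Iterable<Long> values) {
    sum(values); // first pass
    try {
      sum(values); // second pass over the same object
      return false;
    } catch (IllegalStateException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    Iterable<Long> shared = new OneShotIterable<>(Arrays.asList(1L, 2L, 3L));
    System.out.println(secondPassFails(shared));
  }
}
```

If two operations derived from the same grouped table end up sharing one such wrapper, the second one to run would hit this exception. That would be consistent with the stack trace, though it is a guess rather than a confirmed diagnosis.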
Looks like the MemPipeline is doing something wrong when multiple PCollections are created from a single PCollection. Would you mind logging this in JIRA?

Thanks,
Gabriel

On Sat, Apr 30, 2016 at 2:15 PM, Nathan Schile <[email protected]> wrote:
> I'm running into a "java.lang.IllegalStateException: iterator() can only be
> called once on this Iterable" [1] when running a unit test that
> utilizes a MemCollection. The pipeline appears to run fine when running on
> a cluster. I have a PGroupedTable that I am running multiple operations
> (mapValues, ungroup) [2] on that is causing the exception. The mapValues and
> the ungroup operations are run in separate jobs on the cluster, so I don't
> believe I should be running into iterator issues. This constraint was
> introduced by CRUNCH-192 [3]. Is it fine to perform multiple operations like
> I'm doing, or am I making incorrect assumptions?
>
> [1]
> apache-crunch-0.8.4 branch
>
> iterator() can only be called once on this Iterable
> java.lang.IllegalStateException: iterator() can only be called once on this Iterable
>   at org.apache.crunch.impl.SingleUseIterable.iterator(SingleUseIterable.java:43)
>   at org.apache.crunch.impl.mem.collect.MemGroupedTable$UngroupFn.process(MemGroupedTable.java:148)
>   at org.apache.crunch.impl.mem.collect.MemGroupedTable$UngroupFn.process(MemGroupedTable.java:145)
>   at org.apache.crunch.impl.mem.collect.MemCollection.parallelDo(MemCollection.java:155)
>   at org.apache.crunch.impl.mem.collect.MemCollection.parallelDo(MemCollection.java:143)
>   at org.apache.crunch.impl.mem.collect.MemGroupedTable.ungroup(MemGroupedTable.java:142)
>
> [2]
> PGroupedTable<String, Item> itemsByPersonId =
>     AvroCollections.keyByAvroField(items, "person_id",
>         Avros.strings()).groupByKey();
>
> PTable<String, Long> newestOrderByPersonId =
>     itemsByPersonId
>         .mapValues("Find latest order for each person", new MaxOrderMapFn(), Avros.longs());
>
> return itemsByPersonId.ungroup().join(newestOrderByPersonId).parallelDo(
>     "Filter old items, leaving only current order items",
>     new PreviousOrderItemFilter(),
>     Avros.tableOf(Avros.strings(), Avros.specifics(Item.class)));
>
> [3] https://issues.apache.org/jira/browse/CRUNCH-192
> https://github.com/apache/crunch/commit/cbc7c2fb30ad0486e7ec60656c079c81e41eda2c
