Hi! This looks concerning. Can you show a full code example, please? Does it run on the direct runner?
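
In the meantime, here is roughly the kind of self-contained repro that would help, and what I'd try on the direct runner first. This is only a sketch (the class name, element type, and sizes are made up, not taken from your pipeline); it pairs your OutputReceiver-style @SplitRestriction with the rest of a minimal bounded SDF as I understand the 2.4.0 API:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.range.OffsetRange;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;

    public class SplitRestrictionRepro {

      // Hypothetical minimal bounded SDF: for each input size N, emits the
      // offsets 0..N-1, claiming each offset from the tracker as it goes.
      static class EmitOffsetsFn extends DoFn<Long, Long> {

        @ProcessElement
        public void process(ProcessContext c, OffsetRangeTracker tracker) {
          for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
            c.output(i);
          }
        }

        @GetInitialRestriction
        public OffsetRange getInitialRestriction(Long size) {
          return new OffsetRange(0, size);
        }

        // Same OutputReceiver-style signature as in your snippet.
        @SplitRestriction
        public void splitRestriction(
            Long size, OffsetRange range, OutputReceiver<OffsetRange> out) {
          for (OffsetRange p : range.split(1000, 10)) {
            out.output(p);
          }
        }

        @NewTracker
        public OffsetRangeTracker newTracker(OffsetRange range) {
          return new OffsetRangeTracker(range);
        }
      }

      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        p.apply(Create.of(10000L)).apply(ParDo.of(new EmitOffsetsFn()));
        p.run().waitUntilFinish();
      }
    }

If something like this works on the direct runner but still fails on Dataflow, that would point at the runner rather than the DoFn signature.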
On Tue, Apr 10, 2018 at 3:13 PM Jiayuan Ma <[email protected]> wrote:

> Hi all,
>
> I'm trying to use the ReplicateFn mentioned in this
> <https://s.apache.org/splittable-do-fn> doc in my pipeline to speed up a
> nested for loop. The use case is exactly the same as the "*Counting friends
> in common (cross join by key)*" section. However, I'm having trouble making
> it work with the Beam 2.4.0 SDK.
>
> I'm implementing @SplitRestriction as follows:
>
>     @SplitRestriction
>     public void splitRestriction(A element, OffsetRange range,
>         OutputReceiver<OffsetRange> out) {
>       for (final OffsetRange p : range.split(1000, 10)) {
>         out.output(p);
>       }
>     }
>
> The Dataflow runner throws an exception like this:
>
>     java.util.NoSuchElementException
>     com.google.cloud.dataflow.worker.repackaged.com.google.common.collect.MultitransformedIterator.next(MultitransformedIterator.java:63)
>     com.google.cloud.dataflow.worker.repackaged.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
>     com.google.cloud.dataflow.worker.repackaged.com.google.common.collect.Iterators.getOnlyElement(Iterators.java:308)
>     com.google.cloud.dataflow.worker.repackaged.com.google.common.collect.Iterables.getOnlyElement(Iterables.java:294)
>     com.google.cloud.dataflow.worker.DataflowProcessFnRunner.getUnderlyingWindow(DataflowProcessFnRunner.java:97)
>     com.google.cloud.dataflow.worker.DataflowProcessFnRunner.placeIntoElementWindow(DataflowProcessFnRunner.java:71)
>     com.google.cloud.dataflow.worker.DataflowProcessFnRunner.processElement(DataflowProcessFnRunner.java:61)
>     com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:323)
>     com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:43)
>     com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
>     com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:200)
>     com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
>     com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
>     com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1211)
>     com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:137)
>     com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:959)
>     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     java.lang.Thread.run(Thread.java:745)
>
> I also tried the following, as suggested by the javadoc
> <https://beam.apache.org/documentation/sdks/javadoc/2.4.0/org/apache/beam/sdk/transforms/DoFn.SplitRestriction.html>,
> but it produces errors during pipeline construction:
>
>     @SplitRestriction
>     public List<OffsetRange> splitRestriction(A element, OffsetRange range) {
>       return range.split(1000, 10);
>     }
>
> Without implementing @SplitRestriction, my pipeline runs without any
> errors. However, I think the SDF is not really split by default, which
> defeats the purpose of improving performance.
>
> Does anyone know whether @SplitRestriction is currently supported by the
> Dataflow runner? How can I write a working version with the latest SDK?
>
> Thanks,
> Jiayuan
