Hi! This looks concerning. Can you share a full code example, please? Does it
run in the direct runner?
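In the meantime, to sanity-check the splitting logic itself: below is a self-contained sketch of what `OffsetRange.split(desiredNumOffsetsPerSplit, minNumOffsetsPerSplit)` conceptually does. Note `SimpleOffsetRange` is a hypothetical stand-in written for illustration, not Beam's actual class, and the exact merge behavior for undersized tails is my assumption about the semantics, not a copy of Beam's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Beam's OffsetRange (illustration only):
// carves the half-open range [from, to) into sub-ranges.
class SimpleOffsetRange {
    final long from;
    final long to;

    SimpleOffsetRange(long from, long to) {
        this.from = from;
        this.to = to;
    }

    // Emit chunks of roughly `desired` offsets each; if the leftover tail
    // would be smaller than `min`, fold it into the preceding chunk
    // (assumed semantics, mirroring the two-argument split signature).
    List<SimpleOffsetRange> split(long desired, long min) {
        List<SimpleOffsetRange> result = new ArrayList<>();
        long start = from;
        while (start < to) {
            long end = Math.min(start + desired, to);
            if (end < to && to - end < min) {
                end = to; // merge undersized tail into this chunk
            }
            result.add(new SimpleOffsetRange(start, end));
            start = end;
        }
        return result;
    }
}

public class SplitDemo {
    public static void main(String[] args) {
        // A 2500-offset range split with desired=1000, min=10
        // yields three sub-ranges: [0,1000), [1000,2000), [2000,2500).
        for (SimpleOffsetRange r : new SimpleOffsetRange(0, 2500).split(1000, 10)) {
            System.out.println(r.from + "-" + r.to);
        }
    }
}
```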

On Tue, Apr 10, 2018 at 3:13 PM Jiayuan Ma <jiayuanm...@gmail.com> wrote:

> Hi all,
>
> I'm trying to use ReplicateFn mentioned in this
> <https://s.apache.org/splittable-do-fn> doc in my pipeline to speed up a
> nested for loop. The use case is exactly the same as the "*Counting friends
> in common (cross join by key)*" section. However, I'm having trouble making
> it work with the Beam 2.4.0 SDK.
>
> I'm implementing @SplitRestriction as follows:
>
> @SplitRestriction
> public void splitRestriction(A element, OffsetRange range,
>     OutputReceiver<OffsetRange> out) {
>   for (final OffsetRange p : range.split(1000, 10)) {
>     out.output(p);
>   }
> }
>
> The Dataflow runner throws an exception like this:
>
> java.util.NoSuchElementException
> com.google.cloud.dataflow.worker.repackaged.com.google.common.collect.MultitransformedIterator.next(MultitransformedIterator.java:63)
> com.google.cloud.dataflow.worker.repackaged.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
> com.google.cloud.dataflow.worker.repackaged.com.google.common.collect.Iterators.getOnlyElement(Iterators.java:308)
> com.google.cloud.dataflow.worker.repackaged.com.google.common.collect.Iterables.getOnlyElement(Iterables.java:294)
> com.google.cloud.dataflow.worker.DataflowProcessFnRunner.getUnderlyingWindow(DataflowProcessFnRunner.java:97)
> com.google.cloud.dataflow.worker.DataflowProcessFnRunner.placeIntoElementWindow(DataflowProcessFnRunner.java:71)
> com.google.cloud.dataflow.worker.DataflowProcessFnRunner.processElement(DataflowProcessFnRunner.java:61)
> com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:323)
> com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:43)
> com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
> com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:200)
> com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
> com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
> com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1211)
> com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:137)
> com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:959)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
>
> I also tried the following, as suggested by the javadoc
> <https://beam.apache.org/documentation/sdks/javadoc/2.4.0/org/apache/beam/sdk/transforms/DoFn.SplitRestriction.html>,
> but it has errors during pipeline construction.
>
> @SplitRestriction
> public List<OffsetRange> splitRestriction(A element, OffsetRange range) {
>   return range.split(1000, 10);
> }
>
> Without implementing @SplitRestriction, my pipeline runs without any
> errors. However, I suspect the SDF is not actually split by default, which
> defeats the purpose of improving performance.
>
> Does anyone know if @SplitRestriction is currently supported by Dataflow
> runner? How can I write a working version with the latest SDK?
>
> Thanks,
> Jiayuan
>
