Hi,
   We are using a few side inputs to access data that is used for
filtering out elements in our pipelines. The source of the side input data
is a Redis instance in Google Cloud Memorystore.
The jobs are deployed from generated pipeline templates via the
Dataflow service APIs.

The side inputs are generated as below:

PCollectionView<Map<String, String>> targetCountryByJourneyId = p
    .apply("Generate a sequence for extracting resource_id->targetCountry map",
        GenerateSequence.from(0)
            .withRate(1, Duration.standardSeconds(
                Long.parseLong(options.getRulesRefreshDuration().get()))))
    .apply("Get all rules from Redis for resourceId->targetCountry map",
        ParDo.of(new GetAllRulesFromRedis(
            Integer.parseInt(options.getRedisPort().get()),
            options.getRedisHost().get(),
            statsdHost, statsdPort)))
    .apply("Transform all Rules into journeyId->targetCountry map",
        ParDo.of(new TransformRulesAsJourneyIdToMetadataMap(
            statsdHost, statsdPort, Rule.TARGET_COUNTRY)))
    .apply("Window journeyId->targetCountry map",
        Window.<Map<String, String>>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .discardingFiredPanes())
    .apply("singleton view of journeyId -> targetCountry map",
        View.asSingleton());

and accessed as below:

PCollection<UserData> qualifiedUser = userValidatedEntry
    .apply("Eliminate user by target country",
        ParDo.of(new EliminateUserByTargetCountry(
                options.getProjectNameForBT().get(),
                options.getInstanceNameForBT().get(),
                bigTableAppProfileId,
                targetCountryByJourneyId,
                statsdHost,
                statsdPort))
            .withSideInputs(targetCountryByJourneyId));
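
For context, the view is read inside the DoFn roughly like this. This is a
simplified sketch of our access pattern, not the exact code of
EliminateUserByTargetCountry; the UserData accessors such as getJourneyId()
and getCountry() are illustrative:

// Simplified sketch, not the actual DoFn (which also talks to Bigtable
// and statsd); getJourneyId()/getCountry() are illustrative accessors.
static class EliminateUserByTargetCountry extends DoFn<UserData, UserData> {
  private final PCollectionView<Map<String, String>> targetCountryByJourneyId;

  EliminateUserByTargetCountry(
      PCollectionView<Map<String, String>> targetCountryByJourneyId) {
    this.targetCountryByJourneyId = targetCountryByJourneyId;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    // Reads the map from the most recently fired pane of the side input.
    Map<String, String> targetCountryByJourney =
        c.sideInput(targetCountryByJourneyId);
    String targetCountry =
        targetCountryByJourney.get(c.element().getJourneyId());
    // Only users whose journey maps to a matching target country pass through.
    if (targetCountry != null && targetCountry.equals(c.element().getCountry())) {
      c.output(c.element());
    }
  }
}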

The pipelines are run on Google Cloud Dataflow, and we have been noticing
two kinds of issues:

1. Despite the side-input window firing and discarding its pane as soon as
the first element arrives, we are seeing the error below. The strange thing
is that we don't see these errors occur across all the workers in the job,
and we have also noticed the issue resolve itself after a new deployment,
only to resurface later.

java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view. Consider using Combine.globally().asSingleton() to combine the PCollection into a single value
        at org.apache.beam.sdk.transforms.View$SingletonCombineFn.apply(View.java:434)
        at org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput(Combine.java:524)
        at org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput(Combine.java:493)
        at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillCombiningState.add(WindmillStateInternals.java:2051)
        at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SystemReduceFn.processValue(SystemReduceFn.java:119)
        at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:613)
        at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:360)
        at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:96)
        at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:43)
        at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
        at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
        at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
        at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:137)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:218)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
        at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1450)
        at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:165)
        at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1125)
        at org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:133)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
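
The message suggests combining the collection down to a single value before
creating the view. Would something like the following be the right shape for
a fix? This is only a sketch, not our current code: rulesMaps stands for the
PCollection<Map<String, String>> produced by the
TransformRulesAsJourneyIdToMetadataMap step above, and we are assuming
Latest.globally() is an acceptable way to keep only the most recent map:

// Sketch of a possible fix, not our current code: reduce each pane to a
// single element (the most recent map) before creating the singleton view.
// "rulesMaps" is a placeholder for the output of the
// TransformRulesAsJourneyIdToMetadataMap step.
PCollectionView<Map<String, String>> targetCountryByJourneyId = rulesMaps
    .apply("Window journeyId->targetCountry map",
        Window.<Map<String, String>>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .discardingFiredPanes())
    .apply("Keep only the latest map", Latest.globally())
    .apply("singleton view of journeyId -> targetCountry map",
        View.asSingleton());

We are not sure whether this would avoid the error entirely, or merely make
it less likely.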

2. We have also noticed the side input not containing data even when the
underlying data exists in Redis. Since this side input view is a map, when
this happens we see an empty map with missing keys, and our code breaks
when accessing it.
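
At the moment we can only guard against this defensively inside the DoFn,
roughly like the sketch below (again, getJourneyId() is illustrative):

// Sketch of the defensive check we are forced to add; ideally the side
// input would never fire an empty map while Redis still has the rules.
Map<String, String> targetCountryByJourney =
    c.sideInput(targetCountryByJourneyId);
if (targetCountryByJourney.isEmpty()) {
  // Empty pane observed even though the rules exist in Redis; we have no
  // good recovery path here other than dropping the element.
  return;
}
String targetCountry = targetCountryByJourney.get(c.element().getJourneyId());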

Has anyone faced similar issues? It would help greatly if someone could shed
some light on this seemingly non-deterministic behavior of side inputs
created as singletons.

-Thanks in advance
Rajath
