Hi,

We are using a few side inputs to access data that is used to filter out elements in our pipelines. The source of the side input data is a Redis instance in Google Cloud Memorystore. The jobs are deployed from generated pipeline templates via the Dataflow service APIs.
The side inputs are generated as below:

    PCollectionView<Map<String, String>> targetCountryByJourneyId = p
        .apply("Generate a sequence for extracting resource_id->targetCountry map",
            GenerateSequence.from(0).withRate(1,
                Duration.standardSeconds(Long.parseLong(options.getRulesRefreshDuration().get()))))
        .apply("Get all rules from Redis for resourceId->targetCountry map",
            ParDo.of(new GetAllRulesFromRedis(
                Integer.parseInt(options.getRedisPort().get()),
                options.getRedisHost().get(),
                statsdHost,
                statsdPort)))
        .apply("Transform all Rules into journeyId->targetCountry map",
            ParDo.of(new TransformRulesAsJourneyIdToMetadataMap(
                statsdHost, statsdPort, Rule.TARGET_COUNTRY)))
        .apply("Window journeyId->targetCountry map",
            Window.<Map<String, String>>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                .discardingFiredPanes())
        .apply("singleton view of journeyId -> targetCountry map",
            View.asSingleton());

and accessed as below:

    PCollection<UserData> qualifiedUser = userValidatedEntry
        .apply("Eliminate user by target country",
            ParDo.of(new EliminateUserByTargetCountry(
                    options.getProjectNameForBT().get(),
                    options.getInstanceNameForBT().get(),
                    bigTableAppProfileId,
                    targetCountryByJourneyId,
                    statsdHost,
                    statsdPort))
                .withSideInputs(targetCountryByJourneyId));

The pipelines run on Google Cloud Dataflow, and we have been noticing two kinds of issues.

1. Even though the side input collection fires on every element and discards fired panes, we are seeing the error below. The strange thing is that we don't see these errors across all the workers in the job, and we have also noticed the issue get resolved by a new deployment, only to resurface later. (A workaround we are considering is sketched in the P.S. below.)

java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view.
Consider using Combine.globally().asSingleton() to combine the PCollection into a single value
    at org.apache.beam.sdk.transforms.View$SingletonCombineFn.apply(View.java:434)
    at org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput(Combine.java:524)
    at org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput(Combine.java:493)
    at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillCombiningState.add(WindmillStateInternals.java:2051)
    at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SystemReduceFn.processValue(SystemReduceFn.java:119)
    at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:613)
    at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:360)
    at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:96)
    at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:43)
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
    at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:137)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:218)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1450)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:165)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1125)
    at org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:133)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

2. We have also noticed the side input sometimes containing no data even though the underlying data exists in Redis. Since this side input view is a map, when this occurs the keys we look up are missing, which breaks our code when accessing them. (A defensive-access sketch is also in the P.S. below.)

Has anyone faced similar issues? It would greatly help if someone could shed some light on this seemingly non-deterministic behavior of side inputs when they are created as singletons.

Thanks in advance,
Rajath
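
P.S. For issue 1, one workaround we are considering is the "slowly updating global window side inputs" pattern from the Beam documentation, which adds Latest.globally() so each pane is collapsed to a single element before View.asSingleton(). This is only a sketch, not what we run today: refreshSeconds, redisHost and redisPort stand in for the values we currently read from options, and it reuses our own GetAllRulesFromRedis / TransformRulesAsJourneyIdToMetadataMap DoFns.

    import java.util.Map;
    import org.apache.beam.sdk.transforms.GenerateSequence;
    import org.apache.beam.sdk.transforms.Latest;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.View;
    import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollectionView;
    import org.joda.time.Duration;

    PCollectionView<Map<String, String>> targetCountryByJourneyId = p
        .apply("Tick at refresh rate",
            GenerateSequence.from(0).withRate(1, Duration.standardSeconds(refreshSeconds)))
        .apply("Get all rules from Redis",
            ParDo.of(new GetAllRulesFromRedis(redisPort, redisHost, statsdHost, statsdPort)))
        .apply("Rules -> journeyId->targetCountry map",
            ParDo.of(new TransformRulesAsJourneyIdToMetadataMap(
                statsdHost, statsdPort, Rule.TARGET_COUNTRY)))
        .apply("Global window, fire per element",
            Window.<Map<String, String>>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                .discardingFiredPanes())
        // Collapse each pane to the single newest map, so the singleton
        // view can never observe more than one element per pane.
        .apply("Keep latest map", Latest.globally())
        .apply("Singleton view", View.asSingleton());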
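
For issue 2, as a stopgap we are thinking of guarding the side input access inside the DoFn instead of assuming the map is populated. A hypothetical, trimmed-down sketch (the real EliminateUserByTargetCountry also takes the Bigtable and statsd parameters, and getJourneyId()/getTargetCountry() stand in for whatever accessors UserData actually has):

    import java.util.Map;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.PCollectionView;

    class EliminateUserByTargetCountry extends DoFn<UserData, UserData> {
      private final PCollectionView<Map<String, String>> targetCountryByJourneyId;

      EliminateUserByTargetCountry(PCollectionView<Map<String, String>> view) {
        this.targetCountryByJourneyId = view;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
        Map<String, String> rules = c.sideInput(targetCountryByJourneyId);
        if (rules == null || rules.isEmpty()) {
          // The pane was empty (e.g. the Redis read produced nothing, or this
          // worker hasn't seen a populated pane yet): pass the element through
          // instead of failing. Dropping or counting it may suit others better.
          c.output(c.element());
          return;
        }
        String expected = rules.get(c.element().getJourneyId());
        if (expected == null || expected.equals(c.element().getTargetCountry())) {
          c.output(c.element());
        }
      }
    }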