However, this still appears to be a bug: that exception should never be thrown inside the Dataflow runner. Are you able to file a JIRA for it?
On Mon, May 18, 2020 at 10:44 AM Robert Bradshaw <[email protected]> wrote:

> Glad you were able to get this working; thanks for following up.
>
> On Mon, May 18, 2020 at 10:35 AM Mohil Khare <[email protected]> wrote:
>
>> Hi,
>>
>> On another note, I think I was unnecessarily complicating things by applying a sliding window here and then an extra global window to remove duplicates. I replaced the sliding window with a session window (since I know that my transaction, consisting of recordA logs and recordB logs for a key "MyKey", won't last more than 60-90 secs), and my use case seems to be working fine. Even DRAIN now completes successfully.
>>
>> Thanks
>> Mohil
>>
>> On Sun, May 17, 2020 at 3:37 PM Mohil Khare <[email protected]> wrote:
>>
>>> Hello,
>>>
>>> I have a use case with two PCollections (RecordA and RecordB) coming from a real-time streaming source such as Kafka.
>>>
>>> Both records are correlated by a common key, let's say KEY.
>>>
>>> The purpose is to enrich RecordA with RecordB's data, for which I am using CoGroupByKey. Since RecordA and RecordB for a common key can arrive within 1-2 minutes of each other in event time, I maintain a sliding window for both records and then apply CoGroupByKey to the two PCollections.
>>>
>>> The sliding windows that contain both RecordA and RecordB for a common key KEY emit the enriched output. Since multiple sliding windows can emit the same output, I finally remove duplicate results by feeding those outputs into a global window, where I maintain state to check whether an output has already been processed. Because it is a global window, I also set a timer on the state (for GC) so that it expires 10 minutes after the state is written.
>>>
>>> This is working perfectly fine w.r.t. the expected results. However, I am unable to stop the job gracefully, i.e. drain the job.
>>> I see the following exception:
>>>
>>> java.lang.IllegalStateException: org.apache.beam.runners.dataflow.worker.SimpleParDoFn@16b089a6 received state cleanup timer for window org.apache.beam.sdk.transforms.windowing.GlobalWindow@29ca0210 that is before the appropriate cleanup time 294248-01-24T04:00:54.776Z
>>>         org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
>>>         org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:384)
>>>         org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73)
>>>         org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:444)
>>>         org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:467)
>>>         org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354)
>>>         org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
>>>         org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
>>>         org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
>>>         org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
>>>         org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
>>>         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>         java.lang.Thread.run(Thread.java:745)
>>>
>>> The same IllegalStateException, with an identical stack trace, is reported repeatedly for other SimpleParDoFn instances (e.g. @59902a10 and @4316932b) against the same GlobalWindow and cleanup time.
>>>
>>> My code snippet:
>>>
>>> PCollection<KV<MyKey, RecordA>> windowedRecordA =
>>>     incompleteRecordALogs.apply("Applying_Sliding_Window_RecordA",
>>>         Window.<KV<MyKey, RecordA>>into(SlidingWindows.of(Duration.standardSeconds(90)).every(Duration.standardSeconds(45)))
>>>             .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
>>>             .withAllowedLateness(Duration.standardSeconds(90))
>>>             .discardingFiredPanes());
>>>
>>> PCollection<KV<MyKey, RecordB>> windowedRecordB =
>>>     recordBLogs.apply("Applying_Sliding_Window_RecordB",
>>>         Window.<KV<MyKey, RecordB>>into(SlidingWindows.of(Duration.standardSeconds(90)).every(Duration.standardSeconds(45)))
>>>             .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
>>>             .withAllowedLateness(Duration.standardSeconds(90))
>>>             .discardingFiredPanes());
>>>
>>> PCollection<KV<MyKey, CoGbkResult>> coGbkRecords =
>>>     KeyedPCollectionTuple.of(TagRecordA, windowedRecordA)
>>>         .and(TagRecordB, windowedRecordB)
>>>         .apply("CoGroupByKey", CoGroupByKey.create());
>>>
>>> PCollection<RecordA> enrichedRecordA =
>>>     coGbkRecords.apply("EnrichRecordAWithRecordB", new EnrichIncompleteRecordA());
>>>
>>> class EnrichIncompleteRecordA extends PTransform<PCollection<KV<MyKey, CoGbkResult>>, PCollection<RecordA>> {
>>>
>>>     @Override
>>>     public PCollection<RecordA> expand(PCollection<KV<MyKey, CoGbkResult>> input) {
>>>         logger.info("Enriching Incomplete RecordA with RecordB");
>>>         return input
>>>             .apply("Add_RecordBInfo_To_RecordA", ParDo.of(new AddRecordBData()))
>>>             .apply("Applying_Windowing", Window.<KV<MyKey, RecordA>>into(new GlobalWindows())
>>>                 .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>>                 .discardingFiredPanes())
>>>             .apply("Emit_Unique_RecordA", ParDo.of(new EmitUniqueRecordA()));
>>>     }
>>>
>>>     private class AddRecordBData extends DoFn<KV<MyKey, CoGbkResult>, KV<MyKey, RecordA>> {
>>>
>>>         @Setup
>>>         public void setup() {
>>>         }
>>>
>>>         @StartBundle
>>>         public void startBundle() {
>>>         }
>>>
>>>         @ProcessElement
>>>         public void processElement(@Element KV<MyKey, CoGbkResult> input,
>>>                                    OutputReceiver<KV<MyKey, RecordA>> out) {
>>>             Iterable<RecordA> allRecordALogs = input.getValue().getAll(TagRecordA);
>>>             Iterable<RecordB> allRecordBLogs = input.getValue().getAll(TagRecordB);
>>>
>>>             /* There should be at most 1 RecordB per MyKey */
>>>             if (allRecordALogs.iterator().hasNext() && allRecordBLogs.iterator().hasNext()) {
>>>                 RecordB recordB = Iterables.getFirst(allRecordBLogs, null);
>>>                 for (RecordA recordA : allRecordALogs) {
>>>                     if (null != recordB) {
>>>                         logger.info("Enriching incomplete recordA [{}] with recordB: [{}]", recordA, recordB);
>>>                         <code to populate recordA object with recordB data>
>>>                         out.output(KV.of(input.getKey(), recordA));
>>>                     } else {
>>>                         logger.error("No recordB available for recordA log [{}]", recordA);
>>>                     }
>>>                 }
>>>             } else {
>>>                 logger.info("Either recordA or recordB not present for myKey: {}", input.getKey());
>>>             }
>>>         }
>>>
>>>         @FinishBundle
>>>         public void finishBundle() {
>>>         }
>>>
>>>         @Teardown
>>>         public void teardown() {
>>>         }
>>>     }
>>>
>>>     private class EmitUniqueRecordA extends DoFn<KV<MyKey, RecordA>, RecordA> {
>>>
>>>         @Setup
>>>         public void setup() {
>>>         }
>>>
>>>         @StartBundle
>>>         public void startBundle() {
>>>         }
>>>
>>>         // State holds the set of RecordA values already emitted for this key.
>>>         @StateId("processedRecordA")
>>>         private final StateSpec<ValueState<Set<RecordA>>> processedRecordASpec =
>>>             StateSpecs.value(SetCoder.of(AvroCoder.of(RecordA.class)));
>>>
>>>         @TimerId("stateExpiry")
>>>         private final TimerSpec stateExpirySpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>>>
>>>         @ProcessElement
>>>         public void processElement(@Element KV<MyKey, RecordA> input,
>>>                                    OutputReceiver<RecordA> out,
>>>                                    @StateId("processedRecordA") ValueState<Set<RecordA>> processedRecordAState,
>>>                                    @TimerId("stateExpiry") Timer stateExpiryTimer) {
>>>             << code to check if recordA has already been processed by checking state >>
>>>             if (recordA needs to be emitted) {
>>>                 processedRecordAState.write(processedRecordASet);
>>>                 stateExpiryTimer.offset(Duration.standardMinutes(10)).setRelative();
>>>                 logger.info("Emitting unique recordA {} for myKey {}", recordA, myKey);
>>>                 out.output(input.getValue());
>>>             }
>>>         }
>>>
>>>         @OnTimer("stateExpiry")
>>>         public void onExpiry(
>>>                 OnTimerContext context,
>>>                 @StateId("processedRecordA") ValueState<Set<RecordA>> processedRecordAState) {
>>>             logger.info("Expiring State after timer expiry");
>>>             processedRecordAState.clear();
>>>         }
>>>
>>>         @FinishBundle
>>>         public void finishBundle() {
>>>         }
>>>
>>>         @Teardown
>>>         public void teardown() {
>>>         }
>>>     }
>>> }
>>>
>>> Any help or suggestions?
>>>
>>> Thanks and Regards
>>> Mohil
>>>
>>
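A minimal sketch of the session-window replacement described in the May 18 reply, assuming a 60-second session gap and the same allowed lateness as the sliding-window version (the exact gap, lateness, and step names are not given in the thread; the record, tag, and key names are reused from the snippet above):

    // Sketch only: window both inputs into per-key sessions instead of sliding windows.
    // Sessions.withGapDuration(...) merges all records for a key that arrive within the
    // assumed 60-second gap into a single window, so each enriched RecordA is emitted
    // once and the global-window/state de-duplication step (with its GC timer) can be dropped.
    PCollection<KV<MyKey, RecordA>> sessionedRecordA =
        incompleteRecordALogs.apply("Applying_Session_Window_RecordA",
            Window.<KV<MyKey, RecordA>>into(Sessions.withGapDuration(Duration.standardSeconds(60)))
                .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                .withAllowedLateness(Duration.standardSeconds(90))
                .discardingFiredPanes());

    PCollection<KV<MyKey, RecordB>> sessionedRecordB =
        recordBLogs.apply("Applying_Session_Window_RecordB",
            Window.<KV<MyKey, RecordB>>into(Sessions.withGapDuration(Duration.standardSeconds(60)))
                .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                .withAllowedLateness(Duration.standardSeconds(90))
                .discardingFiredPanes());

    // The CoGroupByKey join itself is unchanged; only the upstream windowing differs.
    PCollection<KV<MyKey, CoGbkResult>> coGbkRecords =
        KeyedPCollectionTuple.of(TagRecordA, sessionedRecordA)
            .and(TagRecordB, sessionedRecordB)
            .apply("CoGroupByKey", CoGroupByKey.create());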
