However, this still appears to be a bug - that exception should never be
thrown inside the Dataflow runner. Are you able to file a JIRA for it?

On Mon, May 18, 2020 at 10:44 AM Robert Bradshaw <[email protected]>
wrote:

> Glad you were able to get this working; thanks for following up.
>
> On Mon, May 18, 2020 at 10:35 AM Mohil Khare <[email protected]> wrote:
>
>> Hi,
>> On another note, I think I was unnecessarily complicating things by
>> applying a sliding window here and then an extra global window to remove
>> duplicates. I replaced the *sliding window with a session window* (as
>> I know that my transaction consisting of recordA logs and recordB logs for
>> a key "MyKey" won't last for more than 60-90 secs), and my use case
>> seems to be working fine. Even DRAIN is working successfully.
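>>
>> (For reference, a minimal sketch of what that session-window replacement
>> could look like: the 90-second gap is an assumption on my part, and the
>> trigger/lateness settings and the incompleteRecordALogs name are reused
>> from the original snippet further down.)
>>
>> // import org.apache.beam.sdk.transforms.windowing.Sessions;
>> PCollection<KV<MyKey, RecordA>> sessionedRecordA =
>>     incompleteRecordALogs.apply("Applying_Session_Window_RecordA",
>>         Window.<KV<MyKey, RecordA>>into(Sessions.withGapDuration(Duration.standardSeconds(90)))
>>             .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
>>             .withAllowedLateness(Duration.standardSeconds(90))
>>             .discardingFiredPanes());
>>
>> Because session windows merge per key, a transaction's recordA and recordB
>> logs should land in a single window once the key has been quiet for the gap
>> duration, so CoGroupByKey should fire once per transaction (barring late
>> data) and the separate global-window dedup step should no longer be needed.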
>>
>> Thanks
>> Mohil
>>
>> On Sun, May 17, 2020 at 3:37 PM Mohil Khare <[email protected]> wrote:
>>
>>> Hello,
>>>
>>> I have a use case where I have two sets of PCollections (RecordA and
>>> RecordB) coming from a real-time streaming source like Kafka.
>>>
>>> Both Records are correlated with a common key, let's say KEY.
>>>
>>> The purpose is to enrich RecordA with RecordB's data, for which I am
>>> using CoGroupByKey. Since RecordA and RecordB for a common key can arrive within
>>> 1-2 minutes of event time, I maintain a sliding window over both
>>> records and then apply CoGroupByKey to the two PCollections.
>>>
>>> The sliding windows that find both RecordA and RecordB for a common
>>> key KEY will emit enriched output. Now, since multiple sliding windows can
>>> emit the same output, I finally remove duplicate results by feeding the
>>> aforementioned outputs into a global window, where I maintain a state to check
>>> whether an output has already been processed or not. Since it is a global
>>> window, I maintain a Timer on the state (for GC) to let it expire 10
>>> minutes after the state has been written.
>>>
>>> This is working perfectly fine w.r.t. the expected results. However, I am
>>> unable to stop the job gracefully, i.e., drain it. I see the
>>> following exception:
>>>
>>> java.lang.IllegalStateException:
>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn@16b089a6 received
>>> state cleanup timer for window
>>> org.apache.beam.sdk.transforms.windowing.GlobalWindow@29ca0210 that is
>>> before the appropriate cleanup time 294248-01-24T04:00:54.776Z
>>>
>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
>>>
>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:384)
>>>
>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73)
>>>
>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:444)
>>>
>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:467)
>>>
>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354)
>>>
>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
>>>
>>> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
>>>
>>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
>>>
>>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
>>>
>>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
>>>
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> java.lang.Thread.run(Thread.java:745)
>>> (the same IllegalStateException, for the same GlobalWindow and cleanup
>>> time 294248-01-24T04:00:54.776Z, is reported twice more for other
>>> SimpleParDoFn instances)
>>>
>>>
>>> My code snippet:
>>>
>>> PCollection<KV<MyKey, RecordA>> windowedRecordA =
>>>     incompleteRecordALogs.apply("Applying_Sliding_Window_RecordA",
>>>         Window.<KV<MyKey, RecordA>>into(
>>>                 SlidingWindows.of(Duration.standardSeconds(90)).every(Duration.standardSeconds(45)))
>>>             .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
>>>             .withAllowedLateness(Duration.standardSeconds(90))
>>>             .discardingFiredPanes());
>>>
>>>
>>> PCollection<KV<MyKey, RecordB>> windowedRecordB =
>>>     recordBLogs.apply("Applying_Sliding_Window_RecordB",
>>>         Window.<KV<MyKey, RecordB>>into(
>>>                 SlidingWindows.of(Duration.standardSeconds(90)).every(Duration.standardSeconds(45)))
>>>             .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
>>>             .withAllowedLateness(Duration.standardSeconds(90))
>>>             .discardingFiredPanes());
>>>
>>> PCollection<KV<MyKey, CoGbkResult>> coGbkRecords =
>>>     KeyedPCollectionTuple.of(TagRecordA, windowedRecordA)
>>>         .and(TagRecordB, windowedRecordB)
>>>         .apply("CoGroupByKey", CoGroupByKey.create());
>>>
>>>
>>> PCollection<RecordA> enrichedRecordA =
>>>     coGbkRecords.apply("EnrichRecordAWithRecordB",
>>>         new EnrichIncompleteRecordA());
>>>
>>>
>>> class EnrichIncompleteRecordA extends PTransform<PCollection<KV<MyKey, CoGbkResult>>, PCollection<RecordA>> {
>>>     @Override
>>>     public PCollection<RecordA> expand(PCollection<KV<MyKey, CoGbkResult>> input) {
>>>         logger.info("Enriching Incomplete RecordA with RecordB");
>>>         return input
>>>             .apply("Add_RecordBInfo_To_RecordA", ParDo.of(new AddRecordBData()))
>>>             .apply("Applying_Windowing", Window.<KV<MyKey, RecordA>>into(new GlobalWindows())
>>>                 .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>>                 .discardingFiredPanes())
>>>             .apply("Emit_Unique_RecordA", ParDo.of(new EmitUniqueRecordA()));
>>>     }
>>>
>>>     private class AddRecordBData extends DoFn<KV<MyKey, CoGbkResult>, KV<MyKey, RecordA>> {
>>>         @Setup
>>>         public void setup() {
>>>         }
>>>
>>>         @StartBundle
>>>         public void startBundle() {
>>>         }
>>>
>>>         @ProcessElement
>>>         public void processElement(@Element KV<MyKey, CoGbkResult> input,
>>>                                    OutputReceiver<KV<MyKey, RecordA>> out) {
>>>             Iterable<RecordA> allRecordALogs = input.getValue().getAll(TagRecordA);
>>>             Iterable<RecordB> allRecordBLogs = input.getValue().getAll(TagRecordB);
>>>
>>>             /*
>>>               There should be max 1 RecordB per MyKey
>>>              */
>>>             if (allRecordALogs.iterator().hasNext() && allRecordBLogs.iterator().hasNext()) {
>>>                 RecordB recordB = Iterables.getFirst(allRecordBLogs, null);
>>>                 for (RecordA recordA : allRecordALogs) {
>>>                     if (null != recordB) {
>>>                         logger.info("Enriching incomplete recordA [{}] with recordB: [{}]", recordA, recordB);
>>>                         // <code to populate recordA object with recordB data>
>>>                         out.output(KV.of(input.getKey(), recordA));
>>>                     } else {
>>>                         logger.error("No recordB available for recordA log [{}]", recordA);
>>>                     }
>>>                 }
>>>             } else {
>>>                 logger.info("Either recordA or recordB not present for myKey: {}", input.getKey());
>>>             }
>>>         }
>>>
>>>         @FinishBundle
>>>         public void finishBundle() {
>>>         }
>>>
>>>         @Teardown
>>>         public void teardown() {
>>>         }
>>>     }
>>>
>>>
>>>     private class EmitUniqueRecordA extends DoFn<KV<MyKey, RecordA>, RecordA> {
>>>         @Setup
>>>         public void setup() {
>>>         }
>>>
>>>         @StartBundle
>>>         public void startBundle() {
>>>         }
>>>
>>>         @StateId("processedRecordA")
>>>         private final StateSpec<ValueState<Set<RecordA>>> processedRecordASpec =
>>>             StateSpecs.value(SetCoder.of(AvroCoder.of(RecordA.class)));
>>>
>>>         @TimerId("stateExpiry")
>>>         private final TimerSpec stateExpirySpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>>>
>>>         @ProcessElement
>>>         public void processElement(@Element KV<MyKey, RecordA> input,
>>>                                    OutputReceiver<RecordA> out,
>>>                                    @StateId("processedRecordA") ValueState<Set<RecordA>> processedRecordAState,
>>>                                    @TimerId("stateExpiry") Timer stateExpiryTimer) {
>>>             MyKey myKey = input.getKey();
>>>             RecordA recordA = input.getValue();
>>>
>>>             // << code to check if recordA has already been processed by checking state >>
>>>             Set<RecordA> processedRecordASet = processedRecordAState.read();
>>>             if (processedRecordASet == null) {
>>>                 processedRecordASet = new HashSet<>();
>>>             }
>>>             if (processedRecordASet.add(recordA)) { // recordA needs to be emitted
>>>                 processedRecordAState.write(processedRecordASet);
>>>                 stateExpiryTimer.offset(Duration.standardMinutes(10)).setRelative();
>>>                 logger.info("Emitting unique recordA {} for myKey {}", recordA, myKey);
>>>                 out.output(input.getValue());
>>>             }
>>>         }
>>>
>>>         @OnTimer("stateExpiry")
>>>         public void onExpiry(
>>>             OnTimerContext context,
>>>             @StateId("processedRecordA") ValueState<Set<RecordA>> processedRecordAState) {
>>>             logger.info("Expiring State after timer expiry");
>>>             processedRecordAState.clear();
>>>         }
>>>
>>>         @FinishBundle
>>>         public void finishBundle() {
>>>         }
>>>
>>>         @Teardown
>>>         public void teardown() {
>>>         }
>>>     }
>>> }
>>>
>>> Any help or suggestions?
>>>
>>> Thanks and Regards
>>> Mohil
>>>
>>
