Glad you were able to get this working; thanks for following up.

On Mon, May 18, 2020 at 10:35 AM Mohil Khare <[email protected]> wrote:
> Hi,
>
> On another note, I think I was unnecessarily complicating things by
> applying a sliding window here and then an extra global window to remove
> duplicates. I replaced the *sliding window with a session window* (as I
> know that my transaction consisting of recordA logs and recordB logs for a
> key "MyKey" won't last for more than 60-90 secs), and my use case seems to
> be working fine. Even DRAIN is working successfully.
>
> Thanks
> Mohil
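>
> P.S. For the archives, the change was essentially just swapping the
> windowing strategy. A rough sketch (not my exact code; the transform name
> and the 90 sec gap, matching the 60-90 sec transaction bound above, are
> illustrative):
>
>     PCollection<KV<MyKey, RecordA>> windowedRecordA =
>         incompleteRecordALogs.apply("Applying_Session_Window_RecordA",
>             Window.<KV<MyKey, RecordA>>into(
>                 Sessions.withGapDuration(Duration.standardSeconds(90))));
>
>     // same windowing for RecordB, then CoGroupByKey as before; the
>     // global-window dedup stage is no longer needed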
>
> On Sun, May 17, 2020 at 3:37 PM Mohil Khare <[email protected]> wrote:
>
>> Hello,
>>
>> I have a use case where I have two sets of PCollections (RecordA and
>> RecordB) coming from a real-time streaming source like Kafka.
>>
>> Both records are correlated by a common key, let's say KEY.
>>
>> The purpose is to enrich RecordA with RecordB's data, for which I am
>> using CoGroupByKey. Since RecordA and RecordB for a common key can arrive
>> within 1-2 minutes of each other in event time, I maintain a sliding
>> window over both records and then apply CoGroupByKey to both PCollections.
>>
>> Any sliding window that finds both RecordA and RecordB for a common key
>> KEY will emit enriched output. Now, since multiple overlapping sliding
>> windows can emit the same output, I finally remove duplicate results by
>> feeding the aforementioned outputs into a global window, where I maintain
>> state to check whether an output has already been processed. Since it is
>> a global window, I set a timer on the state (for GC) to let it expire 10
>> minutes after the state is written.
>>
>> This works perfectly fine w.r.t. the expected results. However, I am
>> unable to stop the job gracefully, i.e. drain it. I see the following
>> exception (the same exception, with an identical stack trace, repeats for
>> multiple SimpleParDoFn instances; one occurrence shown):
>>
>> java.lang.IllegalStateException:
>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn@16b089a6 received
>> state cleanup timer for window
>> org.apache.beam.sdk.transforms.windowing.GlobalWindow@29ca0210 that is
>> before the appropriate cleanup time 294248-01-24T04:00:54.776Z
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:384)
>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73)
>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:444)
>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:467)
>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354)
>> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
>> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> java.lang.Thread.run(Thread.java:745)
>>
>> My code snippet:
>>
>> PCollection<KV<MyKey, RecordA>> windowedRecordA =
>>     incompleteRecordALogs.apply("Applying_Sliding_Window_RecordA",
>>         Window.<KV<MyKey, RecordA>>into(
>>                 SlidingWindows.of(Duration.standardSeconds(90))
>>                     .every(Duration.standardSeconds(45)))
>>             .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
>>             .withAllowedLateness(Duration.standardSeconds(90))
>>             .discardingFiredPanes());
>>
>> PCollection<KV<MyKey, RecordB>> windowedRecordB =
>>     recordBLogs.apply("Applying_Sliding_Window_RecordB",
>>         Window.<KV<MyKey, RecordB>>into(
>>                 SlidingWindows.of(Duration.standardSeconds(90))
>>                     .every(Duration.standardSeconds(45)))
>>             .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
>>             .withAllowedLateness(Duration.standardSeconds(90))
>>             .discardingFiredPanes());
>>
>> PCollection<KV<MyKey, CoGbkResult>> coGbkRecords =
>>     KeyedPCollectionTuple.of(TagRecordA, windowedRecordA)
>>         .and(TagRecordB, windowedRecordB)
>>         .apply("CoGroupByKey", CoGroupByKey.create());
>>
>> PCollection<RecordA> enrichedRecordA =
>>     coGbkRecords.apply("EnrichRecordAWithRecordB", new EnrichIncompleteRecordA());
>>
>> class EnrichIncompleteRecordA
>>         extends PTransform<PCollection<KV<MyKey, CoGbkResult>>, PCollection<RecordA>> {
>>
>>     @Override
>>     public PCollection<RecordA> expand(PCollection<KV<MyKey, CoGbkResult>> input) {
>>         logger.info("Enriching Incomplete RecordA with RecordB");
>>         return input
>>             .apply("Add_RecordBInfo_To_RecordA", ParDo.of(new AddRecordBData()))
>>             .apply("Applying_Windowing", Window.<KV<MyKey, RecordA>>into(new GlobalWindows())
>>                 .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>                 .discardingFiredPanes())
>>             .apply("Emit_Unique_RecordA", ParDo.of(new EmitUniqueRecordA()));
>>     }
>>
>>     private class AddRecordBData extends DoFn<KV<MyKey, CoGbkResult>, KV<MyKey, RecordA>> {
>>         @Setup
>>         public void setup() {
>>         }
>>
>>         @StartBundle
>>         public void startBundle() {
>>         }
>>
>>         @ProcessElement
>>         public void processElement(@Element KV<MyKey, CoGbkResult> input,
>>                 OutputReceiver<KV<MyKey, RecordA>> out) {
>>             Iterable<RecordA> allRecordALogs = input.getValue().getAll(TagRecordA);
>>             Iterable<RecordB> allRecordBLogs = input.getValue().getAll(TagRecordB);
>>
>>             // There should be at most 1 RecordB per MyKey.
>>             if (allRecordALogs.iterator().hasNext() && allRecordBLogs.iterator().hasNext()) {
>>                 RecordB recordB = Iterables.getFirst(allRecordBLogs, null);
>>                 for (RecordA recordA : allRecordALogs) {
>>                     if (null != recordB) {
>>                         logger.info("Enriching incomplete recordA [{}] with recordB: [{}]",
>>                             recordA, recordB);
>>                         // <code to populate recordA object with recordB data>
>>                         out.output(KV.of(input.getKey(), recordA));
>>                     } else {
>>                         logger.error("No recordB available for recordA log [{}]", recordA);
>>                     }
>>                 }
>>             } else {
>>                 logger.info("Either recordA or recordB not present for myKey: {}",
>>                     input.getKey());
>>             }
>>         }
>>
>>         @FinishBundle
>>         public void finishBundle() {
>>         }
>>
>>         @Teardown
>>         public void teardown() {
>>         }
>>     }
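>>
>>     /*
>>      * Illustrative sketch only (not my actual code): the elided
>>      * "already processed" check in EmitUniqueRecordA below is
>>      * conceptually a set-membership test against the value state,
>>      * assuming RecordA implements equals()/hashCode().
>>      */
>>     private boolean alreadyProcessed(RecordA recordA,
>>             ValueState<Set<RecordA>> processedRecordAState) {
>>         // null means nothing has been recorded for this key yet
>>         Set<RecordA> processed = processedRecordAState.read();
>>         return processed != null && processed.contains(recordA);
>>     }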
>>
>>     private class EmitUniqueRecordA extends DoFn<KV<MyKey, RecordA>, RecordA> {
>>
>>         @StateId("processedRecordA")
>>         private final StateSpec<ValueState<Set<RecordA>>> processedRecordASpec =
>>             StateSpecs.value(SetCoder.of(AvroCoder.of(RecordA.class)));
>>
>>         @TimerId("stateExpiry")
>>         private final TimerSpec stateExpirySpec =
>>             TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>>
>>         @Setup
>>         public void setup() {
>>         }
>>
>>         @StartBundle
>>         public void startBundle() {
>>         }
>>
>>         @ProcessElement
>>         public void processElement(@Element KV<MyKey, RecordA> input,
>>                 OutputReceiver<RecordA> out,
>>                 @StateId("processedRecordA") ValueState<Set<RecordA>> processedRecordAState,
>>                 @TimerId("stateExpiry") Timer stateExpiryTimer) {
>>             // <code to check, via processedRecordAState, whether this
>>             // recordA has already been processed>
>>
>>             if (recordA needs to be emitted) {
>>                 processedRecordAState.write(processedRecordASet);
>>                 stateExpiryTimer.offset(Duration.standardMinutes(10)).setRelative();
>>                 logger.info("Emitting unique recordA {} for myKey {}",
>>                     input.getValue(), input.getKey());
>>                 out.output(input.getValue());
>>             }
>>         }
>>
>>         @OnTimer("stateExpiry")
>>         public void onExpiry(OnTimerContext context,
>>                 @StateId("processedRecordA") ValueState<Set<RecordA>> processedRecordAState) {
>>             logger.info("Expiring State after timer expiry");
>>             processedRecordAState.clear();
>>         }
>>
>>         @FinishBundle
>>         public void finishBundle() {
>>         }
>>
>>         @Teardown
>>         public void teardown() {
>>         }
>>     }
>> }
>>
>> Any help or suggestions?
>>
>> Thanks and Regards
>> Mohil
>>
