Thanks a lot Luke, I will try it out!

Eleanore

On Mon, Oct 18, 2021 at 9:41 AM Luke Cwik <[email protected]> wrote:

> You could use a stateful DoFn and buffer the first message and everything
> that you see for the first 5 seconds and then afterwards pass everything
> through. Something like:
>
> processElement(...) {
>   if (value state is false) {
>     if (bag state is empty) {
>       schedule processing timer for 5 seconds from now
>     }
>     buffer element in bag state
>   } else {
>     output element
>   }
> }
>
> onTimer(...) {
>   output everything in bag state buffer
>   set value state to true
> }
>
> On Sun, Oct 17, 2021 at 2:31 PM Eleanore Jin <[email protected]>
> wrote:
>
>> Hi community,
>>
>> Here is my use case:
>> My pipeline uses another Kafka topic as a side input that contains the
>> filter criteria. Then, when processing the mainstream, the pipeline
>> checks whether each message from mainstream matches *any* existing
>> filter criteria.
>>
>> The sideInputs logic is: whenever seeing at least 1 element from the
>> sideInputs topic -> fire -> and accumulate all the elements seen
>>
>> Trigger trigger = Repeatedly.forever(
>>     AfterFirst.of(
>>         AfterPane.elementCountAtLeast(1),
>>         AfterProcessingTime.pastFirstElementInPane()
>>     ));
>>
>> return kafkaValues.apply("sideInputFromTopic-" + topicName,
>>     Window.<KV<String, T>>into(new GlobalWindows())
>>         .triggering(trigger)
>>         .accumulatingFiredPanes()
>>         .withOnTimeBehavior(OnTimeBehavior.FIRE_ALWAYS)
>> );
>>
>> Here is my problem:
>> During application restarts, the 1st message from mainstream matches the
>> 3rd element in the sideInputs. But since the pane fires as soon as the
>> 1st element from sideInputs is consumed, the 1st message from mainstream
>> was marked as not passing the filter.
>>
>> Then I switched to the trigger below, which fires 5 seconds after
>> reading 1 element from sideInputs.
>> This could work around the application restart problem, but any
>> subsequently published elements in the sideInputs also have to wait
>> 5 seconds before firing, which could lead to some messages from
>> mainstream incorrectly being marked as not passing the filters.
>>
>> Trigger trigger = Repeatedly.forever(
>>     AfterFirst.of(
>>         AfterPane.elementCountAtLeast(3000),
>>         AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))
>>     ));
>>
>> Question:
>>
>> I want to define a custom trigger that, the first time it fires, waits
>> for say 5 - 10 seconds before processing the mainstream.
>>
>> But afterwards, it fires as soon as it sees a new element from sideInputs.
>>
>> Is this possible? Or should I be able to leverage an existing API to do it?
>>
>> Beam version: 2.23, Flink runner: 1.10.2
>>
>> Thanks a lot!
>>
>> Eleanore
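
For anyone following the thread, here is a minimal, library-free sketch of the buffer-then-pass-through logic Luke outlines. It is not Beam code and not necessarily what Luke had in mind: `WarmupGate`, `pendingTimerMillis`, and the caller-supplied clock are hypothetical names made up for illustration. In a real stateful DoFn the list would be bag state, the boolean would be value state, and `onTimer` would be driven by a processing-time timer. The sketch also assumes that elements are held back while buffering and only emitted when the timer fires.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java model of a "buffer during warm-up, then pass through" gate.
 * All names here are hypothetical; nothing below is a Beam API.
 */
final class WarmupGate<T> {
    private final long warmupMillis;
    private final List<T> buffer = new ArrayList<>(); // stands in for bag state
    private boolean open = false;                      // stands in for value state
    private long timerAtMillis = -1;                   // -1 means no timer pending

    WarmupGate(long warmupMillis) {
        this.warmupMillis = warmupMillis;
    }

    /** Returns the elements to emit for this input (empty while buffering). */
    List<T> processElement(T element, long nowMillis) {
        List<T> out = new ArrayList<>();
        if (!open) {
            if (buffer.isEmpty()) {
                // First element ever seen: schedule the one-off warm-up timer.
                timerAtMillis = nowMillis + warmupMillis;
            }
            buffer.add(element);
        } else {
            // Warm-up is over: pass the element straight through.
            out.add(element);
        }
        return out;
    }

    /** Time at which the caller should invoke onTimer, or -1 if none is pending. */
    long pendingTimerMillis() {
        return timerAtMillis;
    }

    /** Flushes everything buffered so far, in arrival order, and opens the gate. */
    List<T> onTimer() {
        List<T> out = new ArrayList<>(buffer);
        buffer.clear();
        open = true;
        timerAtMillis = -1;
        return out;
    }

    public static void main(String[] args) {
        WarmupGate<String> gate = new WarmupGate<>(5_000);
        System.out.println(gate.processElement("a", 0));     // []
        System.out.println(gate.processElement("b", 1_000)); // []
        System.out.println(gate.onTimer());                  // [a, b]
        System.out.println(gate.processElement("c", 6_000)); // [c]
    }
}
```

The timer is set only once, when the buffer is still empty, so later elements that arrive during the warm-up do not push the flush time back. After the flush, each element is emitted immediately, which matches the "wait the first time, then fire on every new element" behaviour asked for in the original question.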
