Thanks a lot Luke, I will try it out!

Eleanore

On Mon, Oct 18, 2021 at 9:41 AM Luke Cwik <[email protected]> wrote:

> You could use a stateful DoFn and buffer the first message and everything
> that you see for the first 5 seconds and then afterwards pass everything
> through. Something like:
>
> processElement(...) {
>   if (value state is false) {
>     if (bag state is empty) {
>       schedule processing timer for 5 seconds from now
>     }
>     buffer element in bag state
>   } else {
>     output element
>   }
> }
>
> onTimer(...) {
>   output everything in bag state buffer
>   clear bag state
>   set value state to true
> }
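
The buffering logic in the pseudocode above can be modelled without Beam to check its behaviour. This is only a minimal sketch under stated assumptions: the class name `WarmupBuffer`, the method `advanceTo`, and the explicit clock argument are hypothetical and are not Beam APIs. In a real pipeline the list, the flag, and the deadline would be a bag state, a value state, and a processing-time timer declared on a stateful DoFn.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Beam-free model of "buffer for N ms, then pass through" (hypothetical class). */
class WarmupBuffer<T> {
    private final long delayMillis;
    private final Consumer<T> output;
    private final List<T> bag = new ArrayList<>(); // stands in for bag state
    private boolean warmedUp = false;              // stands in for value state
    private long timerAt = -1;                     // stands in for the timer; -1 means not set

    WarmupBuffer(long delayMillis, Consumer<T> output) {
        this.delayMillis = delayMillis;
        this.output = output;
    }

    /** Mirrors processElement: buffer during warm-up, otherwise emit immediately. */
    void processElement(T element, long nowMillis) {
        fireTimerIfDue(nowMillis);
        if (!warmedUp) {
            if (bag.isEmpty()) {
                // First element seen: schedule the flush.
                timerAt = nowMillis + delayMillis;
            }
            bag.add(element);
        } else {
            output.accept(element);
        }
    }

    /** Moves the simulated clock forward, firing the timer if its deadline has passed. */
    void advanceTo(long nowMillis) {
        fireTimerIfDue(nowMillis);
    }

    /** Mirrors onTimer: flush the buffer once, then switch to pass-through. */
    private void fireTimerIfDue(long nowMillis) {
        if (!warmedUp && timerAt >= 0 && nowMillis >= timerAt) {
            bag.forEach(output);
            bag.clear();
            warmedUp = true;
        }
    }
}
```

With a 5000 ms delay, elements processed at 0 ms and 1000 ms are held back, emitted together once the clock reaches 5000 ms, and anything arriving later is emitted right away.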
>
>
>
> On Sun, Oct 17, 2021 at 2:31 PM Eleanore Jin <[email protected]>
> wrote:
>
>> Hi community,
>>
>> Here is my use case:
>> My pipeline uses another Kafka topic as a side input that contains the
>> filter criteria. When processing the main stream, the pipeline checks
>> whether each message from the main stream matches *any* existing
>> filter criterion.
>>
>> The side input logic is: fire whenever at least 1 element arrives from
>> the side input topic, and accumulate all the elements seen so far.
>>
>> Trigger trigger = Repeatedly.forever(
>>   AfterFirst.of(
>>     AfterPane.elementCountAtLeast(1),
>>     AfterProcessingTime.pastFirstElementInPane()
>>   ));
>>
>> return kafkaValues.apply("sideInputFromTopic-" + topicName,
>>   Window.<KV<String, T>>into(new GlobalWindows())
>>     .triggering(trigger)
>>     .accumulatingFiredPanes()
>>     .withOnTimeBehavior(OnTimeBehavior.FIRE_ALWAYS)
>> );
>>
>>
>> Here is my problem:
>> During application restarts, the 1st message from the main stream matches
>> the 3rd element in the side input. But because the pane fires as soon as
>> the 1st element from the side input is consumed, the 1st message from the
>> main stream is marked as not passing the filter.
>>
>> Then I switched to the trigger below, which fires 5 seconds after reading
>> the 1st element from the side input. This works around the restart
>> problem, but any element published to the side input later also has to
>> wait 5 seconds before the pane fires, which can lead to some messages
>> from the main stream being incorrectly marked as not passing the filter.
>>
>> Trigger trigger = Repeatedly.forever(
>>   AfterFirst.of(
>>     AfterPane.elementCountAtLeast(3000),
>>     AfterProcessingTime.pastFirstElementInPane()
>>       .plusDelayOf(Duration.standardSeconds(5))
>>   ));
>>
>> Question:
>>
>> I want to define a custom trigger that, the first time it fires, waits
>> for say 5 to 10 seconds before the main stream is processed.
>>
>> Afterwards, it should fire as soon as it sees a new element from the
>> side input.
>>
>> Is this possible? Or can I use the existing API to do it?
>>
>>
>> Beam version: 2.23, Flink runner: 1.10.2
>>
>> Thanks a lot!
>>
>> Eleanore
>>
>>
>>
