Hi community,

Here is my use case:
My pipeline uses another Kafka topic, which contains the filter criteria,
as a side input. When processing the main stream, the pipeline checks
whether each message from the main stream matches *any* of the existing
filter criteria.

The side-input logic is: whenever at least one element arrives on the
side-input topic, fire a pane that accumulates all the elements seen so far:

Trigger trigger = Repeatedly.forever(
  AfterFirst.of(
    AfterPane.elementCountAtLeast(1),
    AfterProcessingTime.pastFirstElementInPane()
  ));

return kafkaValues.apply("sideInputFromTopic-" + topicName,
  Window.<KV<String, T>>into(new GlobalWindows())
    .triggering(trigger)
    .accumulatingFiredPanes()
    .withOnTimeBehavior(OnTimeBehavior.FIRE_ALWAYS)
);


Here is my problem:
During an application restart, the first message from the main stream
matches the third element in the side input. But since the pane fires as
soon as the first side-input element is consumed, the first main-stream
message is checked against only that one element and is marked as not
passing the filter.
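To make the failure mode concrete, here is a small plain-Java model (no Beam involved) of the "matches any criterion" check. The class name, the prefix-based criteria and the message values are all hypothetical; the point is only that the same message fails against the first accumulated pane and passes against a later one.

```java
import java.util.List;
import java.util.function.Predicate;

public class SideInputRaceSketch {

  // Hypothetical model: each filter criterion is a predicate over a main-stream
  // message. A message passes if it matches ANY criterion in the side-input
  // snapshot (fired pane) it is evaluated against.
  static boolean passesFilter(String message, List<Predicate<String>> snapshot) {
    return snapshot.stream().anyMatch(criterion -> criterion.test(message));
  }

  public static void main(String[] args) {
    // Three illustrative criteria, published in this order to the side-input topic.
    Predicate<String> c1 = m -> m.startsWith("order-");
    Predicate<String> c2 = m -> m.startsWith("refund-");
    Predicate<String> c3 = m -> m.startsWith("invoice-");

    String firstMessage = "invoice-42"; // matches only the third criterion

    // Pane fired right after the first side-input element: only c1 is visible.
    List<Predicate<String>> firstPane = List.of(c1);
    // A later accumulating pane: all three criteria are visible.
    List<Predicate<String>> laterPane = List.of(c1, c2, c3);

    System.out.println(passesFilter(firstMessage, firstPane)); // false
    System.out.println(passesFilter(firstMessage, laterPane)); // true
  }
}
```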

Then I switched to the trigger below, which fires 5 seconds after the first
side-input element of a pane arrives (or once 3000 elements have arrived,
whichever comes first).
This works around the restart problem, but every subsequently published
side-input element also has to wait 5 seconds before a pane fires, so some
messages from the main stream can still be incorrectly marked as not
passing the filter.

Trigger trigger = Repeatedly.forever(
  AfterFirst.of(
    AfterPane.elementCountAtLeast(3000),
    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))
  ));

Question:

I want to define a custom trigger that, the first time it fires, waits for,
say, 5 to 10 seconds before the main stream is processed.

After that, it should fire as soon as it sees a new element from the side input.

Is this possible? Or should I be able to leverage the existing API to do it?
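One possible sketch using existing Beam Java triggers rather than a custom Trigger subclass: `AfterEach.inOrder(...)` runs its sub-triggers one after another, so a one-shot processing-time trigger followed by `Repeatedly.forever(AfterPane.elementCountAtLeast(1))` should give a delayed first pane and per-element panes afterwards. The class and method names are hypothetical, the delay is passed in by the caller, and this has not been verified on Beam 2.23 with the Flink 1.10.2 runner, in particular how the trigger state behaves when the job is restored from a checkpoint or savepoint.

```java
import org.apache.beam.sdk.transforms.windowing.AfterEach;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

public class SideInputWindowing {

  // Hypothetical helper: the first pane fires `initialDelay` after the first
  // side-input element arrives; afterwards every new element fires a new
  // (accumulating) pane immediately.
  static <T> Window<KV<String, T>> delayedFirstThenEveryElement(Duration initialDelay) {
    Trigger trigger =
        AfterEach.inOrder(
            // Fires once, initialDelay after the first element of the pane.
            AfterProcessingTime.pastFirstElementInPane().plusDelayOf(initialDelay),
            // Then fires on every subsequent side-input element, forever.
            Repeatedly.forever(AfterPane.elementCountAtLeast(1)));

    return Window.<KV<String, T>>into(new GlobalWindows())
        .triggering(trigger)
        .accumulatingFiredPanes();
  }
}
```

It would be applied like the original transform, e.g. `kafkaValues.apply("sideInputFromTopic-" + topicName, SideInputWindowing.delayedFirstThenEveryElement(Duration.standardSeconds(5)))`. Note that the delay only makes it likely, not certain, that all filter criteria have been read before the first pane fires.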


Beam version: 2.23, Flink runner: 1.10.2

Thanks a lot!

Eleanore
