Hi everybody,

I'm currently having a hard time wrapping my head around streaming
data processing.

Scenario:
I have a main stream of data (orders entering the system) that is
processed by the pipeline, and one processing step needs a side input
to enrich the processed data.

This side input should refresh itself once a day. I built it following
the example in the documentation at
https://beam.apache.org/documentation/patterns/side-inputs/, section
"Slowly updating global window side inputs".
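For reference, a minimal sketch of that pattern in the Java SDK, modeled on the documentation's example: a periodic tick reloads the lookup data, the global window is re-triggered on every reload, and only the latest value is exposed as a singleton side input. The class name SlowlyUpdatingLookup, the loader loadLookup() and its sample data are hypothetical placeholders, not taken from the pastebin code.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Latest;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;

/** Sketch: a Map side input that is reloaded once per refresh interval. */
public class SlowlyUpdatingLookup
    extends PTransform<PBegin, PCollectionView<Map<String, String>>> {

  private final Duration refreshInterval;

  public SlowlyUpdatingLookup(Duration refreshInterval) {
    this.refreshInterval = refreshInterval;
  }

  /** Hypothetical loader; replace with the real read of the enrichment data. */
  static Map<String, String> loadLookup() {
    Map<String, String> lookup = new HashMap<>();
    lookup.put("sku-1", "Widget");
    return lookup;
  }

  /** Turns every tick into a freshly loaded copy of the lookup data. */
  static class ReloadFn extends DoFn<Long, Map<String, String>> {
    @ProcessElement
    public void process(OutputReceiver<Map<String, String>> out) {
      out.output(loadLookup());
    }
  }

  @Override
  public PCollectionView<Map<String, String>> expand(PBegin input) {
    return input
        // Unbounded source emitting one tick per refreshInterval.
        .apply("Tick", GenerateSequence.from(0).withRate(1, refreshInterval))
        .apply("Reload", ParDo.of(new ReloadFn()))
        // Fire a new pane after each reload instead of waiting for the global window to close.
        .apply(
            Window.<Map<String, String>>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                .discardingFiredPanes())
        // Keep only the most recent reload as the side-input value.
        .apply(Latest.globally())
        .apply(View.asSingleton());
  }
}
```

In a pipeline this would be used as `p.apply("Lookup", new SlowlyUpdatingLookup(Duration.standardDays(1)))`, and the resulting view passed to the enrichment step via `ParDo.of(...).withSideInputs(view)`.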

To verify that everything works as expected, I would like to write a
JUnit test that checks that the side input is refreshed regularly and
that newly arriving data actually gets the updated values from it.
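One way to test at least the enrichment step deterministically is to feed it a static, bounded side input instead of the periodically refreshed one. Below is a minimal JUnit 4 sketch using TestPipeline and PAssert, assuming the enrichment DoFn can be constructed with any PCollectionView<Map<String, String>>; EnrichFn, enrich() and the sku-* values are hypothetical. It only checks the lookup logic and does not try to assert when a refreshed side-input value becomes visible to later orders.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.junit.Rule;
import org.junit.Test;

public class EnrichOrdersTest {

  @Rule public final transient TestPipeline p = TestPipeline.create();

  /** Hypothetical enrichment rule: append the looked-up name, or UNKNOWN. */
  static String enrich(String sku, Map<String, String> lookup) {
    return sku + ":" + lookup.getOrDefault(sku, "UNKNOWN");
  }

  /** Hypothetical enrichment DoFn under test; reads a Map side input. */
  static class EnrichFn extends DoFn<String, String> {
    private final PCollectionView<Map<String, String>> lookup;

    EnrichFn(PCollectionView<Map<String, String>> lookup) {
      this.lookup = lookup;
    }

    @ProcessElement
    public void process(ProcessContext c) {
      c.output(enrich(c.element(), c.sideInput(lookup)));
    }
  }

  @Test
  public void enrichesOrdersFromSideInput() {
    Map<String, String> names = new HashMap<>();
    names.put("sku-1", "Widget");

    // Bounded side input: Create.of(Map) yields KV pairs, View.asMap() rebuilds the Map.
    PCollectionView<Map<String, String>> view =
        p.apply("Lookup", Create.of(names)).apply(View.asMap());

    PCollection<String> enriched =
        p.apply("Orders", Create.of("sku-1", "sku-2"))
            .apply(ParDo.of(new EnrichFn(view)).withSideInputs(view));

    PAssert.that(enriched).containsInAnyOrder("sku-1:Widget", "sku-2:UNKNOWN");
    p.run().waitUntilFinish();
  }
}
```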

Here is the code for what I am trying to do: https://pastebin.com/8mtPKTcv

What I observe is:
- The side input is triggered.
- Processing starts.
- All elements are processed up to the step that actually needs the
side input; there, processing blocks.
- The side input is triggered again.
- Processing is still blocked.

Could anyone please explain what I'm doing wrong? I have already tried
several things, such as applying fixed windows to the main stream, but
with no luck so far, and I couldn't find much information via Google.

I also printed ctx.timestamp() in each processing step; here is what the output looks like:
2021-09-10T13:03:10.021Z sideloaded elements: 1767
2021-09-10T13:03:07.072Z Filter called
2021-09-10T13:03:07.073Z Filter called
2021-09-10T13:03:07.073Z Filter called
2021-09-10T13:03:07.073Z Filter called
2021-09-10T13:03:07.073Z Filter called
2021-09-10T13:03:07.073Z Filter called
2021-09-10T13:03:07.073Z Filter called
2021-09-10T13:03:07.073Z Filter called
2021-09-10T13:03:07.073Z Filter called
2021-09-10T13:03:07.073Z Filter called
2021-09-10T13:03:30.022Z sideloaded elements: 1767

Your help would be really appreciated.

Thanks and regards
