Hi Siyu,

Thank you for your response. I'm aware of that. The thing is… even if it runs forever the trigger should still fire every period if I'm not mistaken.
So I still do not understand why the main stream blocks processing after the filter step and the enrich step never gets executed. I would think that the trigger for the side input should provide a value to the enrich step so that it can do its thing, but it seems to wait for the side input for whatever reason.

Or am I misunderstanding something?

Thank you very much and regards
Johannes

On Mon 13. Sep 2021 at 22:49, Siyu Lin <[email protected]> wrote:

> Hi Johannes,
>
> L20 in your code will make it run forever so it is better to put a finite
> number there, like
> `GenerateSequence.from(0).to(1).withRate(1, Duration.standardSeconds(2))`
>
> -siyu
>
>
> On Sep 10, 2021, at 6:06 AM, Johannes Frey <[email protected]> wrote:
> >
> > Hi everybody,
> >
> > I'm currently having a hard time wrapping my head around streaming
> > data processing.
> >
> > Scenario:
> > I have a main stream of data that is going to be processed (orders
> > entering the system) and at some point in the pipeline, in one
> > processing step, I need a side input to enrich the processed data.
> >
> > This side input should update itself once a day and is created using
> > the example in the documentation
> > https://beam.apache.org/documentation/patterns/side-inputs/ section:
> > "Slowly updating global window side inputs"
> >
> > To make sure that everything works as expected I would like to write a
> > JUnit test to make sure the side input updates regularly and that newly
> > arriving data indeed gets the updated values from the side input.
> >
> > Here is the code for what I am trying to do:
> > https://pastebin.com/8mtPKTcv
> >
> > The result I'm getting is that:
> > - The side input is triggered
> > - Processing starts
> > - All elements get processed up to the point of the processing step
> >   that actually needs the side input, then it blocks
> > - The side input gets triggered again
> > - Processing is still blocked
> >
> > Could anyone please explain to me where I'm wrong? I already tried
> > several things like introducing fixed windows to the main stream but
> > no luck so far... also I couldn't find much information using Google.
> >
> > I also printed the ctx.timestamp() of the processing steps; here is
> > how they look:
> > 2021-09-10T13:03:10.021Z sideloaded elements: 1767
> > 2021-09-10T13:03:07.072Z Filter called
> > 2021-09-10T13:03:07.073Z Filter called
> > 2021-09-10T13:03:07.073Z Filter called
> > 2021-09-10T13:03:07.073Z Filter called
> > 2021-09-10T13:03:07.073Z Filter called
> > 2021-09-10T13:03:07.073Z Filter called
> > 2021-09-10T13:03:07.073Z Filter called
> > 2021-09-10T13:03:07.073Z Filter called
> > 2021-09-10T13:03:07.073Z Filter called
> > 2021-09-10T13:03:07.073Z Filter called
> > 2021-09-10T13:03:30.022Z sideloaded elements: 1767
> >
> > Your help would be really appreciated.
> >
> > Thanks and regards
> >
