I'm not sure I see how that is simpler. #2 is a time window: once per day it emits the highest hourly count for that day. #4 is not a time window; it keeps a history of several days. If I want to move the logic of #2 into #4, I will need to manage it with timers, correct?
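
To make sure I understand the suggestion, here is a rough sketch of what I think the merged operator would have to do, written as plain Java without any Flink dependency. All names (`MergedDailyMax`, `onHourlyCount`, `closeDay`) and the comparison rule ("the hourly count is higher than every stored daily max") are made up for illustration. It also assumes the hourly counts arrive in event-time order; in a real job I assume this would be a KeyedProcessFunction with keyed state, and the day rollover would be driven by an event-time timer rather than by the next record.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: the daily-max logic of #2 folded into the operator of #4.
class MergedDailyMax {
    static final long HOUR_MS = 60L * 60 * 1000;
    static final long DAY_MS = 24 * HOUR_MS;

    private final int historyDays;                                   // the "last X days"
    private final Deque<Long> dailyMaxHistory = new ArrayDeque<>();  // would be ListState in Flink
    private boolean started = false;
    private long currentDay;
    private long currentDayMax;

    MergedDailyMax(int historyDays) {
        this.historyDays = historyDays;
    }

    // Takes one hourly count (output of #1). Returns true when the count is
    // higher than every daily max currently kept in the history.
    boolean onHourlyCount(long hourStartMs, long count) {
        long day = Math.floorDiv(hourStartMs, DAY_MS);
        if (started && day != currentDay) {
            closeDay(); // in Flink this is what the end-of-day timer would do
        }
        if (!started || day != currentDay) {
            started = true;
            currentDay = day;
            currentDayMax = 0;
        }
        // Compare against finished days only; an empty history never matches.
        boolean exceeds = !dailyMaxHistory.isEmpty();
        for (long pastMax : dailyMaxHistory) {
            if (count <= pastMax) {
                exceeds = false;
            }
        }
        currentDayMax = Math.max(currentDayMax, count);
        return exceeds;
    }

    // Stores the finished day's max and drops days beyond the history size.
    private void closeDay() {
        dailyMaxHistory.addLast(currentDayMax);
        while (dailyMaxHistory.size() > historyDays) {
            dailyMaxHistory.removeFirst();
        }
    }

    public static void main(String[] args) {
        MergedDailyMax op = new MergedDailyMax(2);
        long[][] hourly = { {0, 5}, {HOUR_MS, 9}, {DAY_MS, 7}, {DAY_MS + HOUR_MS, 12} };
        for (long[] rec : hourly) {
            System.out.println("hourStart=" + rec[0] + " count=" + rec[1]
                    + " exceedsHistory=" + op.onHourlyCount(rec[0], rec[1]));
        }
    }
}
```

If this is right, the running max per day is the part that replaces window #2, and the rollover in `closeDay` is the part I would need a timer for.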

On Thu, Dec 26, 2019 at 6:28 AM Kurt Young <ykt...@gmail.com> wrote:

> Hi,
>
> You can merge the logic of #2 into #4, it will be much simpler.
>
> Best,
> Kurt
>
>
> On Wed, Dec 25, 2019 at 7:36 PM Avi Levi <avi.l...@bluevoyant.com> wrote:
>
>> Hi,
>>
>> I have the following pipeline:
>> 1. A single-hour window that counts the number of records
>> 2. A single-day window that accepts the aggregated data from #1 and emits
>> the highest hourly count of that day
>> 3. A union of #1 + #2
>> 4. A logic operator that accepts the data from #3, keeps a ListState of
>> #2, and applies some logic to #1 based on that state (e.g. comparing a
>> single hour to the history of the max hours over the last X days) and
>> emits the result
>>
>> The timestamps and watermarks are assigned
>> using BoundedOutOfOrdernessTimestampExtractor (event time), and I allow
>> a lateness of 3 hours.
>>
>> The problem is that when I try to unit test the whole pipeline, the
>> data from #1 reaches #4 before the latter accepts the data from #3, hence
>> it doesn't have any state yet (the state is always empty when the stream
>> from #1 arrives).
>> My source in the tests is a collection that represents the records.
>> Is there any way I can solve this?
>> [image: Screen Shot 2019-12-25 at 13.04.17.png]
>> I appreciate any help you can provide.
>> Cheers
>> Avi