I'm not sure I see how that is simpler. #2 is a per-day time window that
emits the highest hourly count for that day. #4 is not a time window; it
keeps a history of several days. If I want to put the logic of #2 into #4,
I will need to manage it with timers, correct?
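
To make the question concrete, here is a rough sketch of what merging #2
into #4 might look like. It is plain Java with no Flink dependency, and
every name in it is hypothetical. The deque stands in for the ListState,
and onDayEnd() stands in for whatever closes the day. In Flink I assume
that would be an event-time timer in a keyed process function, which is
not shown here. The comparison rule is only a placeholder for the real
logic.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Plain-Java stand-in for operator #4 with the daily-max logic of #2 folded in.
// No Flink API is used; all names here are hypothetical.
class HourlyAgainstDailyMax {
    private final int historyDays; // "X": how many daily maximums to keep
    private final Deque<Long> dailyMaxHistory = new ArrayDeque<>(); // plays the role of the ListState
    private long currentDayMax = 0;     // highest hourly count seen in the open day
    private boolean dayHasData = false; // whether the open day received any hourly count

    HourlyAgainstDailyMax(int historyDays) {
        this.historyDays = historyDays;
    }

    // Called for each hourly count coming from #1.
    // Placeholder logic: returns true when the count is above every stored daily max.
    boolean onHourlyCount(long count) {
        currentDayMax = Math.max(currentDayMax, count);
        dayHasData = true;
        if (dailyMaxHistory.isEmpty()) {
            return false; // no history yet, nothing to compare against
        }
        for (long pastMax : dailyMaxHistory) {
            if (count <= pastMax) {
                return false;
            }
        }
        return true;
    }

    // Called once per day boundary (assumed to be driven by a timer in a real job).
    // Moves the open day's maximum into the history and drops the oldest entry if needed.
    void onDayEnd() {
        if (dayHasData) {
            dailyMaxHistory.addLast(currentDayMax);
            if (dailyMaxHistory.size() > historyDays) {
                dailyMaxHistory.removeFirst();
            }
        }
        currentDayMax = 0;
        dayHasData = false;
    }

    int historySize() {
        return dailyMaxHistory.size();
    }
}
```

With this shape the hourly counts and the daily maximum live in the same
operator, so there is no second stream whose arrival order matters. The
cost is that the day boundary has to be handled explicitly, which is the
timer question above.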

On Thu, Dec 26, 2019 at 6:28 AM Kurt Young <ykt...@gmail.com> wrote:

> Hi,
>
> You can merge the logic of #2 into #4; it will be much simpler.
>
> Best,
> Kurt
>
>
> On Wed, Dec 25, 2019 at 7:36 PM Avi Levi <avi.l...@bluevoyant.com> wrote:
>
>> Hi,
>>
>> I have the following pipeline:
>> 1. A one-hour window that counts the number of records
>> 2. A one-day window that accepts the aggregated data from #1 and emits
>> the highest hourly count of that day
>> 3. A union of #1 and #2
>> 4. A logic operator that accepts the data from #3, keeps a ListState of
>> #2, applies some logic to #1 based on that state (e.g. comparing a single
>> hour to the history of the daily maximums over the last X days), and emits
>> the result
>>
>> The timestamps and watermarks are assigned using
>> BoundedOutOfOrdernessTimestampExtractor (event time), and I allow
>> lateness of 3 hours.
>>
>> The problem is that when I try to unit test the whole pipeline, the
>> data from #1 reaches #4 before the latter accepts the data from #3, hence it
>> doesn't have any state yet (the state is always empty when the stream from #1
>> arrives).
>> My source in the tests is a collection that represents the records.
>> Is there any way I can solve this?
>> I appreciate any help you can provide
>> Cheers
>> Avi
>>
>>
>>
