So a given window (with a '.until()' setting) is triggered for closing by the presence of a record outside the .until() setting?
If the timestamps for records jump about by a value larger than the .until() value, you could have windows being created / deleted quite a bit then?

On Tue, Dec 13, 2016 at 9:57 AM, Matthias J. Sax <matth...@confluent.io> wrote:

> First, windows are only created if there is actual data for a window. So
> you get windows [0, 50), [25, 75), [50, 100) only if there are records
> falling into each window (btw: window start time is inclusive while
> window end time is exclusive). If you have only 2 records with, let's say,
> ts=20 and ts=90, you will not have an open window [25,75). Each window is
> physically created when the first record for it is processed.
>
> If you have above 4 windows and a record with ts=101 arrives, a new
> window [101,151) will be created. Window [0,50) will not be deleted yet,
> because retention is 100 and thus Streams guarantees that all records
> with ts >= 1 (= 101 - 100) are still processed correctly and those
> records would fall into window [0,50).
>
> Thus, window [0,50) can be dropped if time advances to TS = 150, but
> not before that.
>
> -Matthias
>
>
> On 12/13/16 12:06 AM, Sachin Mittal wrote:
> > Hi,
> > So is until for future or past?
> > Say I get the first record at t = 0 and until is 100 and my window size is 50,
> > advance by 25.
> > I understand it will create windows (0, 50), (25, 75), (50, 100)
> > Now at t = 101 it will drop
> > (0, 50), (25, 75), (50, 100) and create
> > (101, 150), (125, 175), (150, 200)
> >
> > Please confirm if this understanding is correct. It is not clear how it
> > will handle overlapping windows (75, 125) and (175, 225) and so on?
> >
> > The case that is still not clear is that at, say, t = 102 I get some message with
> > timestamp 99. What happens then?
> > Will the result be added to the previous aggregation of (50, 100) or (75, 125),
> > like it should?
> >
> > Or will it recreate the old window (50, 100) and aggregate the value there
> > and then drop it.
> > This would result in a wrong aggregated value, as it does
> > not consider the previous aggregated values.
> >
> > So this is the pressing case I am not able to understand. Maybe I am wrong
> > at some basic understanding.
> >
> >
> > Next for
> > The parameter
> >> windowstore.changelog.additional.retention.ms
> >
> > How does this relate to the retention.ms param of the topic config?
> > I create the internal topic manually using, say, retention.ms=3600000.
> > In the next release (post kafka_2.10-0.10.0.1), since we support delete of
> > internal changelog topics as well, I want it to be retained for, say, just
> > 1 hour.
> > So how does the above parameter interfere with this topic-level setting?
> > Or do I now just need to set the above config to 3600000 and not add
> > retention.ms=3600000
> > while creating the internal topic?
> > This is just another doubt remaining here.
> >
> > Thanks
> > Sachin
> >
> >
> >
> > On Tue, Dec 13, 2016 at 3:02 AM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> Sachin,
> >>
> >> There is no reason to have an .until() AND a .retain() -- just increase
> >> the value of .until()
> >>
> >> If you have a window of let's say 1h size and you set .until() also to
> >> 1h -- you can obviously not process any late arriving data. If you set
> >> until() to 2h in this example, you can process data that is up to 1h
> >> delayed.
> >>
> >> So basically, the retention should always be larger than your window size.
> >>
> >> The parameter
> >>> windowstore.changelog.additional.retention.ms
> >>
> >> applies to changelog topics that back up window state stores. Those
> >> changelog topics are compacted. However, the used key does encode a
> >> window ID and thus older data can never be cleaned up by compaction.
> >> Therefore, an additional retention time is applied to those topics, too.
> >> Thus, if an old window is not updated for this amount of time, it will
> >> get deleted eventually, preventing this topic from growing infinitely.
> >>
> >> The value will be determined by until(), i.e., whatever you specify in
> >> .until() will be used to set this parameter.
> >>
> >>
> >> -Matthias
> >>
> >> On 12/12/16 1:07 AM, Sachin Mittal wrote:
> >>> Hi,
> >>> We are facing the exact problem as described by Matthias above.
> >>> We are keeping the default until, which is 1 day.
> >>>
> >>> Our record's timestamp extractor has a field which increases with time.
> >>> However, for a short time we cannot guarantee the timestamp always
> >>> increases. So at the boundary, i.e. after 24 hrs, we can get records which are
> >>> beyond that window's retention period.
> >>>
> >>> Then it happens like it is mentioned above and our aggregation fails.
> >>>
> >>> So just to sum up, when we get a record at
> >>> 24h + 1 sec (it deletes the older window, and since the new record belongs to
> >>> the new window, it gets created).
> >>> Now when we get the next record at 24 hrs - 1 sec, since the older window is dropped,
> >>> it does not get aggregated in that bucket.
> >>>
> >>> I suggest we have another setting next to until called retain, which retains
> >>> the older windows into the next window.
> >>>
> >>> I think at the stream window boundary level it should use a concept of sliding
> >>> window. So we can define a window like
> >>>
> >>> TimeWindows.of("test-table", 3600 * 1000l).advanceBy(1800 * 1000l).until(7
> >>> * 24 * 3600 * 1000l).retain(900 * 1000l)
> >>>
> >>> So after 7 days it retains the data covered by windows in the last 15 minutes,
> >>> which rolls over the data in them to the next window. This way streams work
> >>> continuously.
> >>>
> >>> Please let us know your thoughts on this.
> >>>
> >>> On another side question on this, there is a setting:
> >>>
> >>> windowstore.changelog.additional.retention.ms
> >>> It is not clear what it does. Is this the default for until?
> >>>
> >>> Thanks
> >>> Sachin
> >>>
> >>>
> >>> On Mon, Dec 12, 2016 at 10:17 AM, Matthias J.
> >>> Sax <matth...@confluent.io>
> >>> wrote:
> >>>
> >>>> Windows are created on demand, i.e., each time a new record arrives and
> >>>> there is no window yet for it, a new window will get created.
> >>>>
> >>>> Windows accept data until their retention time (that you can
> >>>> configure via .until()) has passed. Thus, you will have many windows being
> >>>> open in parallel.
> >>>>
> >>>> If you read older data, they will just be put into the corresponding
> >>>> windows (as long as the window retention time has not passed). If a window was
> >>>> discarded already, a new window with this single (later arriving) record
> >>>> will get created, the computation will be triggered, you get a result,
> >>>> and afterwards the window is deleted again (as its retention time
> >>>> passed already).
> >>>>
> >>>> The retention time is driven by "stream-time", an internally tracked time
> >>>> that only progresses in the forward direction. It gets its value from the
> >>>> timestamps provided by TimestampExtractor -- thus, per default it will
> >>>> be event-time.
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 12/11/16 3:47 PM, Jon Yeargers wrote:
> >>>>> I've read this and still have more questions than answers. If my data skips
> >>>>> about (timewise), what determines when a given window will start / stop
> >>>>> accepting new data? What if I'm reading data from some time ago?
> >>>>>
> >>>>> On Sun, Dec 11, 2016 at 2:22 PM, Matthias J.
> >>>>> Sax <matth...@confluent.io>
> >>>>> wrote:
> >>>>>
> >>>>>> Please have a look here:
> >>>>>>
> >>>>>> http://docs.confluent.io/current/streams/developer-guide.html#windowing-a-stream
> >>>>>>
> >>>>>> If you have further questions, just follow up :)
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>> On 12/10/16 6:11 PM, Jon Yeargers wrote:
> >>>>>>> I've added the 'until()' clause to some aggregation steps and it's working
> >>>>>>> wonders for keeping the size of the state store in useful boundaries...
> >>>>>>> But I'm not 100% clear on how it works.
> >>>>>>>
> >>>>>>> What is implied by the '.until()' clause? What determines when to stop
> >>>>>>> receiving further data - is it clock time (since the window was created)?
> >>>>>>> It seems problematic for it to refer to EventTime as this may bounce all
> >>>>>>> over the place. For non-overlapping windows a given record can only fall
> >>>>>>> into a single aggregation period - so when would a value get discarded?
> >>>>>>>
> >>>>>>> I'm using 'groupByKey().aggregate(..., TimeWindows.of(60 * 1000L).until(10 *
> >>>>>>> 1000L))' - but what is this accomplishing?
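
For anyone following the numbers in this thread (window size 50, advance 25, retention 100), here is a minimal plain-Java sketch of the arithmetic. It does not use the Kafka Streams API at all: the class `HoppingWindowSketch` and both method names are made up for illustration. It assumes window start times are aligned to multiples of the advance interval starting at 0, and it takes the drop rule from the explanation quoted above (a window can go once stream-time has moved a full retention period past the window end). The real window store may well behave differently in detail, so treat this as a way to check your understanding, not as a description of the implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka Streams.
public class HoppingWindowSketch {

    /** Start times of all windows [start, start + size) that contain ts. */
    public static List<Long> windowStartsFor(long ts, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        // Assumption: window starts are multiples of `advance`, beginning at 0.
        // This is the earliest aligned start whose window still covers ts.
        long start = Math.max(0, ts - size + advance) / advance * advance;
        for (; start <= ts; start += advance) {
            starts.add(start);
        }
        return starts;
    }

    /**
     * Drop rule as described in the thread: a window is safe to drop once
     * stream-time minus retention has reached the (exclusive) window end.
     */
    public static boolean canDrop(long windowStart, long size,
                                  long retention, long streamTime) {
        return streamTime - retention >= windowStart + size;
    }

    public static void main(String[] args) {
        // Only windows that actually receive a record get created.
        System.out.println("ts=20  -> window starts " + windowStartsFor(20, 50, 25));
        System.out.println("ts=90  -> window starts " + windowStartsFor(90, 50, 25));
        // Window [0,50) with retention 100, checked at two stream-times.
        System.out.println("drop [0,50) at 101? " + canDrop(0, 50, 100, 101));
        System.out.println("drop [0,50) at 150? " + canDrop(0, 50, 100, 150));
    }
}
```

With records at ts=20 and ts=90 this never produces a window starting at 25, which matches the point that windows exist only where data falls. Under the alignment assumption, a record at ts=101 would land in windows starting at 75 and 100 rather than in a window starting at 101.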