It's based in "stream time", ie, the internally tracked progress based on the timestamps return by TimestampExtractor.
-Matthias On 3/29/17 12:52 PM, Jon Yeargers wrote: > So '.until()' is based on clock time / elapsed time (IE record age) / > something else? > > The fact that Im seeing lots of records come through that can't be found in > the Store - these are 'old' and already expired? > > Going forward - it would be useful to have different forms of '.until()' so > one could consume old records (EG if one was catching up from lag) without > having to worry about them immediately disappearing. > > On Wed, Mar 29, 2017 at 10:37 AM, Damian Guy <damian....@gmail.com> wrote: > >> Jon, >> >> You should be able to query anything that has not expired, i.e., based on >> TimeWindows.until(..). >> >> Thanks, >> Damian >> >> On Wed, 29 Mar 2017 at 17:24 Jon Yeargers <jon.yearg...@cedexis.com> >> wrote: >> >>> To be a bit more specific: >>> >>> If I call this: KTable<Window, String> kt = >>> sourceStream.groupByKey().reduce(..., "somekeystore"); >>> >>> and then call this: >>> >>> kt.forEach()-> ... >>> >>> Can I assume that everything that comes out will be available in >>> "somekeystore"? If not, what subset should I expect to find there? >>> >>> On Wed, Mar 29, 2017 at 8:34 AM, Jon Yeargers <jon.yearg...@cedexis.com> >>> wrote: >>> >>>> But if a key shows up in a KTable->forEach should it be available in >> the >>>> StateStore (from the KTable)? >>>> >>>> On Wed, Mar 29, 2017 at 6:31 AM, Michael Noll <mich...@confluent.io> >>>> wrote: >>>> >>>>> Jon, >>>>> >>>>> there's a related example, using a window store and a key-value store, >>> at >>>>> https://github.com/confluentinc/examples/blob/3.2.x/kafka- >>>>> streams/src/test/java/io/confluent/examples/streams/Val >>>>> idateStateWithInteractiveQueriesLambdaIntegrationTest.java >>>>> (this is for Confluent 3.2 / Kafka 0.10.2). >>>>> >>>>> -Michael >>>>> >>>>> >>>>> >>>>> On Wed, Mar 29, 2017 at 3:12 PM, Jon Yeargers < >> jon.yearg...@cedexis.com >>>> >>>>> wrote: >>>>> >>>>>> Im only running one instance (locally) to keep things simple. >>>>>> >>>>>> Reduction: >>>>>> >>>>>> KTable<Windowed<String>, String> hourAggStore = >>>>>> sourceStream.groupByKey().reduce(rowReducer, >>>>>> TimeWindows.of(65 * 60 * 1000L).advanceBy(5 * 60 * >>>>>> 1000).until(70 * 60 * 1000L), >>>>>> "HourAggStore"); >>>>>> >>>>>> then I get values to look for via: >>>>>> >>>>>> hourAggStore.foreach((k, v) -> { >>>>>> LogLine logLine = objectMapper.readValue(v, >>>>> logLine.class); >>>>>> LOGGER.debug("{}", k.key()); >>>>>> }); >>>>>> >>>>>> Ive kept it easy by requesting everything from 0 to >>>>>> 'System.currentTimeMillis()'. Retrieval is done using a snip from >> your >>>>>> sample code "windowedByKey". >>>>>> >>>>>> Requests are sent in via curl and output through the same channel. I >>>>> pass >>>>>> in the key and ask for any values. >>>>>> >>>>>> Ive looked at the values passed in / out of the reduction function >> and >>>>> they >>>>>> look sane. >>>>>> >>>>>> My assumption is that if a value shows up in the 'forEach' loop this >>>>>> implies it exists in the StateStore. Accurate? >>>>>> >>>>>> In fact, only about one in 10 requests actually return any values. >> No >>>>>> errors - just no data. >>>>>> >>>>>> >>>>>> >>>>>> On Wed, Mar 29, 2017 at 2:15 AM, Damian Guy <damian....@gmail.com> >>>>> wrote: >>>>>> >>>>>>> Hi Jon, >>>>>>> >>>>>>> If you are able to get a handle on the store, i.e., via >>>>>>> KafkaStreams.store(...) and call fetch without any exceptions, >> then >>>>> the >>>>>>> store is available. >>>>>>> The time params to fetch are the boundaries to search for windows >>> for >>>>> the >>>>>>> given key. They relate to the start time of the window, so if you >>> did >>>>>>> fetch(key, t1, t2) - it will find all the windows for key that >> start >>>>> in >>>>>> the >>>>>>> inclusive time range t1 - t2. >>>>>>> >>>>>>> Are you running more than one instance? If yes, then you want to >>> make >>>>>> sure >>>>>>> that you are querying the correct instance. For that you can use: >>>>>>> KafkaStreams.metadataForKey(...) to find the instance that has >> the >>>>> key >>>>>> you >>>>>>> are looking for. >>>>>>> >>>>>>> Thanks, >>>>>>> Damian >>>>>>> >>>>>>> >>>>>>> >>>>>>> On Tue, 28 Mar 2017 at 22:37 Jon Yeargers < >> jon.yearg...@cedexis.com >>>> >>>>>>> wrote: >>>>>>> >>>>>>>> Im probing about trying to find a way to solve my aggregation -> >>> db >>>>>>> issue. >>>>>>>> Looking at the '.fetch()' function Im wondering about the >>>>> 'timeFrom' >>>>>> and >>>>>>>> 'timeTo' params as not a lot is mentioned about 'proper' usage. >>>>>>>> >>>>>>>> The test in >>>>>>>> >>>>>>>> https://github.com/confluentinc/examples/blob/ >>>>>>> master/kafka-streams/src/test/java/io/confluent/examples/ >>>>>>> streams/interactivequeries/WordCountInteractiveQueriesExa >>>>>>> mpleTest.java#L200-L212 >>>>>>>> makes it appear that the params are boundaries and that it will >>>>> return >>>>>> an >>>>>>>> inclusive list of every key/window combination. Truth? >>>>>>>> >>>>>>>> My tests to this end haven't returned anything. >>>>>>>> >>>>>>>> Im watching the values coming out of the KTable<Window, String> >>> so I >>>>>> can >>>>>>>> send them back as request params. What Ive tried: >>>>>>>> >>>>>>>> - Window.key(), Window.key().start() and Window.key().end() >>>>>>>> - Window.key(), (Window.key().start() - 1) and >> (Window.key().end() >>>>> + 1) >>>>>>>> - Window.key(), 0 and Window.key().end() >>>>>>>> - Window.key(), 0 and (Window.key().end() + 1) >>>>>>>> >>>>>>>> None of these seem to hit anything in the StateStore. >>>>>>>> >>>>>>>> Is there a delay before Store values become available for >>>>> '.fetch()'? >>>>>>>> >>>>>>> >>>>>> >>>>> >>>> >>>> >>> >> >
signature.asc
Description: OpenPGP digital signature