Jon, perhaps you could share the full integration test (or whatever code you're using to experiment)? We had a similar "how does X work?" question on Stack Overflow recently [1], and it was much easier to help once we also understood exactly how the test data was being generated.
-Michael

[1] https://stackoverflow.com/questions/43038653/how-to-actually-discard-late-records

On Thu, Mar 30, 2017 at 1:53 AM, Jon Yeargers <jon.yearg...@cedexis.com> wrote:

> I remain more than mystified about the workings of the StateStore. I tried
> making aggregations with a 1-minute window, a 10-second advance and a _12
> hour_ retention (which is longer than the retention.ms of the topic). I
> still couldn't get more than a 15% hit rate on the StateStore.
>
> Are there configuration settings? Some properties file to set up RocksDB? I'm
> not getting any errors - just not getting any data.
>
> On Wed, Mar 29, 2017 at 12:52 PM, Jon Yeargers <jon.yearg...@cedexis.com> wrote:
>
> > So is '.until()' based on clock time, elapsed time (i.e., record age), or
> > something else?
> >
> > I'm seeing lots of records come through that can't be found in the
> > Store - does that mean these are 'old' and already expired?
> >
> > Going forward, it would be useful to have different forms of '.until()'
> > so one could consume old records (e.g., if one was catching up from lag)
> > without having to worry about them immediately disappearing.
> >
> > On Wed, Mar 29, 2017 at 10:37 AM, Damian Guy <damian....@gmail.com> wrote:
> >
> >> Jon,
> >>
> >> You should be able to query anything that has not expired, i.e., based on
> >> TimeWindows.until(..).
> >>
> >> Thanks,
> >> Damian
> >>
> >> On Wed, 29 Mar 2017 at 17:24 Jon Yeargers <jon.yearg...@cedexis.com> wrote:
> >>
> >> > To be a bit more specific:
> >> >
> >> > If I call this:
> >> >
> >> > KTable<Windowed<String>, String> kt =
> >> >     sourceStream.groupByKey().reduce(..., "somekeystore");
> >> >
> >> > and then call this:
> >> >
> >> > kt.foreach((k, v) -> ...);
> >> >
> >> > Can I assume that everything that comes out will be available in
> >> > "somekeystore"? If not, what subset should I expect to find there?
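A minimal sketch of one way to reason about "what subset should I expect", under stated assumptions. Per Damian's reply, a window stays queryable until it expires under TimeWindows.until(..). The model below assumes expiry is measured against stream time (the largest record timestamp the store has seen), not wall-clock time, and that a window is dropped once its start time falls more than the retention period behind stream time. The actual RocksDB-backed store discards whole segments, so real expiry is coarser than this. RetentionModel, observe and isRetained are hypothetical names used only for illustration.

```java
// Simplified model of window retention (an ASSUMPTION, not Kafka's implementation):
// a window is kept while its start time is no more than retentionMs behind
// "stream time", the largest record timestamp observed so far. Wall-clock time
// plays no role here. The real store drops whole segments, so expiry is coarser.
final class RetentionModel {

    private final long retentionMs;
    private long streamTime = 0L; // largest record timestamp seen (timestamps assumed >= 0)

    RetentionModel(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Stream time only moves forward; an old (late) record does not move it back.
    void observe(long recordTs) {
        streamTime = Math.max(streamTime, recordTs);
    }

    // Whether a window starting at windowStart would still be queryable under this model.
    boolean isRetained(long windowStart) {
        return windowStart >= streamTime - retentionMs;
    }

    public static void main(String[] args) {
        long minute = 60 * 1000L;
        RetentionModel store = new RetentionModel(70 * minute); // like .until(70 * 60 * 1000L)

        store.observe(0L);
        System.out.println(store.isRetained(0L));          // true

        store.observe(120 * minute);                        // a record two hours later arrives
        System.out.println(store.isRetained(0L));          // false: start is 120 min behind
        System.out.println(store.isRetained(60 * minute)); // true: start is only 60 min behind
    }
}
```

Under this model, records that arrive while stream time is already far ahead (for instance when catching up from lag) can fall into windows that are expired as soon as they are written, which could produce the symptom described in this thread: keys that show up in foreach but are not found in the store.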
> >> >
> >> > On Wed, Mar 29, 2017 at 8:34 AM, Jon Yeargers <jon.yearg...@cedexis.com> wrote:
> >> >
> >> > > But if a key shows up in a KTable->foreach, should it be available in the
> >> > > StateStore (from the KTable)?
> >> > >
> >> > > On Wed, Mar 29, 2017 at 6:31 AM, Michael Noll <mich...@confluent.io> wrote:
> >> > >
> >> > >> Jon,
> >> > >>
> >> > >> there's a related example, using a window store and a key-value store, at
> >> > >> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/test/java/io/confluent/examples/streams/ValidateStateWithInteractiveQueriesLambdaIntegrationTest.java
> >> > >> (this is for Confluent 3.2 / Kafka 0.10.2).
> >> > >>
> >> > >> -Michael
> >> > >>
> >> > >> On Wed, Mar 29, 2017 at 3:12 PM, Jon Yeargers <jon.yearg...@cedexis.com> wrote:
> >> > >>
> >> > >> > I'm only running one instance (locally) to keep things simple.
> >> > >> >
> >> > >> > Reduction:
> >> > >> >
> >> > >> > KTable<Windowed<String>, String> hourAggStore =
> >> > >> >     sourceStream.groupByKey().reduce(rowReducer,
> >> > >> >         TimeWindows.of(65 * 60 * 1000L).advanceBy(5 * 60 * 1000L).until(70 * 60 * 1000L),
> >> > >> >         "HourAggStore");
> >> > >> >
> >> > >> > then I get values to look for via:
> >> > >> >
> >> > >> > hourAggStore.foreach((k, v) -> {
> >> > >> >     try {
> >> > >> >         // readValue throws a checked IOException, which cannot escape the lambda
> >> > >> >         LogLine logLine = objectMapper.readValue(v, LogLine.class);
> >> > >> >         LOGGER.debug("{}", k.key());
> >> > >> >     } catch (IOException e) {
> >> > >> >         LOGGER.warn("Could not parse value for key {}", k.key(), e);
> >> > >> >     }
> >> > >> > });
> >> > >> >
> >> > >> > I've kept it easy by requesting everything from 0 to
> >> > >> > 'System.currentTimeMillis()'. Retrieval is done using a snippet from your
> >> > >> > sample code, "windowedByKey".
> >> > >> >
> >> > >> > Requests are sent in via curl and output through the same channel. I pass
> >> > >> > in the key and ask for any values.
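For scale: with the definition above (TimeWindows.of(65 min).advanceBy(5 min)), every record falls into 65 / 5 = 13 overlapping windows, so each key produces 13 windowed entries per record timestamp. The plain-Java sketch below models that assignment, assuming windows are aligned to multiples of the advance interval (epoch-aligned, as Kafka's TimeWindows are) and cover [start, start + size). It is an illustrative model, not Kafka's code; HoppingWindowModel and windowStartsFor are hypothetical names, and timestamps are assumed non-negative.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of hopping-window assignment (not Kafka's TimeWindows code):
// windows start at multiples of advanceMs and cover [start, start + sizeMs).
final class HoppingWindowModel {

    // Start timestamps of every window containing recordTs, newest first.
    static List<Long> windowStartsFor(long recordTs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Newest window start at or before recordTs, aligned to the advance interval.
        long start = recordTs - (recordTs % advanceMs);
        // Walk back while the window [start, start + sizeMs) still contains recordTs.
        while (start >= 0 && start + sizeMs > recordTs) {
            starts.add(start);
            start -= advanceMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        long sizeMs = 65 * 60 * 1000L;      // TimeWindows.of(65 * 60 * 1000L)
        long advanceMs = 5 * 60 * 1000L;    // .advanceBy(5 * 60 * 1000L)
        long recordTs = 1_490_800_000_000L; // hypothetical record timestamp

        List<Long> starts = windowStartsFor(recordTs, sizeMs, advanceMs);
        System.out.println(starts.size()); // prints 13
        // Oldest and newest window start that contain recordTs.
        System.out.println(starts.get(starts.size() - 1) + " .. " + starts.get(0));
    }
}
```

When querying a single key with fetch, each of those 13 window starts is a separate entry, so a range that covers only part of them returns only part of the data.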
> >> > >> >
> >> > >> > I've looked at the values passed in / out of the reduction function and they
> >> > >> > look sane.
> >> > >> >
> >> > >> > My assumption is that if a value shows up in the 'foreach' loop, this
> >> > >> > implies it exists in the StateStore. Accurate?
> >> > >> >
> >> > >> > In fact, only about one in 10 requests actually returns any values. No
> >> > >> > errors - just no data.
> >> > >> >
> >> > >> > On Wed, Mar 29, 2017 at 2:15 AM, Damian Guy <damian....@gmail.com> wrote:
> >> > >> >
> >> > >> > > Hi Jon,
> >> > >> > >
> >> > >> > > If you are able to get a handle on the store, i.e., via
> >> > >> > > KafkaStreams.store(...), and call fetch without any exceptions, then the
> >> > >> > > store is available.
> >> > >> > > The time params to fetch are the boundaries to search for windows for the
> >> > >> > > given key. They relate to the start time of the window, so if you did
> >> > >> > > fetch(key, t1, t2), it will find all the windows for key that start in
> >> > >> > > the inclusive time range t1 to t2.
> >> > >> > >
> >> > >> > > Are you running more than one instance? If yes, then you want to make sure
> >> > >> > > that you are querying the correct instance. For that you can use
> >> > >> > > KafkaStreams.metadataForKey(...) to find the instance that has the key you
> >> > >> > > are looking for.
> >> > >> > >
> >> > >> > > Thanks,
> >> > >> > > Damian
> >> > >> > >
> >> > >> > > On Tue, 28 Mar 2017 at 22:37 Jon Yeargers <jon.yearg...@cedexis.com> wrote:
> >> > >> > >
> >> > >> > > > I'm probing about trying to find a way to solve my aggregation -> db issue.
> >> > >> > > > Looking at the '.fetch()' function, I'm wondering about the 'timeFrom' and
> >> > >> > > > 'timeTo' params, as not a lot is mentioned about 'proper' usage.
> >> > >> > > >
> >> > >> > > > The test in
> >> > >> > > > https://github.com/confluentinc/examples/blob/master/kafka-streams/src/test/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExampleTest.java#L200-L212
> >> > >> > > > makes it appear that the params are boundaries and that it will return an
> >> > >> > > > inclusive list of every key/window combination. Truth?
> >> > >> > > >
> >> > >> > > > My tests to this end haven't returned anything.
> >> > >> > > >
> >> > >> > > > I'm watching the values coming out of the KTable<Windowed<String>, String> so
> >> > >> > > > I can send them back as request params. What I've tried:
> >> > >> > > >
> >> > >> > > > - Windowed.key(), Windowed.window().start() and Windowed.window().end()
> >> > >> > > > - Windowed.key(), (Windowed.window().start() - 1) and (Windowed.window().end() + 1)
> >> > >> > > > - Windowed.key(), 0 and Windowed.window().end()
> >> > >> > > > - Windowed.key(), 0 and (Windowed.window().end() + 1)
> >> > >> > > >
> >> > >> > > > None of these seem to hit anything in the StateStore.
> >> > >> > > >
> >> > >> > > > Is there a delay before Store values become available for '.fetch()'?
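Applying the rule from Damian's reply above (fetch(key, t1, t2) finds the windows for the key whose start time lies in the inclusive range t1 to t2) to the four attempts listed: each range contains the window's start time, so each query should have matched the window had it still been in the store. The sketch below checks this in plain Java with no Kafka dependency; FetchRangeModel and fetchMatches are hypothetical names and the timestamps are made up. If the model is right, the misses are unlikely to come from the choice of time bounds, and it is worth checking instead whether those windows had already expired under until(..).

```java
// Models the rule from Damian's reply: fetch(key, timeFrom, timeTo) returns the
// windows for key whose START time lies in the inclusive range [timeFrom, timeTo].
// Plain Java with no Kafka dependency; names are hypothetical.
final class FetchRangeModel {

    static boolean fetchMatches(long windowStart, long timeFrom, long timeTo) {
        return windowStart >= timeFrom && windowStart <= timeTo;
    }

    public static void main(String[] args) {
        long start = 1_490_796_300_000L;    // hypothetical window().start()
        long end = start + 65 * 60 * 1000L; // matching window().end() for a 65-minute window

        // The four ranges from the original message: all contain the window start.
        System.out.println(fetchMatches(start, start, end));         // true
        System.out.println(fetchMatches(start, start - 1, end + 1)); // true
        System.out.println(fetchMatches(start, 0L, end));            // true
        System.out.println(fetchMatches(start, 0L, end + 1));        // true

        // A range that begins after the window start would miss it.
        System.out.println(fetchMatches(start, start + 1, end));     // false
    }
}
```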