But if a key shows up in a KTable->forEach should it be available in the StateStore (from the KTable)?
On Wed, Mar 29, 2017 at 6:31 AM, Michael Noll <mich...@confluent.io> wrote:

> Jon,
>
> there's a related example, using a window store and a key-value store, at
> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/test/java/io/confluent/examples/streams/ValidateStateWithInteractiveQueriesLambdaIntegrationTest.java
> (this is for Confluent 3.2 / Kafka 0.10.2).
>
> -Michael
>
>
> On Wed, Mar 29, 2017 at 3:12 PM, Jon Yeargers <jon.yearg...@cedexis.com>
> wrote:
>
> > I'm only running one instance (locally) to keep things simple.
> >
> > Reduction:
> >
> >     KTable<Windowed<String>, String> hourAggStore =
> >         sourceStream.groupByKey().reduce(rowReducer,
> >             TimeWindows.of(65 * 60 * 1000L).advanceBy(5 * 60 * 1000).until(70 * 60 * 1000L),
> >             "HourAggStore");
> >
> > then I get values to look for via:
> >
> >     hourAggStore.foreach((k, v) -> {
> >         LogLine logLine = objectMapper.readValue(v, LogLine.class);
> >         LOGGER.debug("{}", k.key());
> >     });
> >
> > I've kept it easy by requesting everything from 0 to
> > 'System.currentTimeMillis()'. Retrieval is done using a snip from your
> > sample code "windowedByKey".
> >
> > Requests are sent in via curl and output through the same channel. I pass
> > in the key and ask for any values.
> >
> > I've looked at the values passed in / out of the reduction function and
> > they look sane.
> >
> > My assumption is that if a value shows up in the 'forEach' loop this
> > implies it exists in the StateStore. Accurate?
> >
> > In fact, only about one in 10 requests actually return any values. No
> > errors - just no data.
> >
> >
> > On Wed, Mar 29, 2017 at 2:15 AM, Damian Guy <damian....@gmail.com> wrote:
> >
> > > Hi Jon,
> > >
> > > If you are able to get a handle on the store, i.e., via
> > > KafkaStreams.store(...) and call fetch without any exceptions, then the
> > > store is available.
> > > The time params to fetch are the boundaries to search for windows for
> > > the given key. They relate to the start time of the window, so if you
> > > did fetch(key, t1, t2) - it will find all the windows for key that
> > > start in the inclusive time range t1 - t2.
> > >
> > > Are you running more than one instance? If yes, then you want to make
> > > sure that you are querying the correct instance. For that you can use:
> > > KafkaStreams.metadataForKey(...) to find the instance that has the key
> > > you are looking for.
> > >
> > > Thanks,
> > > Damian
> > >
> > >
> > > On Tue, 28 Mar 2017 at 22:37 Jon Yeargers <jon.yearg...@cedexis.com>
> > > wrote:
> > >
> > > > I'm probing about trying to find a way to solve my aggregation -> db
> > > > issue. Looking at the '.fetch()' function I'm wondering about the
> > > > 'timeFrom' and 'timeTo' params as not a lot is mentioned about
> > > > 'proper' usage.
> > > >
> > > > The test in
> > > >
> > > > https://github.com/confluentinc/examples/blob/master/kafka-streams/src/test/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExampleTest.java#L200-L212
> > > >
> > > > makes it appear that the params are boundaries and that it will
> > > > return an inclusive list of every key/window combination. Truth?
> > > >
> > > > My tests to this end haven't returned anything.
> > > >
> > > > I'm watching the values coming out of the KTable<Window, String> so I
> > > > can send them back as request params. What I've tried:
> > > >
> > > > - Window.key(), Window.key().start() and Window.key().end()
> > > > - Window.key(), (Window.key().start() - 1) and (Window.key().end() + 1)
> > > > - Window.key(), 0 and Window.key().end()
> > > > - Window.key(), 0 and (Window.key().end() + 1)
> > > >
> > > > None of these seem to hit anything in the StateStore.
> > > >
> > > > Is there a delay before Store values become available for '.fetch()'?
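
Editor's note: Damian's description of fetch(key, t1, t2) above (it matches windows whose start time falls in the inclusive range t1 to t2) can be sketched with a small in-memory model. This is plain Java and does not use the Kafka Streams API; the class WindowFetchSketch and its methods put, fetch and windowStartsFor are hypothetical names chosen for illustration. The window size (65 minutes) and advance (5 minutes) are taken from the TimeWindows definition in the thread, and the assumption that hopping windows start at multiples of the advance interval is the editor's, not something stated in the thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory model of a windowed store; NOT the Kafka Streams API.
class WindowFetchSketch {
    static final long SIZE_MS = 65 * 60 * 1000L;    // window size from the thread
    static final long ADVANCE_MS = 5 * 60 * 1000L;  // hop (advance) from the thread

    // key -> (window start time -> latest value written to that window)
    private final Map<String, NavigableMap<Long, String>> store = new HashMap<>();

    // Start times of every hopping window [start, start + SIZE_MS) containing the timestamp.
    // Assumption: window starts are aligned to multiples of ADVANCE_MS.
    static List<Long> windowStartsFor(long timestamp) {
        List<Long> starts = new ArrayList<>();
        long start = (Math.max(0L, timestamp - SIZE_MS + ADVANCE_MS) / ADVANCE_MS) * ADVANCE_MS;
        for (; start <= timestamp; start += ADVANCE_MS) {
            starts.add(start);
        }
        return starts;
    }

    // Record a value in every window that covers the record timestamp.
    void put(String key, long timestamp, String value) {
        for (long start : windowStartsFor(timestamp)) {
            store.computeIfAbsent(key, k -> new TreeMap<>()).put(start, value);
        }
    }

    // Return the windows for key whose START time lies in [timeFrom, timeTo], both inclusive.
    NavigableMap<Long, String> fetch(String key, long timeFrom, long timeTo) {
        NavigableMap<Long, String> windows = store.get(key);
        if (windows == null || timeFrom > timeTo) {
            return new TreeMap<>();
        }
        return windows.subMap(timeFrom, true, timeTo, true);
    }

    public static void main(String[] args) {
        WindowFetchSketch sketch = new WindowFetchSketch();
        long ts = 70 * 60 * 1000L; // a record 70 minutes after epoch
        sketch.put("some-key", ts, "value");
        // Querying from 0 up to the record timestamp covers every window start for this record.
        System.out.println(sketch.fetch("some-key", 0L, ts).keySet());
        // A range that begins after the last window start matches nothing.
        System.out.println(sketch.fetch("some-key", ts + 1, ts + 1000).keySet());
    }
}
```

In this model a single record lands in 65 / 5 = 13 overlapping windows, and a query from 0 to the current time covers all of their start times. If the real store behaves like this model, empty results in a single-instance setup would point at something other than the time range, such as the key passed to fetch.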