But if a key shows up in a KTable->forEach should it be available in the
StateStore (from the KTable)?

On Wed, Mar 29, 2017 at 6:31 AM, Michael Noll <mich...@confluent.io> wrote:

> Jon,
>
> there's a related example, using a window store and a key-value store, at
> https://github.com/confluentinc/examples/blob/3.
> 2.x/kafka-streams/src/test/java/io/confluent/examples/streams/
> ValidateStateWithInteractiveQueriesLambdaIntegrationTest.java
> (this is for Confluent 3.2 / Kafka 0.10.2).
>
> -Michael
>
>
>
> On Wed, Mar 29, 2017 at 3:12 PM, Jon Yeargers <jon.yearg...@cedexis.com>
> wrote:
>
> > Im only running one instance (locally) to keep things simple.
> >
> > Reduction:
> >
> >         KTable<Windowed<String>, String> hourAggStore =
> > sourceStream.groupByKey().reduce(rowReducer,
> >                 TimeWindows.of(65 * 60 * 1000L).advanceBy(5 * 60 *
> > 1000).until(70 * 60 * 1000L),
> >                 "HourAggStore");
> >
> > then I get values to look for via:
> >
> >         hourAggStore.foreach((k, v) -> {
> >                 LogLine logLine = objectMapper.readValue(v,
> logLine.class);
> >                 LOGGER.debug("{}", k.key());
> >         });
> >
> > Ive kept it easy by requesting everything from 0 to
> > 'System.currentTimeMillis()'. Retrieval is done using a snip from your
> > sample code "windowedByKey".
> >
> > Requests are sent in via curl and output through the same channel. I pass
> > in the key and ask for any values.
> >
> > Ive looked at the values passed in / out of the reduction function and
> they
> > look sane.
> >
> > My assumption is that if a value shows up in the 'forEach' loop this
> > implies it exists in the StateStore. Accurate?
> >
> > In fact, only about one in 10 requests actually return any values. No
> > errors - just no data.
> >
> >
> >
> > On Wed, Mar 29, 2017 at 2:15 AM, Damian Guy <damian....@gmail.com>
> wrote:
> >
> > > Hi Jon,
> > >
> > > If you are able to get a handle on the store, i.e., via
> > > KafkaStreams.store(...) and call fetch without any exceptions, then the
> > > store is available.
> > > The time params to fetch are the boundaries to search for windows for
> the
> > > given key. They relate to the start time of the window, so if you did
> > > fetch(key, t1, t2) - it will find all the windows for key that start in
> > the
> > > inclusive time range t1 - t2.
> > >
> > > Are you running more than one instance? If yes, then you want to make
> > sure
> > > that you are querying the correct instance. For that you can use:
> > > KafkaStreams.metadataForKey(...) to find the instance that has the key
> > you
> > > are looking for.
> > >
> > > Thanks,
> > > Damian
> > >
> > >
> > >
> > > On Tue, 28 Mar 2017 at 22:37 Jon Yeargers <jon.yearg...@cedexis.com>
> > > wrote:
> > >
> > > > Im probing about trying to find a way to solve my aggregation -> db
> > > issue.
> > > > Looking at the '.fetch()'  function Im wondering about the 'timeFrom'
> > and
> > > > 'timeTo' params as not a lot is mentioned about 'proper' usage.
> > > >
> > > > The test in
> > > >
> > > > https://github.com/confluentinc/examples/blob/
> > > master/kafka-streams/src/test/java/io/confluent/examples/
> > > streams/interactivequeries/WordCountInteractiveQueriesExa
> > > mpleTest.java#L200-L212
> > > > makes it appear that the params are boundaries and that it will
> return
> > an
> > > > inclusive list of every key/window combination. Truth?
> > > >
> > > > My tests to this end haven't returned anything.
> > > >
> > > > Im watching the values coming out of the KTable<Window, String> so I
> > can
> > > > send them back as request params. What Ive tried:
> > > >
> > > > - Window.key(), Window.key().start() and Window.key().end()
> > > > - Window.key(), (Window.key().start() - 1) and (Window.key().end() +
> 1)
> > > > - Window.key(), 0 and Window.key().end()
> > > > - Window.key(), 0 and (Window.key().end() + 1)
> > > >
> > > > None of these seem to hit anything in the StateStore.
> > > >
> > > > Is there a delay before Store values become available for '.fetch()'?
> > > >
> > >
> >
>

Reply via email to