I'm only running one instance (locally) to keep things simple.

Reduction:

        KTable<Windowed<String>, String> hourAggStore =
                sourceStream.groupByKey().reduce(rowReducer,
                        TimeWindows.of(65 * 60 * 1000L).advanceBy(5 * 60 * 1000L).until(70 * 60 * 1000L),
                        "HourAggStore");

Then I get the values to look for via:

        hourAggStore.foreach((k, v) -> {
                LogLine logLine = objectMapper.readValue(v, LogLine.class);
                LOGGER.debug("{}", k.key());
        });

I've kept it simple by requesting everything from 0 to
'System.currentTimeMillis()'. Retrieval is done using a snippet from your
sample code, "windowedByKey".

Requests are sent in via curl and output through the same channel. I pass
in the key and ask for any values.

I've looked at the values passed in / out of the reduction function and they
look sane.

My assumption is that if a value shows up in the 'foreach' loop, this
implies it exists in the StateStore. Accurate?

In practice, only about one in ten requests actually returns any values. No
errors - just no data.
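
A minimal sketch (plain Java, no Kafka dependency) of the window arithmetic
behind the setup above: with a 65-minute window advancing every 5 minutes,
each record falls into 13 overlapping windows, and a fetch(key, t1, t2)
matches the windows whose start time lies in the inclusive range t1 to t2, as
described in the quoted reply below. The class and method names are
hypothetical, and the alignment rule (window starts are multiples of the
advance interval, counted from 0) is an assumption for illustration, not code
taken from Kafka Streams.

```java
import java.util.ArrayList;
import java.util.List;

class HoppingWindowSketch {

    // Start times (ms) of every hopping window [start, start + sizeMs) that
    // contains the given timestamp. Assumes starts are aligned to multiples
    // of advanceMs, beginning at 0.
    public static List<Long> windowStartsFor(long timestamp, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window can still contain the timestamp.
        long start = (Math.max(0L, timestamp - sizeMs + advanceMs) / advanceMs) * advanceMs;
        while (start <= timestamp) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    // Window starts that fall inside the inclusive range [timeFrom, timeTo],
    // mirroring the fetch semantics described in the thread.
    public static List<Long> startsInRange(List<Long> starts, long timeFrom, long timeTo) {
        List<Long> hits = new ArrayList<>();
        for (long s : starts) {
            if (s >= timeFrom && s <= timeTo) {
                hits.add(s);
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        long size = 65 * 60 * 1000L;     // window size from the reduce() call
        long advance = 5 * 60 * 1000L;   // advance interval from the reduce() call
        long ts = 100 * 60 * 1000L;      // hypothetical record timestamp (minute 100)

        List<Long> starts = windowStartsFor(ts, size, advance);
        System.out.println(starts.size());                                  // prints 13
        // Querying from 0 up to the record timestamp covers every window start.
        System.out.println(startsInRange(starts, 0L, ts).size());           // prints 13
        // A narrower range only matches windows that START inside it.
        System.out.println(startsInRange(starts, 50 * 60 * 1000L, 60 * 60 * 1000L).size()); // prints 3
    }
}
```

Under these assumptions, a range of 0 to 'System.currentTimeMillis()' covers
every window start for the key, so an empty result would point at something
other than the time bounds.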



On Wed, Mar 29, 2017 at 2:15 AM, Damian Guy <damian....@gmail.com> wrote:

> Hi Jon,
>
> If you are able to get a handle on the store, i.e., via
> KafkaStreams.store(...) and call fetch without any exceptions, then the
> store is available.
> The time params to fetch are the boundaries to search for windows for the
> given key. They relate to the start time of the window, so if you did
> fetch(key, t1, t2) - it will find all the windows for key that start in the
> inclusive time range t1 - t2.
>
> Are you running more than one instance? If yes, then you want to make sure
> that you are querying the correct instance. For that you can use:
> KafkaStreams.metadataForKey(...) to find the instance that has the key you
> are looking for.
>
> Thanks,
> Damian
>
>
>
> On Tue, 28 Mar 2017 at 22:37 Jon Yeargers <jon.yearg...@cedexis.com>
> wrote:
>
> > I'm probing about trying to find a way to solve my aggregation -> db
> > issue.
> > Looking at the '.fetch()' function I'm wondering about the 'timeFrom' and
> > 'timeTo' params as not a lot is mentioned about 'proper' usage.
> >
> > The test in
> >
> > https://github.com/confluentinc/examples/blob/master/kafka-streams/src/test/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExampleTest.java#L200-L212
> > makes it appear that the params are boundaries and that it will return an
> > inclusive list of every key/window combination. Truth?
> >
> > My tests to this end haven't returned anything.
> >
> > I'm watching the values coming out of the KTable<Window, String> so I can
> > send them back as request params. What I've tried:
> >
> > - Window.key(), Window.key().start() and Window.key().end()
> > - Window.key(), (Window.key().start() - 1) and (Window.key().end() + 1)
> > - Window.key(), 0 and Window.key().end()
> > - Window.key(), 0 and (Window.key().end() + 1)
> >
> > None of these seem to hit anything in the StateStore.
> >
> > Is there a delay before Store values become available for '.fetch()'?
> >
>
