Jon,

perhaps you could share the full integration test (or whatever code you're
using to experiment)? We had a similar "how does X work?" question on
StackOverflow recently [1], and it was much easier to help once we also
understood exactly how the test data was being generated.

-Michael




[1]
https://stackoverflow.com/questions/43038653/how-to-actually-discard-late-records


On Thu, Mar 30, 2017 at 1:53 AM, Jon Yeargers <jon.yearg...@cedexis.com>
wrote:

> I remain thoroughly mystified by the workings of the StateStore. I tried
> making aggregations with a 1-minute window, a 10-second advance and a
> _12 hour_ retention (which is longer than the retention.ms of the topic).
> I still couldn't get more than a 15% hit rate on the StateStore.
>
> Are there configuration settings? Some properties file to set up RocksDB?
> I'm not getting any errors - just not getting any data.
>
> On Wed, Mar 29, 2017 at 12:52 PM, Jon Yeargers <jon.yearg...@cedexis.com>
> wrote:
>
> > So is '.until()' based on clock time, elapsed time (i.e. record age), or
> > something else?
> >
> > Given that I'm seeing lots of records come through that can't be found
> > in the store, are these 'old' and already expired?
> >
> > Going forward, it would be useful to have different forms of '.until()'
> > so one could consume old records (e.g. when catching up from lag)
> > without having to worry about them immediately disappearing.
> >
> > On Wed, Mar 29, 2017 at 10:37 AM, Damian Guy <damian....@gmail.com>
> > wrote:
> >
> >> Jon,
> >>
> >> You should be able to query anything that has not expired, i.e., based
> >> on TimeWindows.until(..).
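A simplified sketch of what "expired" can mean here. It assumes that the retention set via `until()` is measured against stream time, i.e. the largest record timestamp seen so far, rather than wall-clock time; Kafka's exact bookkeeping differs by version, and the real RocksDB-backed store drops whole segments, so real expiry is coarser than this. The class `StreamTimeRetentionSketch` and its methods are hypothetical, not a Kafka API:

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical, simplified model (not Kafka code) of a window store whose
// retention is driven by stream time: the largest record timestamp seen so far.
// Real RocksDB window stores drop whole segments, so real expiry is coarser.
public class StreamTimeRetentionSketch {
    private final long retentionMs;
    private long streamTime = Long.MIN_VALUE;
    // window start time -> aggregate value (a single key, for brevity)
    private final NavigableMap<Long, String> windowsByStart = new TreeMap<>();

    StreamTimeRetentionSketch(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Writes an aggregate for the window starting at windowStart. The record's
    // timestamp may advance stream time; returns false if the write is dropped.
    boolean put(long windowStart, long recordTimestamp, String aggregate) {
        streamTime = Math.max(streamTime, recordTimestamp);
        long cutoff = streamTime - retentionMs;
        windowsByStart.headMap(cutoff, false).clear(); // expire windows older than the cutoff
        if (windowStart < cutoff) {
            return false; // window already expired: ignored, no error raised
        }
        windowsByStart.put(windowStart, aggregate);
        return true;
    }

    boolean contains(long windowStart) {
        return windowsByStart.containsKey(windowStart);
    }

    public static void main(String[] args) {
        long hour = 60 * 60 * 1000L;
        StreamTimeRetentionSketch store = new StreamTimeRetentionSketch(12 * hour);
        store.put(0, 0, "a");                           // stored; stream time = 0
        System.out.println(store.contains(0));          // true
        store.put(24 * hour, 24 * hour, "b");           // stream time jumps ahead 24 hours
        System.out.println(store.contains(0));          // false: window 0 has expired
        System.out.println(store.put(hour, hour, "c")); // false: late window dropped
    }
}
```

In this model, a window that was already emitted (e.g. seen in a foreach() loop) can vanish from the store as soon as a later-timestamped record pushes stream time past its retention, and a write for a window older than the cutoff is silently ignored, which would look like "no errors, just no data".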
> >>
> >> Thanks,
> >> Damian
> >>
> >> On Wed, 29 Mar 2017 at 17:24 Jon Yeargers <jon.yearg...@cedexis.com>
> >> wrote:
> >>
> >> > To be a bit more specific:
> >> >
> >> > If I call this:
> >> >
> >> >         KTable<Windowed<String>, String> kt =
> >> >                 sourceStream.groupByKey().reduce(..., "somekeystore");
> >> >
> >> > and then call this:
> >> >
> >> >         kt.foreach((key, value) -> ...);
> >> >
> >> > Can I assume that everything that comes out will be available in
> >> > "somekeystore"? If not, what subset should I expect to find there?
> >> >
> >> > On Wed, Mar 29, 2017 at 8:34 AM, Jon Yeargers <jon.yearg...@cedexis.com>
> >> > wrote:
> >> >
> >> > > But if a key shows up in a KTable's foreach(), should it be available
> >> > > in the StateStore (from the KTable)?
> >> > >
> >> > > On Wed, Mar 29, 2017 at 6:31 AM, Michael Noll <mich...@confluent.io>
> >> > > wrote:
> >> > >
> >> > >> Jon,
> >> > >>
> >> > >> there's a related example, using a window store and a key-value
> >> > >> store, at
> >> > >> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/test/java/io/confluent/examples/streams/ValidateStateWithInteractiveQueriesLambdaIntegrationTest.java
> >> > >> (this is for Confluent 3.2 / Kafka 0.10.2).
> >> > >>
> >> > >> -Michael
> >> > >>
> >> > >>
> >> > >>
> >> > >> On Wed, Mar 29, 2017 at 3:12 PM, Jon Yeargers <jon.yearg...@cedexis.com>
> >> > >> wrote:
> >> > >>
> >> > >> > I'm only running one instance (locally) to keep things simple.
> >> > >> >
> >> > >> > Reduction:
> >> > >> >
> >> > >> >         // 65-minute windows, hopping every 5 minutes, retained for 70 minutes
> >> > >> >         KTable<Windowed<String>, String> hourAggStore =
> >> > >> >                 sourceStream.groupByKey().reduce(rowReducer,
> >> > >> >                         TimeWindows.of(65 * 60 * 1000L)
> >> > >> >                                 .advanceBy(5 * 60 * 1000L)
> >> > >> >                                 .until(70 * 60 * 1000L),
> >> > >> >                         "HourAggStore");
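For intuition about how many store entries a single record touches, here is a small sketch of hopping-window assignment. It assumes epoch-aligned windows and the start-time arithmetic of `TimeWindows.windowsFor()` in Kafka Streams 0.10.x; the class `HoppingWindowSketch` is hypothetical, so check the behaviour against the Kafka version in use:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka: models how a hopping window
// definition (size, advance) assigns one record timestamp to window starts.
public class HoppingWindowSketch {

    // Start times of every window [start, start + sizeMs) containing timestampMs.
    // Windows are assumed to be aligned to the epoch (a window starts at 0).
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Smallest aligned start whose window still covers the timestamp.
        long start = (Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        while (start <= timestampMs) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60 * 1000L;
        long ts = 607 * minute; // a record stamped 10:07 after the epoch
        List<Long> starts = windowStartsFor(ts, 65 * minute, 5 * minute);
        // prints "13 windows, first starts at minute 545, last at minute 605"
        System.out.println(starts.size() + " windows, first starts at minute "
                + starts.get(0) / minute + ", last at minute "
                + starts.get(starts.size() - 1) / minute);
    }
}
```

With the 65-minute size and 5-minute advance above, each record updates 13 overlapping windows in "HourAggStore". The 1-minute window with a 10-second advance mentioned at the top of the thread gives 6 windows per record.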
> >> > >> >
> >> > >> > then I get values to look for via:
> >> > >> >
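> >> > >> >         hourAggStore.foreach((k, v) -> {
> >> > >> >                 try {
> >> > >> >                         LogLine logLine = objectMapper.readValue(v, LogLine.class);
> >> > >> >                         LOGGER.debug("{}", k.key());
> >> > >> >                 } catch (IOException e) {
> >> > >> >                         // readValue throws a checked exception that a ForeachAction lambda cannot rethrow
> >> > >> >                         LOGGER.warn("Could not parse value for key {}", k.key(), e);
> >> > >> >                 }
> >> > >> >         });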
> >> > >> >
> >> > >> > I've kept it simple by requesting everything from 0 to
> >> > >> > System.currentTimeMillis(). Retrieval uses a snippet from your
> >> > >> > sample code, "windowedByKey".
> >> > >> >
> >> > >> > Requests are sent in via curl, and results come back through the
> >> > >> > same channel. I pass in the key and ask for any values.
> >> > >> >
> >> > >> > I've looked at the values passed into and out of the reduction
> >> > >> > function, and they look sane.
> >> > >> >
> >> > >> > My assumption is that if a value shows up in the foreach() loop, it
> >> > >> > exists in the StateStore. Is that accurate?
> >> > >> >
> >> > >> > In fact, only about one in ten requests actually returns any values.
> >> > >> > No errors - just no data.
> >> > >> >
> >> > >> >
> >> > >> >
> >> > >> > On Wed, Mar 29, 2017 at 2:15 AM, Damian Guy <damian....@gmail.com>
> >> > >> > wrote:
> >> > >> >
> >> > >> > > Hi Jon,
> >> > >> > >
> >> > >> > > If you are able to get a handle on the store, i.e., via
> >> > >> > > KafkaStreams.store(...), and call fetch without any exceptions,
> >> > >> > > then the store is available.
> >> > >> > > The time params to fetch are the boundaries to search for windows
> >> > >> > > for the given key. They relate to the start time of the window, so
> >> > >> > > if you call fetch(key, t1, t2), it will find all the windows for
> >> > >> > > key that start in the inclusive time range t1 - t2.
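To make the start-time semantics concrete, here is a minimal in-memory model in which `fetch(key, timeFrom, timeTo)` returns the windows for `key` whose start lies in the inclusive range `[timeFrom, timeTo]`. It is not Kafka code; `WindowFetchSketch` and its methods are hypothetical, and it returns window start times rather than values for easy inspection:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model, not Kafka code: a window matches
// fetch(key, timeFrom, timeTo) when its START time is in [timeFrom, timeTo].
public class WindowFetchSketch {
    // key -> (window start time -> aggregate value)
    private final Map<String, TreeMap<Long, String>> windowsByKey = new HashMap<>();

    void put(String key, long windowStart, String value) {
        windowsByKey.computeIfAbsent(key, k -> new TreeMap<>()).put(windowStart, value);
    }

    // Start times of the windows for `key` that begin inside the inclusive range.
    // An unknown key or an inverted range yields an empty list in this model.
    List<Long> fetch(String key, long timeFrom, long timeTo) {
        TreeMap<Long, String> windows = windowsByKey.get(key);
        if (windows == null || timeFrom > timeTo) {
            return new ArrayList<>();
        }
        return new ArrayList<>(windows.subMap(timeFrom, true, timeTo, true).keySet());
    }

    public static void main(String[] args) {
        WindowFetchSketch store = new WindowFetchSketch();
        long start = 600_000L;
        long end = 660_000L; // a 1-minute window [start, end)
        store.put("k", start, "agg");

        // The four ranges from the question all contain `start`: each prints [600000]
        System.out.println(store.fetch("k", start, end));
        System.out.println(store.fetch("k", start - 1, end + 1));
        System.out.println(store.fetch("k", 0, end));
        System.out.println(store.fetch("k", 0, end + 1));

        // A range covering the window's end but not its start finds nothing: []
        System.out.println(store.fetch("k", start + 1, end));
    }
}
```

Each of the four ranges Jon lists (start..end, start-1..end+1, 0..end, 0..end+1) contains the window's start, so under these semantics all four should match a window that is still in the store. That points away from the choice of range as the cause of the misses.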
> >> > >> > >
> >> > >> > > Are you running more than one instance? If yes, then you want to
> >> > >> > > make sure that you are querying the correct instance. For that you
> >> > >> > > can use KafkaStreams.metadataForKey(...) to find the instance that
> >> > >> > > has the key you are looking for.
> >> > >> > >
> >> > >> > > Thanks,
> >> > >> > > Damian
> >> > >> > >
> >> > >> > >
> >> > >> > >
> >> > >> > > On Tue, 28 Mar 2017 at 22:37 Jon Yeargers <jon.yearg...@cedexis.com>
> >> > >> > > wrote:
> >> > >> > >
> >> > >> > > > I'm probing around trying to find a way to solve my aggregation ->
> >> > >> > > > db issue. Looking at the '.fetch()' function, I'm wondering about
> >> > >> > > > the 'timeFrom' and 'timeTo' params, as not much is said about
> >> > >> > > > their 'proper' usage.
> >> > >> > > >
> >> > >> > > > The test in
> >> > >> > > >
> >> > >> > > > https://github.com/confluentinc/examples/blob/master/kafka-streams/src/test/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExampleTest.java#L200-L212
> >> > >> > > > makes it appear that the params are boundaries and that it will
> >> > >> > > > return an inclusive list of every key/window combination. Is that
> >> > >> > > > right?
> >> > >> > > >
> >> > >> > > > My tests to this end haven't returned anything.
> >> > >> > > >
> >> > >> > > > I'm watching the values coming out of the KTable<Windowed<String>,
> >> > >> > > > String> so I can send them back as request params. What I've
> >> > >> > > > tried:
> >> > >> > > >
> >> > >> > > > - Windowed.key(), Windowed.window().start() and Windowed.window().end()
> >> > >> > > > - Windowed.key(), (Windowed.window().start() - 1) and (Windowed.window().end() + 1)
> >> > >> > > > - Windowed.key(), 0 and Windowed.window().end()
> >> > >> > > > - Windowed.key(), 0 and (Windowed.window().end() + 1)
> >> > >> > > >
> >> > >> > > > None of these seem to hit anything in the StateStore.
> >> > >> > > >
> >> > >> > > > Is there a delay before store values become available to '.fetch()'?
> >> > >> > > >
> >> > >> > >
> >> > >> >
> >> > >>
> >> > >
> >> > >
> >> >
> >>
> >
> >
>
