Yes, we are considering differentiating the "process" and "punctuate" functions
as "data-driven" and "time-driven" computations, respectively. That is, the
triggering of punctuate should NOT depend on the arrival of messages, or on the
messages' associated timestamps.

For now, I think periodically inserting "time markers" into the input topics,
as David suggested, so that the current "data-driven" punctuate function still
gets triggered, is a good idea.
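
For reference, a rough sketch of what such a marker producer could look like
(the bootstrap servers, topic name, marker key, and 10-second interval below
are made-up examples, not taken from your setup):

import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;

public class TimeMarkerProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        final KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // hypothetical: the input topics your streams app is consuming
        final String[] inputTopics = {"delta-topic"};
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            long now = System.currentTimeMillis();
            for (String topic : inputTopics) {
                // one marker per partition, so every task sees a fresh timestamp
                for (PartitionInfo p : producer.partitionsFor(topic)) {
                    producer.send(new ProducerRecord<>(topic, p.partition(), now,
                            "__time_marker__", ""));
                }
            }
            producer.flush();
        }, 0, 10, TimeUnit.SECONDS);
    }
}

The streams application would just skip records keyed "__time_marker__" in
process(); their timestamps keep "stream time" advancing so the scheduled
punctuate() calls fire even when some partitions are otherwise idle.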

Guozhang

On Sat, Nov 26, 2016 at 1:39 PM, David Garcia <dav...@spiceworks.com> wrote:

> I know that the Kafka team is working on a new way to reason about time.
> My team's solution was to not use punctuate...but this only works if you
> have guarantees that all of the tasks will receive messages, if not all the
> partitions.  Another solution is to periodically send canaries to all
> partitions your app is listening to.  In either case it's a band-aid.  I
> know the team is aware of this bug and they are working on it.  Hopefully
> it will be addressed in 0.10.1.1.
>
> Sent from my iPhone
>
> > On Nov 24, 2016, at 1:55 AM, shahab <shahab.mok...@gmail.com> wrote:
> >
> > Thanks for the comments.
> > @David: yes, I have a source which is reading data from two topics, and
> > one of them was empty while the second one was loaded with plenty of data.
> > So what do you suggest to solve this?
> > Here is snippet of my code:
> >
> > StreamsConfig config = new StreamsConfig(configProperties);
> > TopologyBuilder builder = new TopologyBuilder();
> > AppSettingsFetcher appSettingsFetcher = initAppSettingsFetcher();
> >
> > StateStoreSupplier company_bucket= Stores.create("CBS")
> >        .withKeys(Serdes.String())
> >        .withValues(Serdes.String())
> >        .persistent()
> >        .build();
> >
> > StateStoreSupplier profiles= Stores.create("PR")
> >        .withKeys(Serdes.String())
> >        .withValues(Serdes.String())
> >        .persistent()
> >        .build();
> >
> >
> > builder
> >        .addSource("deltaSource", topicName, LoaderListener.LoadedDeltaTopic)
> >
> >        .addProcessor("deltaProcess1", () -> new DeltaProcessor(), "deltaSource")
> >        .addProcessor("deltaProcess2", () -> new LoadProcessor(), "deltaProcess1")
> >        .addStateStore(profiles, "deltaProcess2", "deltaProcess1")
> >        .addStateStore(company_bucket, "deltaProcess2", "deltaProcess1");
> >
> > KafkaStreams streams = new KafkaStreams(builder, config);
> >
> > streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
> >    @Override
> >    public void uncaughtException(Thread t, Throwable e) {
> >        e.printStackTrace();
> >    }
> > });
> >
> > streams.start();
> >
> >
> >> On Wed, Nov 23, 2016 at 8:30 PM, David Garcia <dav...@spiceworks.com> wrote:
> >>
> >> If you are consuming from more than one topic/partition, punctuate is
> >> triggered when the “smallest” time-value changes.  So, if there is a
> >> partition that doesn’t have any more messages on it, it will always have
> >> the smallest time-value and that time value won’t change…hence punctuate
> >> never gets called.
> >>
> >> -David
> >>
> >> On 11/23/16, 1:01 PM, "Matthias J. Sax" <matth...@confluent.io> wrote:
> >>
> >>    Your understanding is correct:
> >>
> >>    Punctuate is not triggered based on wall-clock time, but based on an
> >>    internally tracked "stream time" that is derived from the
> >>    TimestampExtractor. Even if you use WallclockTimestampExtractor,
> >>    "stream time" is only advanced if there are input records.
> >>
> >>    Not sure why punctuate() is not triggered, as you say that you do
> >>    have data arriving.
> >>
> >>    Can you share your code?
> >>
> >>
> >>
> >>    -Matthias
> >>
> >>
> >>>    On 11/23/16 4:48 AM, shahab wrote:
> >>> Hello,
> >>>
> >>> I am using the low-level Processor API and I set context.schedule(10000),
> >>> assuming that the punctuate() method is invoked every 10 sec.
> >>> I have set
> >>> configProperties.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> >>>        WallclockTimestampExtractor.class.getCanonicalName());
> >>>
> >>> Although data keeps coming into the topology (as I have logged the
> >>> incoming tuples in process()), punctuate() is never executed.
> >>>
> >>> What am I missing?
> >>>
> >>> best,
> >>> Shahab
> >>
> >>
> >>
> >>
>



-- 
-- Guozhang
