Yes, we are considering differentiating the "process" and "punctuate" functions as "data-driven" and "time-driven" computations, respectively. That is, the triggering of punctuate should NOT depend on the arrival of messages, or on the messages' associated timestamps.
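For reference, the current data-driven rule (as David explains further down this thread: punctuate fires only when the *smallest* per-partition time-value advances) can be modeled with a short, self-contained simulation. This is only an illustration of the rule as stated in this thread, not actual Kafka Streams internals, and all names here are hypothetical:

```java
import java.util.Arrays;

// Toy model of "stream time" as described in this thread: a task tracks the
// minimum timestamp across its input partitions, and punctuate() fires only
// when that minimum passes the next scheduled punctuation time.
public class StreamTimeModel {
    final long[] lastTimestamp;   // last observed timestamp per partition
    final long interval;          // punctuation interval (ms)
    long nextPunctuate;           // next scheduled punctuation time
    int punctuateCount = 0;

    StreamTimeModel(int partitions, long intervalMs) {
        lastTimestamp = new long[partitions];
        Arrays.fill(lastTimestamp, Long.MIN_VALUE);  // no data seen yet
        interval = intervalMs;
        nextPunctuate = intervalMs;
    }

    long streamTime() {
        // "stream time" = smallest time-value across all input partitions
        return Arrays.stream(lastTimestamp).min().getAsLong();
    }

    void onRecord(int partition, long timestamp) {
        lastTimestamp[partition] = Math.max(lastTimestamp[partition], timestamp);
        while (streamTime() >= nextPunctuate) {
            punctuateCount++;
            nextPunctuate += interval;
        }
    }

    public static void main(String[] args) {
        StreamTimeModel task = new StreamTimeModel(2, 10_000);
        // records arrive only on partition 0: the empty partition pins
        // stream time at its initial value, so punctuate never fires
        for (long t = 0; t <= 60_000; t += 1_000) task.onRecord(0, t);
        System.out.println("punctuations with empty partition 1: " + task.punctuateCount); // 0
        // a single "time marker" on partition 1 lets stream time jump forward
        task.onRecord(1, 60_000);
        System.out.println("punctuations after the marker: " + task.punctuateCount); // 6
    }
}
```

This makes the failure mode and the workaround concrete: with one empty partition the minimum never moves, and a single marker record on that partition releases all the pending punctuations at once.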
For now, I think that periodically inserting "time markers" into the input topics, as suggested by David, to get the current "data-driven" punctuate function triggered is a good idea.

Guozhang

On Sat, Nov 26, 2016 at 1:39 PM, David Garcia <dav...@spiceworks.com> wrote:
> I know that the Kafka team is working on a new way to reason about time.
> My team's solution was to not use punctuate...but this only works if you
> have guarantees that all of the tasks will receive messages, if not all
> the partitions. Another solution is to periodically send canaries to all
> partitions your app is listening to. In either case it's a band-aid. I
> know the team is aware of this bug and they are working on it. Hopefully
> it will be addressed in 0.10.1.1
>
> Sent from my iPhone
>
>> On Nov 24, 2016, at 1:55 AM, shahab <shahab.mok...@gmail.com> wrote:
>>
>> Thanks for the comments.
>> @David: yes, I have a source which is reading data from two topics, and
>> one of them was empty while the second one was loaded with plenty of data.
>> So what do you suggest to solve this?
>> Here is a snippet of my code:
>>
>> StreamsConfig config = new StreamsConfig(configProperties);
>> TopologyBuilder builder = new TopologyBuilder();
>> AppSettingsFetcher appSettingsFetcher = initAppSettingsFetcher();
>>
>> StateStoreSupplier company_bucket = Stores.create("CBS")
>>     .withKeys(Serdes.String())
>>     .withValues(Serdes.String())
>>     .persistent()
>>     .build();
>>
>> StateStoreSupplier profiles = Stores.create("PR")
>>     .withKeys(Serdes.String())
>>     .withValues(Serdes.String())
>>     .persistent()
>>     .build();
>>
>> builder
>>     .addSource("deltaSource", topicName, LoaderListener.LoadedDeltaTopic)
>>     .addProcessor("deltaProcess1", () -> new DeltaProcessor(), "deltaSource")
>>     .addProcessor("deltaProcess2", () -> new LoadProcessor(), "deltaProcess1")
>>     .addStateStore(profiles, "deltaProcess2", "deltaProcess1")
>>     .addStateStore(company_bucket, "deltaProcess2", "deltaProcess1");
>>
>> KafkaStreams streams = new KafkaStreams(builder, config);
>>
>> streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
>>     @Override
>>     public void uncaughtException(Thread t, Throwable e) {
>>         e.printStackTrace();
>>     }
>> });
>>
>> streams.start();
>>
>>> On Wed, Nov 23, 2016 at 8:30 PM, David Garcia <dav...@spiceworks.com> wrote:
>>>
>>> If you are consuming from more than one topic/partition, punctuate is
>>> triggered when the "smallest" time-value changes. So, if there is a
>>> partition that doesn't have any more messages on it, it will always have
>>> the smallest time-value, and that time-value won't change...hence
>>> punctuate never gets called.
>>>
>>> -David
>>>
>>> On 11/23/16, 1:01 PM, "Matthias J. Sax" <matth...@confluent.io> wrote:
>>>
>>> Your understanding is correct:
>>>
>>> Punctuate is not triggered based on wall-clock time, but based on the
>>> internally tracked "stream time" that is derived from the
>>> TimestampExtractor.
>>> Even if you use WallclockTimestampExtractor, "stream time" is only
>>> advanced if there are input records.
>>>
>>> Not sure why punctuate() is not triggered, as you say that you do have
>>> arriving data.
>>>
>>> Can you share your code?
>>>
>>> -Matthias
>>>
>>>> On 11/23/16 4:48 AM, shahab wrote:
>>>> Hello,
>>>>
>>>> I am using the low-level processor API, and I set context.schedule(10000),
>>>> assuming that the punctuate() method is invoked every 10 sec.
>>>> I have set
>>>> configProperties.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
>>>>     WallclockTimestampExtractor.class.getCanonicalName());
>>>>
>>>> Although data keeps coming into the topology (I have logged the incoming
>>>> tuples in process()), punctuate() is never executed.
>>>>
>>>> What am I missing?
>>>>
>>>> best,
>>>> Shahab

--
-- Guozhang
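The periodic-canary workaround discussed above has roughly this shape. This is only a sketch: sendCanary() is a stub standing in for a real producer send targeted at a specific (topic, partition), and the partition count and interval here are made-up values:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Shape of the periodic-canary workaround: every N seconds, send one marker
// record to each partition the application listens to, so that every task's
// "stream time" keeps advancing even on otherwise-idle partitions.
public class CanaryScheduler {
    static final int PARTITIONS = 3;              // hypothetical partition count
    static final AtomicInteger sent = new AtomicInteger();

    // Stub: a real implementation would produce a marker record to (topic, p),
    // and downstream processors would need to recognize and skip the markers.
    static void sendCanary(int partition) { sent.incrementAndGet(); }

    static void sendCanaryRound() {
        for (int p = 0; p < PARTITIONS; p++) sendCanary(p);
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // fire a round of canaries every 10 seconds, starting immediately
        scheduler.scheduleAtFixedRate(CanaryScheduler::sendCanaryRound, 0, 10, TimeUnit.SECONDS);
        Thread.sleep(100);                        // let the first round run
        scheduler.shutdownNow();
        System.out.println("canaries sent so far: " + sent.get());
    }
}
```

As noted in the thread, this is a band-aid until punctuate can be scheduled on wall-clock time directly; the markers also have to be filtered out by every downstream consumer.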