Does this fix the problem though? The docs indicate that new data is required for each *partition*, not topic. Overall I think the "stream time" notion is a good thing for a lot of use-cases, but some others definitely require wall-clock based windowing. Is something planned for this?
-Tommy

On Tue, 2017-03-28 at 10:45 +0100, Elliot Crosby-McCullough wrote:

Hi Michael,

My confusion was that the events are being created, transferred, and received several seconds apart (longer than the punctuate schedule) with no stalling because I'm triggering them by hand, so regardless of what mechanism is being used for timing it should still be called.

That said, I've just noticed in the callout box that it will only advance stream time if all input topics have new data, which in my testing is not the case, so I suppose I will need to attach the processor to each input topic rather than processing them all at the same time (in this use case they were being split back out in the processor).

Thanks,
Elliot

On 28 March 2017 at 10:18, Michael Noll <[email protected]> wrote:

Elliot,

in the current API, `punctuate()` is called based on the current stream-time (which defaults to event-time), not based on the current wall-clock time / processing-time. See http://docs.confluent.io/current/streams/faq.html#why-is-punctuate-not-called. The stream-time is advanced only when new input records are coming in, so if there's e.g. a stall on incoming records, then `punctuate()` will not be called.

If you need to schedule a call every N minutes of wall-clock time you'd need to use your own scheduler.

Does that help?
Michael

On Tue, Mar 28, 2017 at 10:58 AM, Elliot Crosby-McCullough <[email protected]> wrote:

Hi there,

I've written a simple processor which expects to have #process called on it for each message and configures regular punctuate calls via `context.schedule`.

Regardless of what configuration I try for timestamp extraction I cannot get #punctuate to be called, despite #process being called for every message (which are being sent several seconds apart).
I've set the schedule as low as 1 (though the docs aren't clear whether that's micro, milli, or just seconds) and tried both the wallclock time extractor and the default time extractor in both the global config and the state store serde.

These particular messages are being generated by another Kafka Streams DSL application and I'm using Kafka 0.10.2.0, so presumably they also have automatically embedded timestamps.

I can't for the life of me figure out what's going on. Could you clue me in?

Thanks,
Elliot

--
Tommy Becker
Senior Software Engineer
tivo.com
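
To make the behaviour discussed in this thread concrete, here is a minimal sketch in plain Java of how stream-time-driven punctuation can stall. It is a simplified model written for illustration, not Kafka Streams code: the class `StreamTimeModel` and its methods are hypothetical, and it assumes stream time is the minimum of the latest timestamps seen on each input partition and stays unknown until every partition has delivered at least one record.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified model of stream-time punctuation (not Kafka's actual code). */
class StreamTimeModel {
    private static final long UNKNOWN = -1L;

    private final long[] partitionTime;  // latest timestamp seen on each input partition
    private final long intervalMs;       // punctuation interval, in stream-time milliseconds
    private long nextPunctuate = UNKNOWN;
    private final List<Long> punctuations = new ArrayList<>();

    StreamTimeModel(int partitions, long intervalMs) {
        if (partitions < 1) throw new IllegalArgumentException("need at least one partition");
        this.partitionTime = new long[partitions];
        Arrays.fill(this.partitionTime, UNKNOWN);
        this.intervalMs = intervalMs;
    }

    /** Assumed rule: minimum over all partitions, UNKNOWN while any partition has no data. */
    long streamTime() {
        long min = Long.MAX_VALUE;
        for (long t : partitionTime) {
            if (t == UNKNOWN) return UNKNOWN;
            min = Math.min(min, t);
        }
        return min;
    }

    /** Feed one record; record every punctuation that becomes due in stream time. */
    void onRecord(int partition, long timestampMs) {
        partitionTime[partition] = Math.max(partitionTime[partition], timestampMs);
        long now = streamTime();
        if (now == UNKNOWN) return;               // some partition is still silent
        if (nextPunctuate == UNKNOWN) nextPunctuate = now + intervalMs;
        while (now >= nextPunctuate) {
            punctuations.add(nextPunctuate);      // stand-in for calling punctuate()
            nextPunctuate += intervalMs;
        }
    }

    List<Long> punctuations() { return punctuations; }
}
```

In this model, sending any number of records to partition 0 alone never triggers a punctuation, however far apart they are in wall-clock time. Punctuations only start once the other partition also receives records and the minimum timestamp moves forward, which matches the symptom described above.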
