Does this fix the problem though? The docs indicate that new data is required 
for each *partition*, not topic. Overall I think the "stream time" notion is a 
good thing for a lot of use-cases, but some others definitely require 
wall-clock based windowing. Is something planned for this?

-Tommy

On Tue, 2017-03-28 at 10:45 +0100, Elliot Crosby-McCullough wrote:

Hi Michael,

My confusion was that the events are being created, transferred, and
received several seconds apart (longer than the punctuate schedule) with no
stalling because I'm triggering them by hand, so regardless of what
mechanism is being used for timing it should still be called.

That said, I've just noticed in the callout box that it will only advance
stream time if all input topics have new data which in my testing is not
the case, so I suppose I will need to attach the processor to each input
topic rather than processing them all at the same time (in this use case
they were being split back out in the processor).

Thanks,
Elliot

On 28 March 2017 at 10:18, Michael Noll 
<[email protected]<mailto:[email protected]>> wrote:



Elliot,

in the current API, `punctuate()` is called based on the current
stream-time (which defaults to event-time), not based on the current
wall-clock time / processing-time.  See http://docs.confluent.io/
current/streams/faq.html#why-is-punctuate-not-called.  The stream-time is
advanced only when new input records are coming in, so if there's e.g. a
stall on incoming records, then `punctuate()` will not be called.

If you need to schedule a call every N minutes of wall-clock time you'd
need to use your own scheduler.

Does that help?
Michael



On Tue, Mar 28, 2017 at 10:58 AM, Elliot Crosby-McCullough <
[email protected]<mailto:[email protected]>>
 wrote:



Hi there,

I've written a simple processor which expects to have #process called on


it


for each message and configures regular punctuate calls via
`context.schedule`.

Regardless of what configuration I try for timestamp extraction I cannot
get #punctuate to be called, despite #process being called for every
message (which are being sent several seconds apart).  I've set the
schedule as low as 1 (though the docs aren't clear whether that's micro,
milli, or just seconds) and tried both the wallclock time extractor and


the


default time extractor in both the global config and the state store


serde.



These particular messages are being generated by another kafka streams


DSL


application and I'm using kafka 0.10.2.0, so presumably they also have
automatically embedded timestamps.

I can't for the life of me figure out what's going on.  Could you clue me
in?

Thanks,
Elliot






--
[cid:[email protected]] Tommy Becker
Senior Software Engineer
O +1 919.460.4747
tivo.com<http://www.tivo.com/>

________________________________

This email and any attachments may contain confidential and privileged material 
for the sole use of the intended recipient. Any review, copying, or 
distribution of this email (or any attachments) by others is prohibited. If you 
are not the intended recipient, please contact the sender immediately and 
permanently delete this email and any attachments. No employee or agent of TiVo 
Inc. is authorized to conclude any binding agreement on behalf of TiVo Inc. by 
email. Binding agreements with TiVo Inc. may only be made by a signed written 
agreement.

Reply via email to