[ 
https://issues.apache.org/jira/browse/KAFKA-10062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17119952#comment-17119952
 ] 

John Roesler commented on KAFKA-10062:
--------------------------------------

Thanks [~psmolinski],

This seems like a reasonable feature, especially given what you noted about the 
TopologyTestDriver, and the relationship to punctuation in general.

We currently have:

org.apache.kafka.streams.processor.ProcessorContext#timestamp

to return the current record's timestamp.

If we're going to add a new method to get the system time, we should give it a 
name that can't be confused with the existing one. Also, rather than passing in 
a PunctuationType to choose between system time and stream time, it seems more 
straightforward to add two separate methods.

Internally, we already have:

org.apache.kafka.streams.processor.internals.InternalProcessorContext#currentSystemTimeMs

It seems like we can just add to the public API:

org.apache.kafka.streams.processor.ProcessorContext#currentSystemTimeMs

and if you also want stream time:

org.apache.kafka.streams.processor.ProcessorContext#currentStreamTimeMs
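
To make the shape concrete, here is a minimal, self-contained sketch of how the 
two accessors could serve the escalation use case from the ticket. It does not 
use the Kafka classes: TimeAwareContext, FixedContext, and overdueStages are 
hypothetical names made up for illustration. Only the method names timestamp, 
currentSystemTimeMs, and currentStreamTimeMs come from the discussion above, 
and the escalation arithmetic is an assumption about what the reporter needs.

```java
public class TimeAccessSketch {

    /** Hypothetical stand-in for ProcessorContext with the two proposed accessors. */
    interface TimeAwareContext {
        long timestamp();            // existing: timestamp of the current record
        long currentSystemTimeMs();  // proposed: wall-clock time as known by the app
        long currentStreamTimeMs();  // proposed: stream time
    }

    /** Hypothetical context with fixed clocks, as a test harness might supply. */
    static final class FixedContext implements TimeAwareContext {
        private final long recordTs;
        private final long systemTimeMs;
        private final long streamTimeMs;

        FixedContext(long recordTs, long systemTimeMs, long streamTimeMs) {
            this.recordTs = recordTs;
            this.systemTimeMs = systemTimeMs;
            this.streamTimeMs = streamTimeMs;
        }

        @Override public long timestamp() { return recordTs; }
        @Override public long currentSystemTimeMs() { return systemTimeMs; }
        @Override public long currentStreamTimeMs() { return streamTimeMs; }
    }

    /**
     * Counts how many escalation stages are already overdue when a message
     * arrives, measured against the wall clock exposed by the context
     * rather than System.currentTimeMillis().
     */
    static int overdueStages(TimeAwareContext ctx, long referenceTs,
                             long stageTimeoutMs, int maxStages) {
        if (stageTimeoutMs <= 0) {
            throw new IllegalArgumentException("stageTimeoutMs must be positive");
        }
        long elapsed = ctx.currentSystemTimeMs() - referenceTs;
        if (elapsed <= 0) {
            return 0; // reference timestamp is not in the past
        }
        return (int) Math.min(elapsed / stageTimeoutMs, (long) maxStages);
    }

    public static void main(String[] args) {
        // Record stamped at t=1000, wall clock at t=10000, stream time at t=9000.
        TimeAwareContext ctx = new FixedContext(1_000L, 10_000L, 9_000L);
        // 9000 ms elapsed with 3000 ms per stage: three stages are overdue.
        System.out.println(overdueStages(ctx, 1_000L, 3_000L, 5));
    }
}
```

Because the clock is read from the context, a test can supply whatever time it 
wants, which is the property the TopologyTestDriver case needs.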

> Add a method to retrieve the current timestamp as known by the Streams app
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-10062
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10062
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.5.0
>            Reporter: Piotr Smolinski
>            Priority: Major
>              Labels: needs-kip
>
> Please add to the ProcessorContext a method to retrieve the current timestamp 
> compatible with the Punctuator#punctate(long) method.
> Proposal in ProcessorContext:
> long getTimestamp(PunctuationType type);
> The method should return the time value as known by the Punctuator scheduler 
> for the respective PunctuationType.
> The use case is tracking a process with timeout-based escalation.
> A transformer receives process events and, if an expected event is missing, 
> executes an action (emits a message) after a given escalation timeout (there 
> are several stages). The initial message may already arrive with a reference 
> timestamp in the past and may trigger a different action upon arrival 
> depending on how far in the past it is.
> If the timeout only had to be computed against some future time, a Punctuator 
> would be perfectly sufficient. The problem is that I have to evaluate the 
> current time-related state as soon as the message arrives.
> I am using wall-clock time. Normally accessing System.currentTimeMillis() is 
> sufficient, but it breaks in unit testing with TopologyTestDriver, where the 
> app's wall-clock time is different from the system-wide one.
> To access the mentioned clock I am using reflection to access 
> ProcessorContextImpl#task and then StreamTask#time.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
