[ https://issues.apache.org/jira/browse/KAFKA-10062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17119952#comment-17119952 ]
John Roesler commented on KAFKA-10062:
--------------------------------------

Thanks [~psmolinski],

This seems like a reasonable feature, especially given what you noted about the TopologyTestDriver, and the relationship to punctuation in general.

We currently have:
org.apache.kafka.streams.processor.ProcessorContext#timestamp
to return the current record's timestamp. If we're going to add a new method to get the system time, we should give it a name that's not ambiguous with the existing one.

Plus, rather than passing in a PunctuationType to decide whether you want to know the system time or stream time, it seems more straightforward to just add two new methods.

Internally, we already have:
org.apache.kafka.streams.processor.internals.InternalProcessorContext#currentSystemTimeMs

It seems like we can just add to the public API:
org.apache.kafka.streams.processor.ProcessorContext#currentSystemTimeMs
and if you also want stream time:
org.apache.kafka.streams.processor.ProcessorContext#currentStreamTimeMs


> Add a method to retrieve the current timestamp as known by the Streams app
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-10062
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10062
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.5.0
>            Reporter: Piotr Smolinski
>            Priority: Major
>              Labels: needs-kip
>
> Please add to the ProcessorContext a method to retrieve the current timestamp
> compatible with the Punctuator#punctate(long) method.
> Proposal in ProcessorContext:
> long getTimestamp(PunctuationType type);
> The method should return the time value as known by the Punctuator scheduler
> with the respective PunctuationType.
> The use-case is tracking of a process with timeout-based escalation.
> A transformer receives process events and, if an event is missing, executes
> an action (emits a message) after a given escalation timeout (several stages).
> The initial message may already arrive with a reference timestamp in the past
> and may trigger a different action upon arrival depending on how far in the
> past it is.
> If the timeout should be computed against some further time only, Punctuator
> is perfectly sufficient. The problem is that I have to evaluate the current
> time-related state once the message arrives.
> I am using wall-clock time. Normally accessing System.currentTimeMillis() is
> sufficient, but it breaks in unit testing with TopologyTestDriver, where the
> app wall-clock time is different from the system-wide one.
> To access the mentioned clock I am using reflection to access
> ProcessorContextImpl#task and then StreamTask#time.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
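
For illustration, the escalation use case from the issue could look roughly like the sketch below once the clock is read through the context instead of System.currentTimeMillis(). This is a minimal sketch under stated assumptions, not Kafka Streams code: TimeSource is a hypothetical stand-in for the two accessors proposed in the comment (currentSystemTimeMs and currentStreamTimeMs, which are not part of the public API at the time of writing), while ManualTimeSource, stagesElapsed, and the timeout values are invented for the example.

```java
/** Sketch only: none of these types are Kafka Streams classes. */
class EscalationSketch {

    /** Hypothetical stand-in for the two accessors proposed for ProcessorContext. */
    interface TimeSource {
        long currentSystemTimeMs();
        long currentStreamTimeMs();
    }

    /** Test-controlled clock, in the role a test driver's mocked wall clock would play. */
    static final class ManualTimeSource implements TimeSource {
        private long systemTimeMs;
        private final long streamTimeMs;

        ManualTimeSource(long systemTimeMs, long streamTimeMs) {
            this.systemTimeMs = systemTimeMs;
            this.streamTimeMs = streamTimeMs;
        }

        /** Moves the simulated wall clock forward without touching the real one. */
        void advanceSystemTime(long deltaMs) {
            systemTimeMs += deltaMs;
        }

        @Override
        public long currentSystemTimeMs() {
            return systemTimeMs;
        }

        @Override
        public long currentStreamTimeMs() {
            return streamTimeMs;
        }
    }

    /**
     * Counts how many escalation stages have already expired when a message arrives.
     * stageTimeoutsMs must be sorted in ascending order.
     */
    static int stagesElapsed(long referenceTimestampMs, long nowMs, long[] stageTimeoutsMs) {
        long elapsedMs = nowMs - referenceTimestampMs;
        int stages = 0;
        for (long timeoutMs : stageTimeoutsMs) {
            if (elapsedMs < timeoutMs) {
                break;
            }
            stages++;
        }
        return stages;
    }

    public static void main(String[] args) {
        long[] timeoutsMs = {60_000L, 300_000L, 900_000L}; // hypothetical stage timeouts
        ManualTimeSource time = new ManualTimeSource(1_000_000L, 1_000_000L);
        long referenceTimestampMs = 600_000L; // message refers to a point in the past

        // Evaluate the time-related state on arrival, using the app's clock.
        System.out.println(stagesElapsed(referenceTimestampMs, time.currentSystemTimeMs(), timeoutsMs));

        // A test can move the clock and re-evaluate deterministically.
        time.advanceSystemTime(600_000L);
        System.out.println(stagesElapsed(referenceTimestampMs, time.currentSystemTimeMs(), timeoutsMs));
    }
}
```

Because the arrival-time check reads the same clock that drives punctuation, a unit test can advance that clock and get deterministic results, which is the part that breaks when the transformer calls System.currentTimeMillis() directly.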