[ https://issues.apache.org/jira/browse/KAFKA-12346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax updated KAFKA-12346:
------------------------------------
    Description:
A stream transform called with the idiom below causes punctuate to be called at twice the duration of the argument passed
{code:java}
.transform(new TransformerSupplier[String, TimeStampedString, KeyValue[String, TimeStampedString]]() {
    override def get(): Transformer[String, TimeStampedString, KeyValue[String, TimeStampedString]] =
      new Transformer[String, TimeStampedString, KeyValue[String, TimeStampedString]] {

        override def init(context: ProcessorContext): Unit = {
          val store = context.getStateStore(stateStoreName).asInstanceOf[KeyValueStore[String, ValueAndTimestamp[TimeStampedString]]]
          context.schedule(scanFrequency,
            PunctuationType.WALL_CLOCK_TIME,
            new Punctuator {
              override def punctuate(timestamp: Long): Unit = {
                logger.info(s"Punctuate invoked with timestamp : ${Instant.ofEpochMilli(timestamp)}")
              }
            }
          )
        }

        override def transform(key: String, value: TimeStampedString): KeyValue[String, TimeStampedString] = {
          // no need to return anything here, the Punctuator will emit the records when necessary
          null
        }

        override def close(): Unit = {}
      }
  },
  /**
   * register that this Transformer needs to be connected to our state store.
   */
  stateStoreName
)
{code}
If the interval is specified as 1 second, the callback fires every 2 seconds.
If the interval is specified as 5 seconds, the callback fires every 10 seconds.

  was:
A stream transform called with the idiom below causes punctuate to be called at twice the duration of the argument passed
{code:java}
.transform(new TransformerSupplier[String, TimeStampedString, KeyValue[String, TimeStampedString]]() {
    override def get(): Transformer[String, TimeStampedString, KeyValue[String, TimeStampedString]] =
      new Transformer[String, TimeStampedString, KeyValue[String, TimeStampedString]] {

        override def init(context: ProcessorContext): Unit = {
          val store = context.getStateStore(stateStoreName).asInstanceOf[KeyValueStore[String, ValueAndTimestamp[TimeStampedString]]]
          context.schedule(scanFrequency,
            PunctuationType.WALL_CLOCK_TIME,
            new Punctuator {
              override def punctuate(timestamp: Long): Unit = {
                logger.info(s"Punctuate invoked with timestamp : ${Instant.ofEpochMilli(timestamp)}")
              }
            }
          )
        }

        override def transform(key: String, value: TimeStampedString): KeyValue[String, TimeStampedString] = {
          // no need to return anything here, the Punctuator will emit the records when necessary
          null
        }

        override def close(): Unit = {}
      }
  },
  /**
   * register that this Transformer needs to be connected to our state store.
   */
  stateStoreName
)
{code}

> punctuate is called at twice the duration passed as the first argument to Processor.Schedule (with PunctuationType.WALL_CLOCK_TIME)
> -----------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12346
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12346
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.7.0
>            Reporter: Arindam Ray
>            Priority: Major
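
One way to confirm the reported doubling is to measure the gap between consecutive punctuate timestamps rather than reading it off the log lines. The following is only a minimal sketch under stated assumptions: PunctuationGapTracker and its methods are hypothetical names that do not exist in Kafka Streams, and the class assumes the timestamp handed to punctuate is in epoch milliseconds, as it is for PunctuationType.WALL_CLOCK_TIME.

```scala
// Hypothetical helper, not part of Kafka Streams: tracks the gap between
// consecutive punctuate timestamps so the observed interval can be compared
// with the interval that was passed to context.schedule.
final class PunctuationGapTracker(expectedIntervalMs: Long) {
  require(expectedIntervalMs > 0, "expected interval must be positive")

  // Timestamp of the previous punctuate call, if there was one.
  private var lastTimestampMs: Option[Long] = None

  /** Records a timestamp and returns the gap since the previous one (None on the first call). */
  def record(timestampMs: Long): Option[Long] = {
    val gap = lastTimestampMs.map(previous => timestampMs - previous)
    lastTimestampMs = Some(timestampMs)
    gap
  }

  /** Ratio of an observed gap to the expected interval; a value near 2.0 would match the report. */
  def ratio(gapMs: Long): Double = gapMs.toDouble / expectedIntervalMs
}
```

Inside the Punctuator from the description, something like tracker.record(timestamp).foreach(gap => logger.info(s"gap=${gap}ms ratio=${tracker.ratio(gap)}")) would log each gap, assuming a tracker instance was created in init with scanFrequency converted to milliseconds.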