[jira] [Created] (KAFKA-10410) OnRestoreStart disappeared from StateRestoreCallback in 2.6.0 and reappeared in a useless place
Mark Shelton created KAFKA-10410:

Summary: OnRestoreStart disappeared from StateRestoreCallback in 2.6.0 and reappeared in a useless place
Key: KAFKA-10410
URL: https://issues.apache.org/jira/browse/KAFKA-10410
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 2.6.0
Reporter: Mark Shelton

In version 2.5.0 and earlier, a StateRestoreCallback could receive "onRestoreStart" and "onRestoreEnd" calls (for example by extending AbstractNotifyingRestoreCallback). Version 2.6.0 removed these calls; they now reach only a StateRestoreListener registered through KafkaStreams#setGlobalStateRestoreListener. This makes it impossible for the actual StateRestoreCallback implementation to receive the start and end indication, and it is blocking me from moving to 2.6.0.

See: https://kafka.apache.org/25/javadoc/index.html?org/apache/kafka/streams/processor/AbstractNotifyingRestoreCallback.html
Related JIRA: https://issues.apache.org/jira/browse/KAFKA-4322

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
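A possible 2.6.0 workaround, sketched under stated assumptions: one listener registered once through KafkaStreams#setGlobalStateRestoreListener forwards each event to whichever store instance registered for that store name and changelog partition. To stay self-contained, RestoreListener is a simplified stand-in for org.apache.kafka.streams.processor.StateRestoreListener (the real methods take a TopicPartition, not a String), and RoutingRestoreListener and NotifiedStore are hypothetical names. The map is keyed by store name and partition because every task owns its own instance of a store with the same name.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in for org.apache.kafka.streams.processor.StateRestoreListener.
// The real methods take a TopicPartition first; a String is used here so the
// sketch compiles without Kafka on the classpath.
interface RestoreListener {
    void onRestoreStart(String partition, String storeName, long startingOffset, long endingOffset);
    void onBatchRestored(String partition, String storeName, long batchEndOffset, long numRestored);
    void onRestoreEnd(String partition, String storeName, long totalRestored);
}

// Hypothetical router: the single global listener, forwarding each event to the
// store instance registered for that (store name, partition) pair. A
// ConcurrentHashMap is used because several stream threads may restore at once.
final class RoutingRestoreListener implements RestoreListener {
    private final Map<String, RestoreListener> targets = new ConcurrentHashMap<>();

    private static String keyOf(String partition, String storeName) {
        return storeName + "|" + partition;
    }

    void register(String partition, String storeName, RestoreListener target) {
        targets.put(keyOf(partition, storeName), target);
    }

    @Override
    public void onRestoreStart(String partition, String storeName, long startingOffset, long endingOffset) {
        RestoreListener t = targets.get(keyOf(partition, storeName));
        if (t != null) t.onRestoreStart(partition, storeName, startingOffset, endingOffset);
    }

    @Override
    public void onBatchRestored(String partition, String storeName, long batchEndOffset, long numRestored) {
        RestoreListener t = targets.get(keyOf(partition, storeName));
        if (t != null) t.onBatchRestored(partition, storeName, batchEndOffset, numRestored);
    }

    @Override
    public void onRestoreEnd(String partition, String storeName, long totalRestored) {
        RestoreListener t = targets.get(keyOf(partition, storeName));
        if (t != null) t.onRestoreEnd(partition, storeName, totalRestored);
    }
}

// Hypothetical custom store that wants the start/end indication itself.
final class NotifiedStore implements RestoreListener {
    volatile boolean restoring;
    volatile long lastRestoreCount;

    @Override
    public void onRestoreStart(String partition, String storeName, long startingOffset, long endingOffset) {
        restoring = true;
    }

    @Override
    public void onBatchRestored(String partition, String storeName, long batchEndOffset, long numRestored) {
        // per-batch progress is not needed by this store
    }

    @Override
    public void onRestoreEnd(String partition, String storeName, long totalRestored) {
        restoring = false;
        lastRestoreCount = totalRestored;
    }
}
```

In a real application the router would be registered before KafkaStreams#start(), and each store would call register(...) from its init(); how a store learns its changelog partition is left open in this sketch.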
[jira] [Commented] (KAFKA-4322) StateRestoreCallback begin and end indication
[ https://issues.apache.org/jira/browse/KAFKA-4322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15716086#comment-15716086 ]

Mark Shelton commented on KAFKA-4322:

I do not at all like the idea of using the metrics API in order to be informed that my state has completed restoring. This API change only affects the low-level processor, and only when using one's own state provider, so it's pretty limited. The original developer of the API made a mistake in assuming that a row-by-row API is sufficient and in not considering the need for a begin and end indication.

> StateRestoreCallback begin and end indication
> ---------------------------------------------
>
> Key: KAFKA-4322
> URL: https://issues.apache.org/jira/browse/KAFKA-4322
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 0.10.1.0
> Reporter: Mark Shelton
> Assignee: Mark Shelton
> Priority: Minor
>
> In Kafka Streams, the StateRestoreCallback interface provides only a single
> method "restore(byte[] key, byte[] value)" that is called for every key-value
> pair to be restored.
> It would be nice to have "beginRestore" and "endRestore" methods as part of
> StateRestoreCallback.
> Kafka Streams would call "beginRestore" before restoring any keys, and would
> call "endRestore" when it determines that it is done. This allows an
> implementation, for example, to report on the number of keys restored and
> perform a commit after the last key was restored. Other uses are conceivable.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (KAFKA-4322) StateRestoreCallback begin and end indication
[ https://issues.apache.org/jira/browse/KAFKA-4322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15715221#comment-15715221 ]

Mark Shelton commented on KAFKA-4322:

>> "We tend not to have such callbacks"

The StateRestoreCallback already exists and is just being extended by "begin" and "end" indication calls.

>> Would a listener work, e.g., you can register a listener and it's called
>> upon restore completion (with number of records restored).

Sure, we can have yet another listener interface to implement. I don't quite understand what that would buy.

>> We have a PR open that adds listeners for monitoring the state an instance
>> is at.

As long as it gets done. Please get it done.
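The listener route discussed here is roughly the shape StateRestoreListener later took: a start call with the offset range, per-batch counts, and a final total. The sketch below is a hypothetical progress reporter written against a simplified stand-in (ProgressListener drops the TopicPartition and store-name arguments of the real callbacks). Progress is estimated from offsets, so it is only approximate, since offsets and record counts can diverge (for example on compacted changelogs).

```java
// Simplified stand-in for the three StateRestoreListener callbacks; the real
// methods also receive a TopicPartition and the store name.
interface ProgressListener {
    void onRestoreStart(long startingOffset, long endingOffset);
    void onBatchRestored(long batchEndOffset, long numRestored);
    void onRestoreEnd(long totalRestored);
}

// Hypothetical listener that tracks restore progress and the final record count.
final class ProgressReporter implements ProgressListener {
    private long startOffset;
    private long endOffset;
    private long lastOffset;
    private long restoredSoFar;
    private long total = -1; // stays -1 until onRestoreEnd is called

    @Override
    public void onRestoreStart(long startingOffset, long endingOffset) {
        startOffset = startingOffset;
        endOffset = endingOffset;
        lastOffset = startingOffset;
        restoredSoFar = 0;
        total = -1;
    }

    @Override
    public void onBatchRestored(long batchEndOffset, long numRestored) {
        lastOffset = batchEndOffset;
        restoredSoFar += numRestored;
    }

    @Override
    public void onRestoreEnd(long totalRestored) {
        total = totalRestored;
    }

    // Offset-based estimate, clamped to [0, 100].
    double progressPercent() {
        long span = endOffset - startOffset;
        if (span <= 0) return 100.0;
        double p = 100.0 * (lastOffset - startOffset) / span;
        return Math.max(0.0, Math.min(100.0, p));
    }

    long restoredSoFar() { return restoredSoFar; }

    long total() { return total; }
}
```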
[jira] [Commented] (KAFKA-4322) StateRestoreCallback begin and end indication
[ https://issues.apache.org/jira/browse/KAFKA-4322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15687516#comment-15687516 ]

Mark Shelton commented on KAFKA-4322:

The logging is the same for us, and the logging framework is not the concern. We know some domain-specific information about the keys and would like, at the end of state restore, to report for example how many keys of a given type were added and deleted. Having the begin and end callbacks makes this much easier: the begin callback can allocate the structure for statistics, and the end callback can log the breakdown of keys added/removed per type, subtype, etc. My app has domain-specific knowledge of keys that Streams won't have.
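The allocate-at-begin, report-at-end pattern described above can be sketched as follows. Assumptions: keys are UTF-8 strings of the form "<type>:<id>", a null value (a changelog tombstone) counts as a deletion, and beginRestore()/endRestore() are the hooks proposed in KAFKA-4322, not Kafka API. RestoreCallback is a stand-in with the same restore(byte[] key, byte[] value) signature as StateRestoreCallback; TypeStatsRestoreCallback is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

// Stand-in with the same shape as org.apache.kafka.streams.processor.StateRestoreCallback.
interface RestoreCallback {
    void restore(byte[] key, byte[] value);
}

// Hypothetical callback collecting per-type added/deleted counts during a restore.
final class TypeStatsRestoreCallback implements RestoreCallback {
    // key type -> {added, deleted}; TreeMap keeps the report sorted by type
    private Map<String, long[]> stats = new TreeMap<>();

    // Proposed "begin" hook: allocate fresh statistics.
    void beginRestore() {
        stats = new TreeMap<>();
    }

    @Override
    public void restore(byte[] key, byte[] value) {
        String k = new String(key, StandardCharsets.UTF_8);
        int sep = k.indexOf(':');
        String type = sep < 0 ? "<untyped>" : k.substring(0, sep);
        long[] counts = stats.computeIfAbsent(type, t -> new long[2]);
        counts[value == null ? 1 : 0]++; // null value = tombstone = deletion
        // a real store would also apply the put/delete to its storage here
    }

    // Proposed "end" hook: build the breakdown (returned rather than logged).
    String endRestore() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, long[]> e : stats.entrySet()) {
            sb.append(e.getKey())
              .append(": added=").append(e.getValue()[0])
              .append(" deleted=").append(e.getValue()[1])
              .append('\n');
        }
        return sb.toString();
    }
}
```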
[jira] [Commented] (KAFKA-4322) StateRestoreCallback begin and end indication
[ https://issues.apache.org/jira/browse/KAFKA-4322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671564#comment-15671564 ]

Mark Shelton commented on KAFKA-4322:

I have a custom store and processor, and if no information is available to the store and/or processor about whether or what was restored, that impacts its usefulness. We are using our own logging. Relying on just the Kafka metrics or logging would be awkward or even insufficient.
[jira] [Commented] (KAFKA-4322) StateRestoreCallback begin and end indication
[ https://issues.apache.org/jira/browse/KAFKA-4322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15665194#comment-15665194 ]

Mark Shelton commented on KAFKA-4322:

>> Is there any particular reason that you want to commit offsets after
>> restoration?

This is for diagnostics, logging and recovery statistics only. A begin and end callback would make it easier to report that N keys were restored, since it lets me log details and report progress and metrics regarding restoration. I do not want to commit offsets, and the data that is made available is only for metrics.

>> I'm wondering if you have other common requests that would benefit from the
>> additional callbacks?

Not currently, no.

>> If you feel that this is still a common feature that we should add to Kafka

This is such a minor change with zero impact that I don't see the need for a KIP.
[jira] [Commented] (KAFKA-4394) Processor API does not receive timestamp and/or ConsumerRecord
[ https://issues.apache.org/jira/browse/KAFKA-4394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15654916#comment-15654916 ]

Mark Shelton commented on KAFKA-4394:

Great, thank you for the pointer. This is sufficient to get the use case done. Closing issue.
[jira] [Resolved] (KAFKA-4394) Processor API does not receive timestamp and/or ConsumerRecord
[ https://issues.apache.org/jira/browse/KAFKA-4394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mark Shelton resolved KAFKA-4394.
Resolution: Not A Problem
[jira] [Created] (KAFKA-4394) Processor API does not receive timestamp and/or ConsumerRecord
Mark Shelton created KAFKA-4394:

Summary: Processor API does not receive timestamp and/or ConsumerRecord
Key: KAFKA-4394
URL: https://issues.apache.org/jira/browse/KAFKA-4394
Project: Kafka
Issue Type: Improvement
Components: streams
Affects Versions: 0.10.0.1
Reporter: Mark Shelton
Assignee: Guozhang Wang
Priority: Critical

I'm trying to implement a custom low-level processor by implementing org.apache.kafka.streams.processor.Processor or AbstractProcessor. The "process(K key, V value)" method only receives the key and value and does not provide access to the timestamp or other information from the ConsumerRecord. It is critical for me to get access to the message timestamp.
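The thread does not record which pointer resolved this. One likely candidate: in the 0.10.x Processor API, the ProcessorContext passed to init() exposes timestamp() (alongside topic(), partition() and offset()) for the record currently being processed, so a processor can keep the context and read it inside process(). The sketch uses a minimal stand-in Context interface rather than the real ProcessorContext so it compiles without Kafka; SketchProcessor and TimestampTaggingProcessor are hypothetical names.

```java
// Minimal stand-in for the part of ProcessorContext used here; the real
// interface is not generic and has many more methods.
interface Context<K, V> {
    long timestamp();             // timestamp of the record being processed
    void forward(K key, V value); // pass a record downstream
}

// Mirrors the Processor lifecycle: init(context) once, process(...) per record.
abstract class SketchProcessor<K, V> {
    protected Context<K, V> context;

    void init(Context<K, V> context) {
        this.context = context; // keep this instance's own context
    }

    abstract void process(K key, V value);
}

// Hypothetical processor that tags each value with its record timestamp.
final class TimestampTaggingProcessor extends SketchProcessor<String, String> {
    @Override
    void process(String key, String value) {
        long ts = context.timestamp();
        context.forward(key, value + "@" + ts);
    }
}
```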
[jira] [Updated] (KAFKA-4322) StateRestoreCallback begin and end indication
[ https://issues.apache.org/jira/browse/KAFKA-4322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mark Shelton updated KAFKA-4322:
Flags: Patch
[jira] [Resolved] (KAFKA-4324) NullPointerException in StreamTask.forward
[ https://issues.apache.org/jira/browse/KAFKA-4324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mark Shelton resolved KAFKA-4324.
Resolution: Invalid

Marked as invalid. By accident, my processor was singleton-like internally, and "context.forward" was invoking another processor's context.
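The mistake described here, one processor instance shared across tasks, can be illustrated with a minimal sketch. Kafka Streams calls ProcessorSupplier#get() to obtain a processor for each task and then calls init() with that task's context, so a supplier that keeps returning the same object lets a later init() overwrite the context an earlier task relies on. ProcessorSupplierSketch and MyProcessor are simplified stand-ins (the context is just an Object here), not Kafka types.

```java
// Stand-in for org.apache.kafka.streams.processor.ProcessorSupplier.
interface ProcessorSupplierSketch<P> {
    P get();
}

// Simplified processor holding the context of the task it was initialized for.
final class MyProcessor {
    private Object context; // stand-in for ProcessorContext

    void init(Object context) {
        this.context = context;
    }

    Object context() {
        return context;
    }
}

// Wrong: every task gets the same instance, so each init() overwrites the
// previous task's context and forward() can go through the wrong one.
final class SharedInstanceSupplier implements ProcessorSupplierSketch<MyProcessor> {
    private final MyProcessor shared = new MyProcessor();

    @Override
    public MyProcessor get() {
        return shared;
    }
}

// Right: a fresh instance per call, so each task keeps its own context.
final class FreshInstanceSupplier implements ProcessorSupplierSketch<MyProcessor> {
    @Override
    public MyProcessor get() {
        return new MyProcessor();
    }
}
```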
[jira] [Created] (KAFKA-4324) NullPointerException in StreamTask.forward
Mark Shelton created KAFKA-4324:

Summary: NullPointerException in StreamTask.forward
Key: KAFKA-4324
URL: https://issues.apache.org/jira/browse/KAFKA-4324
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 0.10.0.1
Reporter: Mark Shelton
Assignee: Guozhang Wang
Priority: Minor

I have a custom processor "MyProcessor extends AbstractProcessor". The processor receives a message, transforms it and calls "context.forward". This sometimes results in the exception below. It seems to occur when there are multiple messages available for processing.

Stack trace:
java.lang.NullPointerException
    at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:341)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
    at xx.xx.xx.xx.xx.xx.xx.lambda$xx$1(xx.java:63)
    at xx.xx.xx.xx.xx.process(MyProcessor.java:16)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
    at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:343)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
[jira] [Created] (KAFKA-4322) StateRestoreCallback begin and end indication
Mark Shelton created KAFKA-4322:

Summary: StateRestoreCallback begin and end indication
Key: KAFKA-4322
URL: https://issues.apache.org/jira/browse/KAFKA-4322
Project: Kafka
Issue Type: Improvement
Components: streams
Affects Versions: 0.10.0.1
Reporter: Mark Shelton
Assignee: Guozhang Wang
Priority: Minor

In Kafka Streams, the StateRestoreCallback interface provides only a single method "restore(byte[] key, byte[] value)" that is called for every key-value pair to be restored.
It would be nice to have "beginRestore" and "endRestore" methods as part of StateRestoreCallback.
Kafka Streams would call "beginRestore" before restoring any keys, and would call "endRestore" when it determines that it is done. This would allow an implementation, for example, to report on the number of keys restored and perform a commit after the last key was restored. Other uses are conceivable.
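The proposed hooks could be sketched as below, assuming Java 8+: declaring beginRestore() and endRestore() as default no-op methods keeps existing implementations that only override restore() compiling. When this issue was filed, Kafka still supported Java 7, where default methods were not available. BeginEndRestoreCallback, CountingRestoreCallback and RestoreDriver are hypothetical names, not Kafka API, and the driver only approximates how a restore loop might invoke the callback.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical form of the proposed interface (not Kafka API).
interface BeginEndRestoreCallback {
    void restore(byte[] key, byte[] value);

    default void beginRestore() { } // called before the first key

    default void endRestore() { }   // called after the last key
}

// Hypothetical implementation: counts restored keys and "commits" once at the end,
// the two uses named in the proposal.
final class CountingRestoreCallback implements BeginEndRestoreCallback {
    private long restored;
    private final List<String> events = new ArrayList<>();

    @Override
    public void beginRestore() {
        restored = 0;
        events.add("begin");
    }

    @Override
    public void restore(byte[] key, byte[] value) {
        restored++;
    }

    @Override
    public void endRestore() {
        events.add("restored " + restored + " keys");
        events.add("commit"); // placeholder for a store-specific commit/flush
    }

    List<String> events() {
        return events;
    }
}

// Hypothetical driver showing the intended call order (not Kafka internals).
final class RestoreDriver {
    static void replay(BeginEndRestoreCallback cb, List<byte[][]> records) {
        cb.beginRestore();
        for (byte[][] kv : records) {
            cb.restore(kv[0], kv[1]);
        }
        cb.endRestore();
    }
}
```

Because the interface still has a single abstract method, a legacy implementation can even be a lambda such as (k, v) -> { }, picking up the no-op hooks.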