[ https://issues.apache.org/jira/browse/KAFKA-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax resolved KAFKA-15448.
-------------------------------------
    Fix Version/s: 3.7.0
         Assignee: Colt McNealy
       Resolution: Fixed

> Streams StandbyTaskUpdateListener
> ---------------------------------
>
>                 Key: KAFKA-15448
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15448
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Colt McNealy
>            Assignee: Colt McNealy
>            Priority: Minor
>              Labels: kip
>             Fix For: 3.7.0
>
>
> KIP-988:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-988%3A+Streams+Standby+Update+Listener]
>
> In addition to the new metrics in KIP-869, it would be great to have a
> callback for monitoring the status of Standby Tasks. The
> StateRestoreListener is currently not called for Standby Tasks, for good
> reasons (the API wouldn't make sense for Standby). I've attached an
> interface which would be nice to have:
>
> ```
> import org.apache.kafka.common.TopicPartition;
>
> public interface StandbyTaskUpdateListener {
>
>     public enum SuspendReason {
>         MIGRATED,
>         PROMOTED;
>     }
>
>     /**
>      * Method called upon the creation of the Standby Task.
>      *
>      * @param topicPartition the TopicPartition of the Standby Task.
>      * @param storeName the name of the store being watched by this Standby Task.
>      * @param earliestOffset the earliest offset available on the Changelog topic.
>      * @param startingOffset the offset from which the Standby Task starts watching.
>      * @param currentEndOffset the current latest offset on the associated changelog partition.
>      */
>     void onTaskCreated(final TopicPartition topicPartition,
>                        final String storeName,
>                        final long earliestOffset,
>                        final long startingOffset,
>                        final long currentEndOffset);
>
>     /**
>      * Method called after restoring a batch of records. In this case the maximum size of the batch is whatever
>      * the value of the MAX_POLL_RECORDS is set to.
>      *
>      * This method is called after restoring each batch and it is advised to keep processing to a minimum.
>      * Any heavy processing will hold up recovering the next batch, hence slowing down the restore process as a
>      * whole.
>      *
>      * If you need to do any extended processing or connecting to an external service consider doing so asynchronously.
>      *
>      * @param topicPartition the TopicPartition containing the values to restore
>      * @param storeName the name of the store undergoing restoration
>      * @param batchEndOffset the inclusive ending offset for the current restored batch for this TopicPartition
>      * @param numRestored the total number of records restored in this batch for this TopicPartition
>      * @param currentEndOffset the current end offset of the changelog topic partition.
>      */
>     void onBatchRestored(final TopicPartition topicPartition,
>                          final String storeName,
>                          final long batchEndOffset,
>                          final long numRestored,
>                          final long currentEndOffset);
>
>     /**
>      * Method called after a Standby Task is closed, either because the task migrated to a new instance
>      * or because the task was promoted to an Active task.
>      *
>      * @param topicPartition the TopicPartition of the Standby Task.
>      * @param storeName the name of the store that was being watched by this Standby Task.
>      * @param storeOffset the offset the store had reached when the task was suspended.
>      * @param currentEndOffset the current end offset of the changelog topic partition.
>      * @param reason why the task was suspended (MIGRATED or PROMOTED).
>      */
>     void onTaskSuspended(final TopicPartition topicPartition,
>                          final String storeName,
>                          final long storeOffset,
>                          final long currentEndOffset,
>                          final SuspendReason reason);
> }
> ```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
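
For illustration, here is a minimal sketch of how a listener shaped like the interface proposed in this ticket could be used to track standby lag per store. Everything in it beyond the three callback signatures is an assumption: `StandbyLagTracker` and its `lag` helper are hypothetical names, `TopicPartition` is a small stand-in for `org.apache.kafka.common.TopicPartition` so the snippet compiles without Kafka on the classpath, and "lag" is taken to be the plain difference between the reported end offset and the last offset seen. The interface that was eventually released may differ from this proposal, so check KIP-988 before relying on these names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for org.apache.kafka.common.TopicPartition (assumption: only
// used here so the sketch is self-contained).
final class TopicPartition {
    private final String topic;
    private final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}

// The callbacks as proposed in the ticket description.
interface StandbyTaskUpdateListener {
    enum SuspendReason { MIGRATED, PROMOTED }

    void onTaskCreated(TopicPartition topicPartition, String storeName,
                       long earliestOffset, long startingOffset, long currentEndOffset);

    void onBatchRestored(TopicPartition topicPartition, String storeName,
                         long batchEndOffset, long numRestored, long currentEndOffset);

    void onTaskSuspended(TopicPartition topicPartition, String storeName,
                         long storeOffset, long currentEndOffset, SuspendReason reason);
}

// Hypothetical listener: remembers the last observed lag for each
// (store, changelog partition) pair. Callbacks only update a map, so they
// stay cheap, as the Javadoc in the proposal advises.
final class StandbyLagTracker implements StandbyTaskUpdateListener {
    private final Map<String, Long> lagByStore = new ConcurrentHashMap<>();

    private static String key(TopicPartition tp, String storeName) {
        return storeName + "@" + tp;
    }

    @Override
    public void onTaskCreated(TopicPartition tp, String storeName,
                              long earliestOffset, long startingOffset, long currentEndOffset) {
        lagByStore.put(key(tp, storeName), Math.max(0L, currentEndOffset - startingOffset));
    }

    @Override
    public void onBatchRestored(TopicPartition tp, String storeName,
                                long batchEndOffset, long numRestored, long currentEndOffset) {
        lagByStore.put(key(tp, storeName), Math.max(0L, currentEndOffset - batchEndOffset));
    }

    @Override
    public void onTaskSuspended(TopicPartition tp, String storeName,
                                long storeOffset, long currentEndOffset, SuspendReason reason) {
        // The standby is gone from this instance, so stop tracking it.
        lagByStore.remove(key(tp, storeName));
    }

    /** Last observed lag, or -1 if the store is not currently tracked. */
    long lag(TopicPartition tp, String storeName) {
        return lagByStore.getOrDefault(key(tp, storeName), -1L);
    }
}
```

A tracker like this could back a health check or a gauge metric. Anything heavier, such as calling an external service, would be better handed off to another thread, since the callbacks run between batches.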