[jira] [Created] (KAFKA-15448) Streams StandbyTaskUpdateListener

2023-09-11 Thread Colt McNealy (Jira)
Colt McNealy created KAFKA-15448:


 Summary: Streams StandbyTaskUpdateListener
 Key: KAFKA-15448
 URL: https://issues.apache.org/jira/browse/KAFKA-15448
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Colt McNealy


In addition to the new metrics in KIP-869, it would be great to have a callback 
that allows for monitoring of Standby Task status. The StateRestoreListener is 
currently not called for Standby Tasks for good reasons (the API wouldn't make 
sense for Standby). I've attached an interface which would be nice to have:

 

```
import org.apache.kafka.common.TopicPartition;

public interface StandbyTaskUpdateListener {

    enum SuspendReason {
        MIGRATED,
        PROMOTED;
    }

    /**
     * Method called upon the creation of the Standby Task.
     *
     * @param topicPartition   the TopicPartition of the Standby Task.
     * @param storeName        the name of the store being watched by this Standby Task.
     * @param earliestOffset   the earliest offset available on the changelog topic.
     * @param startingOffset   the offset from which the Standby Task starts watching.
     * @param currentEndOffset the current latest offset on the associated changelog partition.
     */
    void onTaskCreated(final TopicPartition topicPartition,
                       final String storeName,
                       final long earliestOffset,
                       final long startingOffset,
                       final long currentEndOffset);

    /**
     * Method called after restoring a batch of records. In this case the maximum size of the
     * batch is whatever the value of MAX_POLL_RECORDS is set to.
     *
     * This method is called after restoring each batch, and it is advised to keep processing
     * to a minimum. Any heavy processing will hold up recovering the next batch, hence slowing
     * down the restore process as a whole.
     *
     * If you need to do any extended processing or connect to an external service, consider
     * doing so asynchronously.
     *
     * @param topicPartition   the TopicPartition containing the values to restore
     * @param storeName        the name of the store undergoing restoration
     * @param batchEndOffset   the inclusive ending offset for the current restored batch for this TopicPartition
     * @param numRestored      the total number of records restored in this batch for this TopicPartition
     * @param currentEndOffset the current end offset of the changelog topic partition.
     */
    void onBatchRestored(final TopicPartition topicPartition,
                         final String storeName,
                         final long batchEndOffset,
                         final long numRestored,
                         final long currentEndOffset);

    /**
     * Method called after a Standby Task is closed, either because the task migrated to a new
     * instance or because the task was promoted to an Active Task.
     *
     * @param topicPartition   the TopicPartition of the Standby Task.
     * @param storeName        the name of the store being watched by this Standby Task.
     * @param storeOffset      the offset up to which the standby store has been restored.
     * @param currentEndOffset the current latest offset on the associated changelog partition.
     * @param reason           whether the task was migrated away or promoted to an Active Task.
     */
    void onTaskSuspended(final TopicPartition topicPartition,
                         final String storeName,
                         final long storeOffset,
                         final long currentEndOffset,
                         final SuspendReason reason);
}
```
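
For illustration, here is a rough sketch of how an application might implement and register such a listener. The registration method `setStandbyUpdateListener` on `KafkaStreams` is an assumption (mirroring the existing `setGlobalStateRestoreListener`), not an API that exists today, and the topic and store names are made up:

```
import java.util.Properties;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Materialized;

public class StandbyMonitoringExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "standby-monitoring-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);

        KafkaStreams streams = new KafkaStreams(buildTopology(), props);

        // Hypothetical registration method, analogous to setGlobalStateRestoreListener.
        streams.setStandbyUpdateListener(new StandbyTaskUpdateListener() {
            @Override
            public void onTaskCreated(TopicPartition tp, String storeName, long earliestOffset,
                                      long startingOffset, long currentEndOffset) {
                System.out.printf("Standby for %s (%s) starting at %d, end offset %d%n",
                        storeName, tp, startingOffset, currentEndOffset);
            }

            @Override
            public void onBatchRestored(TopicPartition tp, String storeName, long batchEndOffset,
                                        long numRestored, long currentEndOffset) {
                // Keep this cheap: just report the remaining changelog lag.
                System.out.printf("Standby for %s (%s) lag is %d%n",
                        storeName, tp, currentEndOffset - batchEndOffset);
            }

            @Override
            public void onTaskSuspended(TopicPartition tp, String storeName, long storeOffset,
                                        long currentEndOffset, SuspendReason reason) {
                System.out.printf("Standby for %s (%s) suspended (%s) at offset %d%n",
                        storeName, tp, reason, storeOffset);
            }
        });

        streams.start();
    }

    private static Topology buildTopology() {
        // Minimal stateful topology so standby tasks exist; topic and store names are made up.
        StreamsBuilder builder = new StreamsBuilder();
        builder.table("example-input", Materialized.as("example-store"));
        return builder.build();
    }
}
```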





[jira] [Created] (KAFKA-15308) Wipe Stores upon OffsetOutOfRangeException in ALOS

2023-08-04 Thread Colt McNealy (Jira)
Colt McNealy created KAFKA-15308:


 Summary: Wipe Stores upon OffsetOutOfRangeException in ALOS
 Key: KAFKA-15308
 URL: https://issues.apache.org/jira/browse/KAFKA-15308
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.5.0, 3.4.0, 3.3.0
Reporter: Colt McNealy


As per this [Confluent Community Slack 
Thread|https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1690843733272449?thread_ts=1690663361.858559=C48AHTCUQ],
 Streams currently does not wipe away RocksDB state upon encountering an 
`OffsetOutOfRangeException` in ALOS.

 

`OffsetOutOfRangeException` is a rare case that occurs when a standby task 
requests offsets that no longer exist in the topic. We should wipe the store 
for three reasons (a rough sketch of the intended handling follows this list):
 1. Not wiping the store can be a violation of ALOS, since some of the 
now-missing offsets could have contained tombstone records.
 2. Wiping the store has no performance cost, since we need to replay the 
entirety of what's in the changelog topic anyway.
 3. I have heard (not yet confirmed myself) that we already wipe the store in 
EOS, so fixing this bug could remove a bit of complexity from supporting both 
EOS and ALOS.
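
For context, here is a rough sketch of the intended handling. The names below (`StandbyRestorer`, `applyToStore`, `wipeStateStore`) are illustrative placeholders, not Kafka Streams internals; the point is simply that an out-of-range fetch should discard the local store and replay the changelog from the earliest available offset:

```
import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;

// Illustrative placeholder, not part of Kafka Streams.
class StandbyRestorer {

    private final Consumer<byte[], byte[]> restoreConsumer;
    private final TopicPartition changelogPartition;

    StandbyRestorer(Consumer<byte[], byte[]> restoreConsumer, TopicPartition changelogPartition) {
        this.restoreConsumer = restoreConsumer;
        this.changelogPartition = changelogPartition;
    }

    void restoreNextBatch() {
        try {
            ConsumerRecords<byte[], byte[]> batch = restoreConsumer.poll(Duration.ofMillis(100));
            applyToStore(batch); // write the restored records into the local store
        } catch (OffsetOutOfRangeException e) {
            // The requested offsets were deleted from the changelog; some of them may have
            // been tombstones, so the local store can no longer be trusted under ALOS.
            wipeStateStore(); // placeholder: delete the task's local RocksDB state
            restoreConsumer.seekToBeginning(
                    Collections.singleton(changelogPartition)); // replay from the earliest offset
        }
    }

    private void applyToStore(ConsumerRecords<byte[], byte[]> batch) { /* placeholder */ }

    private void wipeStateStore() { /* placeholder: wipe the task's state directory */ }
}
```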


