ezhou413 commented on code in PR #22078:
URL: https://github.com/apache/kafka/pull/22078#discussion_r3133798934


##########
docs/streams/developer-guide/running-app.md:
##########
@@ -44,6 +44,118 @@ When you start your application you are launching a Kafka 
Streams instance of yo
 
 When the application instance starts running, the defined processor topology 
will be initialized as one or more stream tasks. If the processor topology 
defines any state stores, these are also constructed during the initialization 
period. For more information, see the State restoration during workload 
rebalance section).
 
+# Listeners and callbacks
+
+`KafkaStreams` provides several listeners and callbacks that let you observe 
the internal state and behavior of your application. All listeners must be set 
**before** calling `start()`. Attempting to set a listener after `start()` has 
been called will throw an `IllegalStateException`. Each listener can only be 
set once per `KafkaStreams` instance; setting a new listener replaces any 
previously set listener.
+
+## State listener
+
+You can set a `KafkaStreams.StateListener` to be notified whenever the 
`KafkaStreams` instance transitions between states. The possible states are: 
`CREATED`, `REBALANCING`, `RUNNING`, `PENDING_SHUTDOWN`, `NOT_RUNNING`, 
`PENDING_ERROR`, and `ERROR`.
+
+    KafkaStreams streams = new KafkaStreams(topology, props);
+
+    streams.setStateListener((newState, oldState) -> {
+        if (newState == KafkaStreams.State.RUNNING) {
+            // application is now ready to process records
+        } else if (newState == KafkaStreams.State.ERROR) {
+            // application has encountered a fatal error
+        }
+    });
+
+    streams.start();
+
+## Uncaught exception handler
+
+You can set a `StreamsUncaughtExceptionHandler` to handle unexpected 
exceptions thrown by internal stream threads. The handler receives the 
exception and must return a `StreamThreadExceptionResponse` indicating how to 
proceed:
+
+  * `REPLACE_THREAD` -- Replace the failed thread with a new one.
+  * `SHUTDOWN_CLIENT` -- Shut down this `KafkaStreams` client.
+  * `SHUTDOWN_APPLICATION` -- Shut down all instances of the application.
+
+Example:
+
+    import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
+
+    streams.setUncaughtExceptionHandler(exception -> {
+        if (exception instanceof RetriableException) {
+            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
+        }
+        return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
+    });
+
+The handler executes on the thread that produced the exception. Because the 
handler is shared across all stream threads, the implementation **must be 
thread-safe**. To get the thread that threw the exception, call 
`Thread.currentThread()` from within the handler.
+
+## State restore listener
+
+You can set a `StateRestoreListener` to be notified about the progress of 
state store restoration. This is useful for monitoring and logging the 
restoration process. The listener provides callbacks for the following 
lifecycle events:
+
+  * `onRestoreStart` -- Called when restoration begins for a state store 
partition, providing the starting and ending offsets.
+  * `onBatchRestored` -- Called after each batch of records is restored, 
providing the batch end offset and the number of records restored in the batch.
+  * `onRestoreEnd` -- Called when restoration completes for a state store 
partition, providing the total number of records restored.
+  * `onRestoreSuspended` -- Called when restoration is suspended because the 
task was migrated to another instance. This is a default method with an empty 
implementation.
+
+Example:
+
+    import org.apache.kafka.streams.processor.StateRestoreListener;
+
+    streams.setGlobalStateRestoreListener(new StateRestoreListener() {
+        @Override
+        public void onRestoreStart(TopicPartition topicPartition, String storeName,
+                                   long startingOffset, long endingOffset) {
+            // log that restoration has started
+        }
+
+        @Override
+        public void onBatchRestored(TopicPartition topicPartition, String storeName,
+                                    long batchEndOffset, long numRestored) {
+            // track progress
+        }
+
+        @Override
+        public void onRestoreEnd(TopicPartition topicPartition, String storeName,
+                                 long totalRestored) {
+            // log that restoration is complete
+        }
+    });
+
+The listener is shared across all `StreamThread` instances, so it should be 
**stateless** or properly synchronized. Note that this listener does **not** 
monitor standby task updates. To monitor standby tasks, use the standby update 
listener described below.
+
+## Standby update listener
+
+You can set a `StandbyUpdateListener` to be notified about updates to standby 
state store replicas. Standby replicas keep a copy of the state store on a 
different instance for faster failover. The listener provides callbacks for the 
following lifecycle events:
+
+  * `onUpdateStart` -- Called when a standby task begins consuming from the 
changelog, providing the starting offset.
+  * `onBatchLoaded` -- Called after each batch of records is loaded into the 
standby store, providing the batch end offset, batch size, and the current end 
offset of the changelog partition.
+  * `onUpdateSuspended` -- Called when the standby task stops updating. The 
`SuspendReason` parameter indicates why: `MIGRATED` means the task was moved to 
another instance, while `PROMOTED` means the standby was promoted to an active 
task (in which case the corresponding `StateRestoreListener.onRestoreStart` 
will be called next).
+
+Example:
+
+    import org.apache.kafka.streams.processor.StandbyUpdateListener;
+
+    streams.setStandbyUpdateListener(new StandbyUpdateListener() {
+        @Override
+        public void onUpdateStart(TopicPartition topicPartition, String storeName,
+                                  long startingOffset) {
+            // log that standby update has started
+        }
+
+        @Override
+        public void onBatchLoaded(TopicPartition topicPartition, String storeName,
+                                  TaskId taskId, long batchEndOffset,
+                                  long batchSize, long currentEndOffset) {
+            // track standby replication progress
+        }
+
+        @Override
+        public void onUpdateSuspended(TopicPartition topicPartition, String storeName,
+                                      long storeOffset, long currentEndOffset,
+                                      StandbyUpdateListener.SuspendReason reason) {
+            // log reason for suspension
+        }
+    });
+
+For more information about standby replicas, see [Standby Replicas](config-streams.html#num-standby-replicas).

Review Comment:
   Could you replace this link with the `.md` equivalent?



##########
docs/streams/developer-guide/running-app.md:
##########
@@ -44,6 +44,118 @@ When you start your application you are launching a Kafka 
Streams instance of yo
 
 When the application instance starts running, the defined processor topology 
will be initialized as one or more stream tasks. If the processor topology 
defines any state stores, these are also constructed during the initialization 
period. For more information, see the State restoration during workload 
rebalance section).
 
+# Listeners and callbacks
+
+`KafkaStreams` provides several listeners and callbacks that let you observe 
the internal state and behavior of your application. All listeners must be set 
**before** calling `start()`. Attempting to set a listener after `start()` has 
been called will throw an `IllegalStateException`. Each listener can only be 
set once per `KafkaStreams` instance; setting a new listener replaces any 
previously set listener.
+
+## State listener
+
+You can set a `KafkaStreams.StateListener` to be notified whenever the 
`KafkaStreams` instance transitions between states. The possible states are: 
`CREATED`, `REBALANCING`, `RUNNING`, `PENDING_SHUTDOWN`, `NOT_RUNNING`, 
`PENDING_ERROR`, and `ERROR`.

Review Comment:
   Could you add a link to the Javadoc for the `KafkaStreams.State` enum, which documents these states?



##########
docs/streams/developer-guide/running-app.md:
##########
@@ -44,6 +44,118 @@ When you start your application you are launching a Kafka 
Streams instance of yo
 
 When the application instance starts running, the defined processor topology 
will be initialized as one or more stream tasks. If the processor topology 
defines any state stores, these are also constructed during the initialization 
period. For more information, see the State restoration during workload 
rebalance section).
 
+# Listeners and callbacks
+
+`KafkaStreams` provides several listeners and callbacks that let you observe 
the internal state and behavior of your application. All listeners must be set 
**before** calling `start()`. Attempting to set a listener after `start()` has 
been called will throw an `IllegalStateException`. Each listener can only be 
set once per `KafkaStreams` instance; setting a new listener replaces any 
previously set listener.

Review Comment:
   Could you reword the last sentence? Each listener can be set any number of
times before `start()` (for example, with `setStateListener`), so saying it
"can only be set once" is confusing.
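
To illustrate the behavior the doc should describe, here is a minimal sketch. It uses a hypothetical stand-in class, `MiniStreams`, rather than the real `KafkaStreams` client, and it models only the two rules under discussion: a setter call before `start()` replaces the previous listener, and a setter call after `start()` throws `IllegalStateException`. The class name, the string-based states, and the exception message are assumptions made for illustration.

```java
import java.util.function.BiConsumer;

// Stand-in model only; this is NOT org.apache.kafka.streams.KafkaStreams.
public class ListenerSetDemo {

    /** Hypothetical miniature client that mimics only the setter rules discussed above. */
    static final class MiniStreams {
        private BiConsumer<String, String> stateListener; // receives (newState, oldState)
        private boolean started;

        void setStateListener(BiConsumer<String, String> listener) {
            if (started) {
                // Setting a listener is rejected once start() has been called.
                throw new IllegalStateException("Can only set the listener before start()");
            }
            // A later call before start() simply replaces the earlier listener.
            this.stateListener = listener;
        }

        void start() {
            started = true;
            if (stateListener != null) {
                // Notify whichever listener was set last.
                stateListener.accept("REBALANCING", "CREATED");
            }
        }
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        MiniStreams streams = new MiniStreams();

        streams.setStateListener((newState, oldState) -> log.append("first;"));
        streams.setStateListener((newState, oldState) -> log.append("second;")); // replaces the first
        streams.start();
        System.out.println(log); // only the second listener was invoked

        try {
            streams.setStateListener((newState, oldState) -> { });
        } catch (IllegalStateException e) {
            System.out.println("rejected after start(): " + e.getMessage());
        }
    }
}
```

A wording along the lines of "each setter can be called more than once before `start()`, and the most recent listener wins" would match this behavior.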



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
