minjcho commented on code in PR #22078:
URL: https://github.com/apache/kafka/pull/22078#discussion_r3367937225


##########
docs/streams/developer-guide/running-app.md:
##########
@@ -44,6 +44,118 @@ When you start your application you are launching a Kafka 
Streams instance of yo
 
 When the application instance starts running, the defined processor topology 
will be initialized as one or more stream tasks. If the processor topology 
defines any state stores, these are also constructed during the initialization 
period. For more information, see the State restoration during workload 
rebalance section).
 
+# Listeners and callbacks
+
+`KafkaStreams` provides several listeners and callbacks that let you observe 
the internal state and behavior of your application. All listeners must be set 
**before** calling `start()`. Attempting to set a listener after `start()` has 
been called will throw an `IllegalStateException`. A setter may be called 
multiple times before `start()`. Only the most recent listener takes effect.
+
+## State listener
+
+You can set a `KafkaStreams.StateListener` to be notified whenever the 
`KafkaStreams` instance transitions between states. The possible states are: 
`CREATED`, `REBALANCING`, `RUNNING`, `PENDING_SHUTDOWN`, `NOT_RUNNING`, 
`PENDING_ERROR`, and `ERROR`. See the 
[`KafkaStreams.State`](/{version}/javadoc/org/apache/kafka/streams/KafkaStreams.State.html)
 javadocs for the meaning of each state and the allowed transitions.
+
+    KafkaStreams streams = new KafkaStreams(topology, props);
+
+    streams.setStateListener((newState, oldState) -> {
+        if (newState == KafkaStreams.State.RUNNING) {
+            // application is now ready to process records
+        } else if (newState == KafkaStreams.State.ERROR) {
+            // application has encountered a fatal error
+        }
+    });
+
+    streams.start();
+
+## Uncaught exception handler
+
+You can set a `StreamsUncaughtExceptionHandler` to handle unexpected 
exceptions thrown by internal stream threads. The handler receives the 
exception and must return a `StreamThreadExceptionResponse` indicating how to 
proceed:
+
+  * `REPLACE_THREAD` -- Replace the failed thread with a new one.
+  * `SHUTDOWN_CLIENT` -- Shut down this `KafkaStreams` client.
+  * `SHUTDOWN_APPLICATION` -- Shut down all instances of the application.

Review Comment:
   Good point. I updated the `SHUTDOWN_APPLICATION` bullet to call out that 
it's best-effort with no delivery guarantee.



##########
docs/streams/developer-guide/running-app.md:
##########
@@ -44,6 +44,118 @@ When you start your application you are launching a Kafka 
Streams instance of yo
 
 When the application instance starts running, the defined processor topology 
will be initialized as one or more stream tasks. If the processor topology 
defines any state stores, these are also constructed during the initialization 
period. For more information, see the State restoration during workload 
rebalance section).
 
+# Listeners and callbacks
+
+`KafkaStreams` provides several listeners and callbacks that let you observe 
the internal state and behavior of your application. All listeners must be set 
**before** calling `start()`. Attempting to set a listener after `start()` has 
been called will throw an `IllegalStateException`. A setter may be called 
multiple times before `start()`. Only the most recent listener takes effect.
+
+## State listener
+
+You can set a `KafkaStreams.StateListener` to be notified whenever the 
`KafkaStreams` instance transitions between states. The possible states are: 
`CREATED`, `REBALANCING`, `RUNNING`, `PENDING_SHUTDOWN`, `NOT_RUNNING`, 
`PENDING_ERROR`, and `ERROR`. See the 
[`KafkaStreams.State`](/{version}/javadoc/org/apache/kafka/streams/KafkaStreams.State.html)
 javadocs for the meaning of each state and the allowed transitions.
+
+    KafkaStreams streams = new KafkaStreams(topology, props);
+
+    streams.setStateListener((newState, oldState) -> {
+        if (newState == KafkaStreams.State.RUNNING) {
+            // application is now ready to process records
+        } else if (newState == KafkaStreams.State.ERROR) {
+            // application has encountered a fatal error
+        }
+    });
+
+    streams.start();
+
+## Uncaught exception handler
+
+You can set a `StreamsUncaughtExceptionHandler` to handle unexpected 
exceptions thrown by internal stream threads. The handler receives the 
exception and must return a `StreamThreadExceptionResponse` indicating how to 
proceed:
+
+  * `REPLACE_THREAD` -- Replace the failed thread with a new one.
+  * `SHUTDOWN_CLIENT` -- Shut down this `KafkaStreams` client.
+  * `SHUTDOWN_APPLICATION` -- Shut down all instances of the application.
+
+Example:
+
+    import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
+
+    streams.setUncaughtExceptionHandler(exception -> {
+        if (exception instanceof RetriableException) {
+            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
+        }
+        return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
+    });
+
+The handler executes on the thread that produced the exception. Because the 
handler is shared across all stream threads, the implementation **must be 
thread-safe**. To get the thread that threw the exception, call 
`Thread.currentThread()` from within the handler.
+
+## State restore listener
+
+You can set a `StateRestoreListener` to be notified about the progress of 
state store restoration. This is useful for monitoring and logging the 
restoration process. The listener provides callbacks for the following 
lifecycle events:
+
+  * `onRestoreStart` -- Called when restoration begins for a state store 
partition, providing the starting and ending offsets.
+  * `onBatchRestored` -- Called after each batch of records is restored, 
providing the batch end offset and the number of records restored in the batch.
+  * `onRestoreEnd` -- Called when restoration completes for a state store 
partition, providing the total number of records restored.
+  * `onRestoreSuspended` -- Called when restoration is suspended because the 
task was migrated to another instance. This is a default method with an empty 
implementation.

Review Comment:
   Agreed, removed.
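
   For anyone following the thread, here is a rough sketch of how the restore 
callbacks listed above could drive a progress metric. It is deliberately 
dependency-free so it can be run on its own: `RestoreProgressTracker` and 
`percentRestored` are hypothetical names, and the partition is a plain `String` 
rather than a `TopicPartition`. A real listener would implement 
`StateRestoreListener` and delegate to something like this. It also assumes 
`endingOffset - startingOffset` approximates the record count, which may not 
hold exactly if the changelog has offset gaps.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Dependency-free stand-in for a restore-progress tracker (hypothetical class).
 * Method names and argument order mirror the callbacks described in the docs,
 * but the partition is a String here instead of a TopicPartition.
 */
class RestoreProgressTracker {
    // Approximate number of records to restore, keyed by "store/partition".
    private final Map<String, Long> expected = new ConcurrentHashMap<>();
    // Number of records restored so far, keyed the same way.
    private final Map<String, Long> restored = new ConcurrentHashMap<>();

    private static String key(String partition, String storeName) {
        return storeName + "/" + partition;
    }

    void onRestoreStart(String partition, String storeName,
                        long startingOffset, long endingOffset) {
        String k = key(partition, storeName);
        // Assumption: the offset range approximates the record count.
        expected.put(k, Math.max(0L, endingOffset - startingOffset));
        restored.put(k, 0L);
    }

    void onBatchRestored(String partition, String storeName,
                         long batchEndOffset, long numRestored) {
        // Accumulate the per-batch count reported by the callback.
        restored.merge(key(partition, storeName), numRestored, Long::sum);
    }

    void onRestoreEnd(String partition, String storeName, long totalRestored) {
        String k = key(partition, storeName);
        // Restoration is complete: pin both values so progress reads 100%.
        expected.put(k, totalRestored);
        restored.put(k, totalRestored);
    }

    /** Returns progress in [0, 100]; 0 if restoration has not started. */
    double percentRestored(String partition, String storeName) {
        String k = key(partition, storeName);
        Long total = expected.get(k);
        if (total == null) {
            return 0.0;
        }
        if (total == 0L) {
            return 100.0; // nothing to restore
        }
        long done = restored.getOrDefault(k, 0L);
        return Math.min(100.0, 100.0 * done / total);
    }
}
```

   The maps are concurrent because one listener instance may be invoked from 
more than one stream thread. Not proposing this for the docs, just 
illustrating the callback order.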



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

