mjsax commented on code in PR #22078:
URL: https://github.com/apache/kafka/pull/22078#discussion_r3307984788
##########
docs/streams/developer-guide/running-app.md:
##########
@@ -44,6 +44,118 @@ When you start your application you are launching a Kafka Streams instance of yo
When the application instance starts running, the defined processor topology will be initialized as one or more stream tasks. If the processor topology defines any state stores, these are also constructed during the initialization period. For more information, see the State restoration during workload rebalance section.
+# Listeners and callbacks
+
+`KafkaStreams` provides several listeners and callbacks that let you observe the internal state and behavior of your application. All listeners must be set **before** calling `start()`; calling a setter after `start()` throws an `IllegalStateException`. Each setter may be called multiple times before `start()`, in which case only the most recently set listener takes effect.
+
+## State listener
+
+You can set a `KafkaStreams.StateListener` to be notified whenever the `KafkaStreams` instance transitions between states. The possible states are: `CREATED`, `REBALANCING`, `RUNNING`, `PENDING_SHUTDOWN`, `NOT_RUNNING`, `PENDING_ERROR`, and `ERROR`. See the [`KafkaStreams.State`](/{version}/javadoc/org/apache/kafka/streams/KafkaStreams.State.html) Javadoc for the meaning of each state and the allowed transitions.
+
+    import org.apache.kafka.streams.KafkaStreams;
+
+    KafkaStreams streams = new KafkaStreams(topology, props);
+
+    streams.setStateListener((newState, oldState) -> {
+        if (newState == KafkaStreams.State.RUNNING) {
+            // application is now ready to process records
+        } else if (newState == KafkaStreams.State.ERROR) {
+            // application has encountered a fatal error
+        }
+    });
+
+    streams.start();
+
+## Uncaught exception handler
+
+You can set a `StreamsUncaughtExceptionHandler` to handle unexpected exceptions thrown by internal stream threads. The handler receives the exception and must return a `StreamThreadExceptionResponse` indicating how to proceed:
+
+ * `REPLACE_THREAD` -- Replace the failed thread with a new one.
+ * `SHUTDOWN_CLIENT` -- Shut down this `KafkaStreams` client.
+ * `SHUTDOWN_APPLICATION` -- Shut down all instances of the application.
Review Comment:
Should we call out that this option is best-effort only, and that there is no guarantee that the other instances will receive the shutdown signal?
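For reference, a minimal Java sketch of the state-listener use case from the hunk above: letting a caller block until the instance first reaches `RUNNING`. This is an illustration under assumptions, not Kafka Streams code. `AppState` and `ReadinessTracker` are hypothetical names, and `AppState` is a local stand-in for `KafkaStreams.State` (same seven constants) so the sketch compiles without Kafka on the classpath. `onChange` keeps the `(newState, oldState)` parameter order of `KafkaStreams.StateListener`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for KafkaStreams.State, listing the same seven states,
// so that this sketch compiles without Kafka Streams on the classpath.
enum AppState {
    CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR
}

// Hypothetical readiness tracker. onChange uses the same (newState, oldState)
// parameter order as KafkaStreams.StateListener#onChange.
class ReadinessTracker {
    // Released once, on the first transition into RUNNING.
    private final CountDownLatch firstRunning = new CountDownLatch(1);
    // Latest state seen; AtomicReference makes reads from other threads safe.
    private final AtomicReference<AppState> state = new AtomicReference<>(AppState.CREATED);

    void onChange(AppState newState, AppState oldState) {
        state.set(newState);
        if (newState == AppState.RUNNING) {
            firstRunning.countDown(); // further calls are no-ops once the count is zero
        }
    }

    AppState current() {
        return state.get();
    }

    boolean hasReachedRunning() {
        return firstRunning.getCount() == 0;
    }

    // Blocks the caller until the first RUNNING transition or until the timeout elapses.
    boolean awaitRunning(long timeout, TimeUnit unit) throws InterruptedException {
        return firstRunning.await(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        ReadinessTracker tracker = new ReadinessTracker();
        tracker.onChange(AppState.REBALANCING, AppState.CREATED);
        tracker.onChange(AppState.RUNNING, AppState.REBALANCING);
        System.out.println(tracker.awaitRunning(1, TimeUnit.SECONDS)); // prints "true"
    }
}
```

With the real API, the body of `onChange` would go into the lambda passed to `streams.setStateListener(...)` before `start()`, and the main thread could then call `awaitRunning`. The latch only records the *first* `RUNNING` transition; an instance can move back to `REBALANCING` later, so `current()` is the value to check for the present state.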
##########
docs/streams/developer-guide/running-app.md:
##########
@@ -44,6 +44,118 @@ When you start your application you are launching a Kafka Streams instance of yo
+
+Example:
+
+    import org.apache.kafka.common.errors.RetriableException;
+    import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
+
+    streams.setUncaughtExceptionHandler(exception -> {
+        if (exception instanceof RetriableException) {
+            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
+        }
+        return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
+    });
+
+The handler executes on the thread that produced the exception. Because the handler is shared across all stream threads, the implementation **must be thread-safe**. To get the thread that threw the exception, call `Thread.currentThread()` from within the handler.
+
+## State restore listener
+
+You can set a `StateRestoreListener` to be notified about the progress of state store restoration. This is useful for monitoring and logging the restoration process. The listener provides callbacks for the following lifecycle events:
+
+ * `onRestoreStart` -- Called when restoration begins for a state store partition, providing the starting and ending offsets.
+ * `onBatchRestored` -- Called after each batch of records is restored, providing the batch end offset and the number of records restored in the batch.
+ * `onRestoreEnd` -- Called when restoration completes for a state store partition, providing the total number of records restored.
+ * `onRestoreSuspended` -- Called when restoration is suspended because the task was migrated to another instance. This is a default method with an empty implementation.
Review Comment:
> This is a default method with an empty implementation.
Is this important? Wondering if we should omit it. When implementing the handler, users should find out naturally.
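To illustrate the thread-safety requirement for the uncaught exception handler quoted above, a minimal Java sketch of a handler that replaces failed threads up to a fixed budget and then shuts down the client. Assumptions: `ThreadResponse` and `BudgetedExceptionHandler` are hypothetical names, `ThreadResponse` is a local mirror of `StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse`, the handler is modelled as a `java.util.function.Function` instead of the real interface, and the budget policy is only an example. The counter is an `AtomicInteger` because one handler instance is invoked from every stream thread.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical local mirror of StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.
enum ThreadResponse {
    REPLACE_THREAD, SHUTDOWN_CLIENT, SHUTDOWN_APPLICATION
}

// Hypothetical handler: replace failed threads until a shared budget is used up,
// then shut down this client. One instance is shared by all stream threads,
// so the counter is updated atomically.
class BudgetedExceptionHandler implements Function<Throwable, ThreadResponse> {
    private final int maxReplacements;
    private final AtomicInteger failuresSeen = new AtomicInteger();

    BudgetedExceptionHandler(int maxReplacements) {
        this.maxReplacements = maxReplacements;
    }

    @Override
    public ThreadResponse apply(Throwable exception) {
        // When Kafka Streams invokes the handler, this is the thread that failed.
        String threadName = Thread.currentThread().getName();
        int failures = failuresSeen.incrementAndGet(); // atomic across threads
        if (failures <= maxReplacements) {
            System.err.println("Replacing " + threadName + " after: " + exception);
            return ThreadResponse.REPLACE_THREAD;
        }
        System.err.println("Replacement budget exhausted in " + threadName + "; shutting down client");
        return ThreadResponse.SHUTDOWN_CLIENT;
    }

    public static void main(String[] args) {
        BudgetedExceptionHandler handler = new BudgetedExceptionHandler(1);
        System.out.println(handler.apply(new IllegalStateException("boom"))); // prints "REPLACE_THREAD"
        System.out.println(handler.apply(new IllegalStateException("boom"))); // prints "SHUTDOWN_CLIENT"
    }
}
```

With the real API, the same logic would live in the lambda passed to `streams.setUncaughtExceptionHandler(...)`, returning the real `StreamThreadExceptionResponse` constants. The counter is never reset, so the budget caps replacements over the whole lifetime of the client.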
##########
docs/streams/developer-guide/running-app.md:
##########
@@ -44,6 +44,118 @@ When you start your application you are launching a Kafka Streams instance of yo
+
+Example:
+
+    import org.apache.kafka.common.TopicPartition;
+    import org.apache.kafka.streams.processor.StateRestoreListener;
+
+    streams.setGlobalStateRestoreListener(new StateRestoreListener() {
+        @Override
+        public void onRestoreStart(TopicPartition topicPartition, String storeName,
+                                   long startingOffset, long endingOffset) {
+            // log that restoration has started
+        }
+
+        @Override
+        public void onBatchRestored(TopicPartition topicPartition, String storeName,
+                                    long batchEndOffset, long numRestored) {
+            // track progress
+        }
+
+        @Override
+        public void onRestoreEnd(TopicPartition topicPartition, String storeName,
+                                 long totalRestored) {
+            // log that restoration is complete
+        }
+    });
+
+The listener is shared across all `StreamThread` instances, so it should be **stateless** or properly synchronized. Note that this listener does **not** monitor standby task updates. To monitor standby tasks, use the standby update listener described below.
Review Comment:
Above, for the uncaught exception handler, we say `Because the handler is shared across all stream threads, the implementation **must be thread-safe**.`
Why do we phrase it differently here? -- In general, all callbacks should be stateless; why the explicit call-out on this one? -- Also, "should" might not be strong enough?
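Related to the thread-safety question for `StateRestoreListener`, a minimal Java sketch of a restore-progress tracker that can be shared across stream threads. The class name, the `String` partition key (a hypothetical stand-in for `TopicPartition`), and the percentage calculation are assumptions for illustration; the callback names and parameter order follow the listener methods listed in the hunk. Because changelog offsets can have gaps (for example transaction markers), `endingOffset - startingOffset` is only an upper bound on the record count, so the percentage is approximate.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical thread-safe restore-progress tracker. Method names mirror
// StateRestoreListener, but partitions are plain String keys (a stand-in for
// TopicPartition) so that no Kafka classes are needed.
class RestoreProgress {
    // Progress of one store partition: offset span to restore and records restored so far.
    private static final class Progress {
        final long offsetSpan;
        final AtomicLong restored = new AtomicLong();

        Progress(long offsetSpan) {
            this.offsetSpan = offsetSpan;
        }
    }

    private final Map<String, Progress> inFlight = new ConcurrentHashMap<>();

    private static String key(String partition, String storeName) {
        return storeName + "/" + partition;
    }

    void onRestoreStart(String partition, String storeName, long startingOffset, long endingOffset) {
        inFlight.put(key(partition, storeName), new Progress(endingOffset - startingOffset));
    }

    void onBatchRestored(String partition, String storeName, long batchEndOffset, long numRestored) {
        Progress p = inFlight.get(key(partition, storeName));
        if (p != null) {
            p.restored.addAndGet(numRestored);
        }
    }

    void onRestoreEnd(String partition, String storeName, long totalRestored) {
        inFlight.remove(key(partition, storeName));
    }

    void onRestoreSuspended(String partition, String storeName, long totalRestored) {
        inFlight.remove(key(partition, storeName));
    }

    // Approximate percentage restored; -1.0 if the partition is not currently restoring.
    double percentDone(String partition, String storeName) {
        Progress p = inFlight.get(key(partition, storeName));
        if (p == null) {
            return -1.0;
        }
        if (p.offsetSpan <= 0) {
            return 100.0;
        }
        return Math.min(100.0, 100.0 * p.restored.get() / p.offsetSpan);
    }

    int restoringCount() {
        return inFlight.size();
    }

    public static void main(String[] args) {
        RestoreProgress progress = new RestoreProgress();
        progress.onRestoreStart("orders-changelog-0", "orders", 0L, 200L);
        progress.onBatchRestored("orders-changelog-0", "orders", 49L, 50L);
        System.out.println(progress.percentDone("orders-changelog-0", "orders")); // prints "25.0"
    }
}
```

Per-partition state lives in a `ConcurrentHashMap` and counts in `AtomicLong`s, so callbacks arriving concurrently from different stream threads need no external locking. The names `orders` and `orders-changelog-0` are placeholders.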
##########
docs/streams/developer-guide/running-app.md:
##########
@@ -44,6 +44,118 @@ When you start your application you are launching a Kafka Streams instance of yo
+
+## Standby update listener
+
+You can set a `StandbyUpdateListener` to be notified about updates to standby state store replicas. Standby replicas keep a copy of the state store on a different instance for faster failover. The listener provides callbacks for the following lifecycle events:
+
+ * `onUpdateStart` -- Called when a standby task begins consuming from the changelog, providing the starting offset.
+ * `onBatchLoaded` -- Called after each batch of records is loaded into the standby store, providing the batch end offset, batch size, and the current end offset of the changelog partition.
+ * `onUpdateSuspended` -- Called when the standby task stops updating. The `SuspendReason` parameter indicates why: `MIGRATED` means the task was moved to another instance, while `PROMOTED` means the standby was promoted to an active task (in which case the corresponding `StateRestoreListener.onRestoreStart` will be called next).
Review Comment:
> in which case the corresponding `StateRestoreListener.onRestoreStart` will be called next)
Is this correct (I didn't check the code)? Wondering if this might not happen in the case where a standby is fully caught up and the active task can start processing right away, skipping the restore phase entirely?
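A minimal Java sketch of using the standby update callbacks described in the hunk to track how far each standby replica lags behind its changelog. Assumptions: `StandbyLagTracker` and `StandbySuspendReason` are hypothetical names (the latter mirrors `SuspendReason`), partitions are plain `String` keys, and parameters are simplified; the real `StandbyUpdateListener` signatures use `TopicPartition` and may carry further arguments, so check its Javadoc before adapting this. Lag is approximated as `currentEndOffset - batchEndOffset`. Given the open question above, the sketch does not assume that a `StateRestoreListener.onRestoreStart` call follows a `PROMOTED` suspension.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for StandbyUpdateListener.SuspendReason.
enum StandbySuspendReason { MIGRATED, PROMOTED }

// Hypothetical, simplified standby lag tracker; thread-safe via ConcurrentHashMap.
class StandbyLagTracker {
    private final Map<String, Long> lagByPartition = new ConcurrentHashMap<>();
    private final Map<String, StandbySuspendReason> lastSuspendReason = new ConcurrentHashMap<>();

    void onUpdateStart(String partition, String storeName, long startingOffset) {
        // A new update cycle starts, so forget any earlier suspension reason.
        lastSuspendReason.remove(partition);
    }

    // Approximate offset lag: currentEndOffset - batchEndOffset, clamped at zero.
    void onBatchLoaded(String partition, String storeName, long batchEndOffset,
                       long batchSize, long currentEndOffset) {
        lagByPartition.put(partition, Math.max(0L, currentEndOffset - batchEndOffset));
    }

    void onUpdateSuspended(String partition, String storeName, StandbySuspendReason reason) {
        // The standby no longer updates this partition; its lag is no longer meaningful.
        lagByPartition.remove(partition);
        lastSuspendReason.put(partition, reason);
    }

    Optional<Long> lag(String partition) {
        return Optional.ofNullable(lagByPartition.get(partition));
    }

    Optional<StandbySuspendReason> suspendReason(String partition) {
        return Optional.ofNullable(lastSuspendReason.get(partition));
    }

    public static void main(String[] args) {
        StandbyLagTracker tracker = new StandbyLagTracker();
        tracker.onBatchLoaded("orders-changelog-0", "orders", 950L, 50L, 1000L);
        System.out.println(tracker.lag("orders-changelog-0")); // prints "Optional[50]"
    }
}
```

The names `orders` and `orders-changelog-0` are placeholders. Because the tracker only reacts to standby callbacks, a promoted partition simply drops out of the lag map, whether or not a restore phase follows.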