[kafka] branch 3.2 updated: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance (#12349)
This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.2 in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.2 by this push:
     new d8541b20a1 KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance (#12349)

d8541b20a1 is described below

commit d8541b20a106f22736947e0c2f293833f3c3873b
Author: Shawn
AuthorDate: Wed Jul 20 10:03:43 2022 +0800

    KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance (#12349)

    In KAFKA-13310, we tried to fix an issue where consumer#poll(duration) could return later than the provided duration. The cause: when a rebalance is needed, we first commit the current offsets synchronously before rebalancing, and if that offset commit takes too long, consumer#poll blocks longer than the requested duration. To fix that, we replaced the synchronous commit with an asynchronous commit before the rebalance (i.e. in onJoinPrepare).

    However, in this ticket we found that the async commit keeps sending a new commit request on every Consumer#poll, because the offset commit never completes in time. The impact is that the existing consumer is kicked out of the group after the rebalance timeout without rejoining. That is, with consumer A in group G, if consumer B joins the group, after the rebalance only consumer B remains in the group.

    Another bug was found while fixing this one. Before KAFKA-13310, we committed offsets synchronously with rebalanceTimeout, retrying on retriable errors until the timeout. After KAFKA-13310, we believed the retry was still in place, but the retry actually happens after partitions are revoked. So even if the retried offset commit eventually succeeds, some partition offsets are left uncommitted, and after the rebalance other consumers consume overlapping records.
    Reviewers: RivenSun, Luke Chen
---
 .../consumer/internals/AbstractCoordinator.java    |   5 +-
 .../consumer/internals/ConsumerCoordinator.java    |  73 ++---
 .../internals/AbstractCoordinatorTest.java         |   2 +-
 .../internals/ConsumerCoordinatorTest.java         | 120 -
 .../runtime/distributed/WorkerCoordinator.java     |   2 +-
 .../kafka/api/AbstractConsumerTest.scala           |  11 +-
 .../kafka/api/PlaintextConsumerTest.scala          |  87 +++
 7 files changed, 251 insertions(+), 49 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 5fe8a6a0e1..4d71482562 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -187,11 +187,12 @@ public abstract class AbstractCoordinator implements Closeable {
     /**
      * Invoked prior to each group join or rejoin. This is typically used to perform any
      * cleanup from the previous generation (such as committing offsets for the consumer)
+     * @param timer Timer bounding how long this method can block
      * @param generation The previous generation or -1 if there was none
      * @param memberId The identifier of this member in the previous group or "" if there was none
      * @return true If onJoinPrepare async commit succeeded, false otherwise
      */
-    protected abstract boolean onJoinPrepare(int generation, String memberId);
+    protected abstract boolean onJoinPrepare(Timer timer, int generation, String memberId);

     /**
      * Invoked when the leader is elected. This is used by the leader to perform the assignment
@@ -426,7 +427,7 @@ public abstract class AbstractCoordinator implements Closeable {
             // exception, in which case upon retry we should not retry onJoinPrepare either.
             needsJoinPrepare = false;
             // return false when onJoinPrepare is waiting for committing offset
-            if (!onJoinPrepare(generation.generationId, generation.memberId)) {
+            if (!onJoinPrepare(timer, generation.generationId, generation.memberId)) {
                 needsJoinPrepare = true;
                 // should not initiateJoinGroup if needsJoinPrepare still is true
                 return false;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index b853ff99e8..9838e7dc8f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -141,6 +141,12 @@ public final class ConsumerCoordinator extends
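The fix above bounds the onJoinPrepare wait with a timer and keeps re-checking the same in-flight commit instead of issuing a fresh async commit on every poll. Below is a minimal stdlib-only sketch of that idea; the class, method shape, and `CompletableFuture` stand-in for the commit future are hypothetical and not Kafka's actual internals.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: remember the single in-flight commit across polls and
// wait on it only as long as the timer allows, instead of sending a new
// commit request on every poll.
class JoinPrepareSketch {
    private CompletableFuture<Void> inFlightCommit; // reused across polls

    // Returns true once the commit completed within the timer's bound;
    // false means "try again on the next poll" (needsJoinPrepare stays true).
    boolean onJoinPrepare(long timerDeadlineMs, CompletableFuture<Void> commit) {
        if (inFlightCommit == null) {
            inFlightCommit = commit; // send the commit once, then re-check it
        }
        long remainingMs = timerDeadlineMs - System.currentTimeMillis();
        if (remainingMs <= 0) {
            return inFlightCommit.isDone(); // timer expired: report current state
        }
        try {
            inFlightCommit.get(remainingMs, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            return false; // not yet complete (or failed): retry on next poll
        }
        inFlightCommit = null; // done: next rebalance starts a fresh commit
        return true;
    }
}
```

The key design point is that the caller never fires a second commit while one is outstanding, which is exactly the behavior the original async-commit-per-poll loop violated.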
[kafka] branch trunk updated: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance (#12349)
This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new eee40200df KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance (#12349)

eee40200df is described below

commit eee40200df5c68963c0b534f95b5154bfd60e290
Author: Shawn
AuthorDate: Wed Jul 20 10:03:43 2022 +0800

    KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance (#12349)

    In KAFKA-13310, we tried to fix an issue where consumer#poll(duration) could return later than the provided duration. The cause: when a rebalance is needed, we first commit the current offsets synchronously before rebalancing, and if that offset commit takes too long, consumer#poll blocks longer than the requested duration. To fix that, we replaced the synchronous commit with an asynchronous commit before the rebalance (i.e. in onJoinPrepare).

    However, in this ticket we found that the async commit keeps sending a new commit request on every Consumer#poll, because the offset commit never completes in time. The impact is that the existing consumer is kicked out of the group after the rebalance timeout without rejoining. That is, with consumer A in group G, if consumer B joins the group, after the rebalance only consumer B remains in the group.

    Another bug was found while fixing this one. Before KAFKA-13310, we committed offsets synchronously with rebalanceTimeout, retrying on retriable errors until the timeout. After KAFKA-13310, we believed the retry was still in place, but the retry actually happens after partitions are revoked. So even if the retried offset commit eventually succeeds, some partition offsets are left uncommitted, and after the rebalance other consumers consume overlapping records.
    Reviewers: RivenSun, Luke Chen
---
 .../consumer/internals/AbstractCoordinator.java    |   5 +-
 .../consumer/internals/ConsumerCoordinator.java    |  73 ++---
 .../internals/AbstractCoordinatorTest.java         |   2 +-
 .../internals/ConsumerCoordinatorTest.java         | 120 -
 .../runtime/distributed/WorkerCoordinator.java     |   2 +-
 .../kafka/api/AbstractConsumerTest.scala           |  11 +-
 .../kafka/api/PlaintextConsumerTest.scala          |  87 +++
 7 files changed, 251 insertions(+), 49 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index c9ad797ebe..d2ece9efc5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -187,11 +187,12 @@ public abstract class AbstractCoordinator implements Closeable {
     /**
      * Invoked prior to each group join or rejoin. This is typically used to perform any
      * cleanup from the previous generation (such as committing offsets for the consumer)
+     * @param timer Timer bounding how long this method can block
      * @param generation The previous generation or -1 if there was none
      * @param memberId The identifier of this member in the previous group or "" if there was none
      * @return true If onJoinPrepare async commit succeeded, false otherwise
      */
-    protected abstract boolean onJoinPrepare(int generation, String memberId);
+    protected abstract boolean onJoinPrepare(Timer timer, int generation, String memberId);

     /**
      * Invoked when the leader is elected. This is used by the leader to perform the assignment
@@ -426,7 +427,7 @@ public abstract class AbstractCoordinator implements Closeable {
             // exception, in which case upon retry we should not retry onJoinPrepare either.
             needsJoinPrepare = false;
             // return false when onJoinPrepare is waiting for committing offset
-            if (!onJoinPrepare(generation.generationId, generation.memberId)) {
+            if (!onJoinPrepare(timer, generation.generationId, generation.memberId)) {
                 needsJoinPrepare = true;
                 // should not initiateJoinGroup if needsJoinPrepare still is true
                 return false;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index b853ff99e8..9838e7dc8f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -141,6 +141,12 @@ public final class ConsumerCoordinator extends
[kafka] branch trunk updated: KAFKA-12699: Override the default handler for stream threads if the stream's handler is used (#12324)
This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new b62d8b975c KAFKA-12699: Override the default handler for stream threads if the stream's handler is used (#12324)

b62d8b975c is described below

commit b62d8b975cf97b5c1328b9b03a05fa09b07cf13a
Author: Walker Carlson <18128741+wcarls...@users.noreply.github.com>
AuthorDate: Tue Jul 19 15:35:26 2022 -0500

    KAFKA-12699: Override the default handler for stream threads if the stream's handler is used (#12324)

    Override the default handler for stream threads if the stream's handler is used. We do not want the Java default handler triggering when a thread is replaced.

    Reviewers: Anna Sophie Blee-Goldman
---
 streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java | 7 +++
 .../StreamsUncaughtExceptionHandlerIntegrationTest.java          | 6 ++
 2 files changed, 13 insertions(+)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 3a61f05de1..05d99dd172 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -471,6 +471,13 @@ public class KafkaStreams implements AutoCloseable {
                     exception -> handleStreamsUncaughtException(exception, userStreamsUncaughtExceptionHandler, false)
                 );
             }
+            processStreamThread(thread -> thread.setUncaughtExceptionHandler((t, e) -> { }
+            ));
+
+            if (globalStreamThread != null) {
+                globalStreamThread.setUncaughtExceptionHandler((t, e) -> { }
+                );
+            }
         } else {
             throw new IllegalStateException("Can only set UncaughtExceptionHandler before calling start(). " +
                 "Current state is: " + state);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
index be98e8d9fc..4af333a65a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
@@ -176,6 +176,12 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
         testReplaceThreads(2);
     }

+    @Test
+    public void shouldReplaceThreadsWithoutJavaHandler() throws InterruptedException {
+        Thread.setDefaultUncaughtExceptionHandler((t, e) -> fail("exception thrown"));
+        testReplaceThreads(2);
+    }
+
     @Test
     public void shouldReplaceSingleThread() throws InterruptedException {
         testReplaceThreads(1);
[kafka-site] branch asf-site updated: Add AllegroGraph to powerdy-by (#409)
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git

The following commit(s) were added to refs/heads/asf-site by this push:
     new 329c1137 Add AllegroGraph to powerdy-by (#409)

329c1137 is described below

commit 329c113741ad4a10a4f82fac5371316ae07036c9
Author: gty <3747082+gt...@users.noreply.github.com>
AuthorDate: Wed Jul 20 03:16:52 2022 +1000

    Add AllegroGraph to powerdy-by (#409)

    Co-authored-by: Bill Bejeck
---
 images/powered-by/allegrograph-franz-logo.png | Bin 0 -> 1420406 bytes
 powered-by.html                               |   7 ++-
 2 files changed, 6 insertions(+), 1 deletion(-)

diff --git a/images/powered-by/allegrograph-franz-logo.png b/images/powered-by/allegrograph-franz-logo.png
new file mode 100644
index ..5693894e
Binary files /dev/null and b/images/powered-by/allegrograph-franz-logo.png differ
diff --git a/powered-by.html b/powered-by.html
index e266a8c5..cddae01e 100644
--- a/powered-by.html
+++ b/powered-by.html
@@ -710,6 +710,11 @@
     "logo": "atruvia_logo_online_rgb.png",
     "logoBgColor": "#d4f2f5",
     "description": "At Atruvia we use Apache Kafka to share events within the modern banking platform."
+}, {
+    "link": "https://allegrograph.com/",
+    "logo": "allegrograph-franz-logo.png",
+    "logoBgColor": "#ff",
+    "description": "AllegroGraph and Kafka are used together as an Entity Event Knowledge Graph platform in diverse settings such as call centers, hospitals, insurance companies, aviation organizations and financial firms. By coupling AllegroGraph with Kafka, users can create a real-time decision engine that produces real-time event streams based on computations that trigger specific actions. AllegroGraph accepts incoming events, executes instant queries and analytics on the new data [...]
 }, {
     "link": "http://www.atguigu.com/",
     "logo": "atguigu.png",
@@ -720,7 +725,7 @@
     "logo": "covage.png",
     "logoBgColor": "#ff",
     "description": "Covage is an infrastructure operator designing, deploying and operating high speed open access networks. At the very heart of our IT platform, Kafka is ensuring propagating our business workflows' events among all applications."
-  }, {
+    }, {
     "link": "https://www.qudosoft.de/",
     "logo": "qudosoft_wortbildmarke.png",
     "logoBgColor": "#ff",
[kafka] branch 3.2 updated: Revert "KAFKA-12887 Skip some RuntimeExceptions from exception handler (#11228)" (#12421)
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.2 in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.2 by this push:
     new 56a136d820 Revert "KAFKA-12887 Skip some RuntimeExceptions from exception handler (#11228)" (#12421)

56a136d820 is described below

commit 56a136d8203f2d2cf90752ebd37b59850ea60b2a
Author: Walker Carlson <18128741+wcarls...@users.noreply.github.com>
AuthorDate: Tue Jul 19 11:17:46 2022 -0500

    Revert "KAFKA-12887 Skip some RuntimeExceptions from exception handler (#11228)" (#12421)

    This reverts commit 4835c64f

    Reviewers: Matthias J. Sax
---
 .../org/apache/kafka/streams/KafkaStreams.java     | 22 +---
 ...amsUncaughtExceptionHandlerIntegrationTest.java | 61 +++---
 .../processor/internals/StreamThreadTest.java      |  4 +-
 3 files changed, 10 insertions(+), 77 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 4333bbb3cd..82e3a5bc78 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -154,9 +154,6 @@ public class KafkaStreams implements AutoCloseable {

     private static final String JMX_PREFIX = "kafka.streams";

-    private static final Set<Class<? extends Throwable>> EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS =
-        new HashSet<>(Arrays.asList(IllegalStateException.class, IllegalArgumentException.class));
-
     // processId is expected to be unique across JVMs and to be used
     // in userData of the subscription request to allow assignor be aware
     // of the co-location of stream thread's consumers. It is for internal
@@ -514,25 +511,10 @@ public class KafkaStreams implements AutoCloseable {
         }
     }

-    private boolean wrappedExceptionIsIn(final Throwable throwable, final Set<Class<? extends Throwable>> exceptionsOfInterest) {
-        return throwable.getCause() != null && exceptionsOfInterest.contains(throwable.getCause().getClass());
-    }
-
-    private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse getActionForThrowable(final Throwable throwable,
-                                                                                                final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
-        final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action;
-        if (wrappedExceptionIsIn(throwable, EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS)) {
-            action = SHUTDOWN_CLIENT;
-        } else {
-            action = streamsUncaughtExceptionHandler.handle(throwable);
-        }
-        return action;
-    }
-
     private void handleStreamsUncaughtException(final Throwable throwable,
                                                 final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler,
                                                 final boolean skipThreadReplacement) {
-        final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = getActionForThrowable(throwable, streamsUncaughtExceptionHandler);
+        final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = streamsUncaughtExceptionHandler.handle(throwable);
         if (oldHandler) {
             log.warn("Stream's new uncaught exception handler is set as well as the deprecated old handler." +
                 "The old handler will be ignored as long as a new handler is set.");
@@ -548,7 +530,7 @@ public class KafkaStreams implements AutoCloseable {
                 break;
             case SHUTDOWN_CLIENT:
                 log.error("Encountered the following exception during processing " +
-                    "and Kafka Streams opted to " + action + "." +
+                    "and the registered exception handler opted to " + action + "." +
                     " The streams client is going to shut down now. ", throwable);
                 closeToError();
                 break;
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
index 0f42d3546f..761d6fbb87 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
@@ -90,9 +90,6 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
     }

     public static final Duration DEFAULT_DURATION = Duration.ofSeconds(30);
-    private static final AtomicBoolean THROW_ERROR = new AtomicBoolean(true);
-    private static final AtomicBoolean THROW_ILLEGAL_STATE_EXCEPTION = new AtomicBoolean(false);
-
[kafka-site] branch asf-site updated: Adding La Redoute as powered by Kafka (#315)
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git

The following commit(s) were added to refs/heads/asf-site by this push:
     new f20d9e3b Adding La Redoute as powered by Kafka (#315)

f20d9e3b is described below

commit f20d9e3bb490b91eeac9cbf7be2eb389e5eed4cd
Author: Antoine Craske
AuthorDate: Tue Jul 19 17:53:07 2022 +0100

    Adding La Redoute as powered by Kafka (#315)

    * Add files via upload
    * Update powered-by.html

    Co-authored-by: Bill Bejeck
---
 images/powered-by/laredoute-logo.svg | 52 
 1 file changed, 52 insertions(+)

diff --git a/images/powered-by/laredoute-logo.svg b/images/powered-by/laredoute-logo.svg
new file mode 100644
index ..a208e093
--- /dev/null
+++ b/images/powered-by/laredoute-logo.svg
@@ -0,0 +1,52 @@
[52 added lines of SVG markup (116x19 logo, SVG 1.0 DTD); element attributes and path data were stripped by the mail archive]
\ No newline at end of file
[kafka] branch trunk updated: KAFKA-10199: Add RESUME in state updater (#12387)
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new 693e283802 KAFKA-10199: Add RESUME in state updater (#12387)

693e283802 is described below

commit 693e283802590b724ef441d5bf7acb6eeced91c5
Author: Guozhang Wang
AuthorDate: Tue Jul 19 09:44:10 2022 -0700

    KAFKA-10199: Add RESUME in state updater (#12387)

    * Need to check enforceRestoreActive / transitToUpdateStandby when resuming a paused task.
    * Do not expose another getResumedTasks since I think its caller only need the getPausedTasks.

    Reviewers: Bruno Cadonna
---
 .../processor/internals/DefaultStateUpdater.java   |  36 -
 .../streams/processor/internals/StateUpdater.java  |  13 ++
 .../streams/processor/internals/TaskAndAction.java |  10 +-
 .../internals/DefaultStateUpdaterTest.java         | 158 -
 .../processor/internals/TaskAndActionTest.java     |  20 +++
 5 files changed, 229 insertions(+), 8 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index 08959bee00..7e7ec2a6f7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -128,6 +128,9 @@ public class DefaultStateUpdater implements StateUpdater {
                         case PAUSE:
                             pauseTask(taskAndAction.getTaskId());
                             break;
+                        case RESUME:
+                            resumeTask(taskAndAction.getTaskId());
+                            break;
                     }
                 }
             } finally {
@@ -249,7 +252,7 @@ public class DefaultStateUpdater implements StateUpdater {
         final Task existingTask = updatingTasks.putIfAbsent(task.id(), task);
         if (existingTask != null) {
             throw new IllegalStateException((existingTask.isActive() ? "Active" : "Standby") + " task " + task.id() + " already exist, " +
-                "should not try to add another " + (task.isActive() ? "Active" : "Standby") + " task with the same id. " + BUG_ERROR_MESSAGE);
+                "should not try to add another " + (task.isActive() ? "active" : "standby") + " task with the same id. " + BUG_ERROR_MESSAGE);
         }

         if (task.isActive()) {
@@ -304,6 +307,26 @@ public class DefaultStateUpdater implements StateUpdater {
         }
     }

+    private void resumeTask(final TaskId taskId) {
+        final Task task = pausedTasks.get(taskId);
+        if (task != null) {
+            updatingTasks.put(taskId, task);
+            pausedTasks.remove(taskId);
+
+            if (task.isActive()) {
+                log.debug("Stateful active task " + task.id() + " was resumed to the updating tasks of the state updater");
+                changelogReader.enforceRestoreActive();
+            } else {
+                log.debug("Standby task " + task.id() + " was resumed to the updating tasks of the state updater");
+                if (updatingTasks.size() == 1) {
+                    changelogReader.transitToUpdateStandby();
+                }
+            }
+        } else {
+            log.debug("Task " + taskId + " was not resumed since it is not paused.");
+        }
+    }
+
     private boolean isStateless(final Task task) {
         return task.changelogPartitions().isEmpty() && task.isActive();
     }
@@ -451,6 +474,17 @@ public class DefaultStateUpdater implements StateUpdater {
         }
     }

+    @Override
+    public void resume(final TaskId taskId) {
+        tasksAndActionsLock.lock();
+        try {
+            tasksAndActions.add(TaskAndAction.createResumeTask(taskId));
+            tasksAndActionsCondition.signalAll();
+        } finally {
+            tasksAndActionsLock.unlock();
+        }
+    }
+
     @Override
     public Set<StreamTask> drainRestoredActiveTasks(final Duration timeout) {
         final long timeoutMs = timeout.toMillis();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
index 516e47436b..69d521b600 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
@@ -107,6 +107,19 @@ public interface StateUpdater {
      */
     void pause(final TaskId taskId);

+    /**
+     * Resume
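The `resume` method above follows an enqueue-and-signal pattern: callers append an action to a queue under a lock and signal a condition so the updater thread wakes and processes it. A minimal stdlib sketch of that pattern (the `ActionQueueSketch` class is hypothetical and much simpler than `DefaultStateUpdater`):

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Enqueue-and-signal sketch: producers add an action under a lock and signal;
// the single consumer thread awaits the condition while the queue is empty.
class ActionQueueSketch {
    enum Action { ADD, REMOVE, PAUSE, RESUME }

    private final Lock lock = new ReentrantLock();
    private final Condition nonEmpty = lock.newCondition();
    private final Queue<Action> actions = new ArrayDeque<>();

    // Called from the application thread, mirroring resume(taskId) above.
    void resume() {
        lock.lock();
        try {
            actions.add(Action.RESUME);
            nonEmpty.signalAll(); // wake the updater thread
        } finally {
            lock.unlock();
        }
    }

    // Called from the updater thread's main loop.
    Action take() throws InterruptedException {
        lock.lock();
        try {
            while (actions.isEmpty()) {
                nonEmpty.await(); // releases the lock while waiting
            }
            return actions.poll();
        } finally {
            lock.unlock();
        }
    }
}
```

Keeping all state transitions on the single updater thread is what lets `resumeTask` move tasks between the paused and updating maps without further synchronization.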
[kafka] branch 3.3 updated: Revert "KAFKA-12887 Skip some RuntimeExceptions from exception handler (#11228)" (#12421)
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.3 by this push:
     new 45009ef382 Revert "KAFKA-12887 Skip some RuntimeExceptions from exception handler (#11228)" (#12421)

45009ef382 is described below

commit 45009ef382145fb6e33c5ebec03600b37a1474c0
Author: Walker Carlson <18128741+wcarls...@users.noreply.github.com>
AuthorDate: Tue Jul 19 11:17:46 2022 -0500

    Revert "KAFKA-12887 Skip some RuntimeExceptions from exception handler (#11228)" (#12421)

    This reverts commit 4835c64f

    Reviewers: Matthias J. Sax
---
 .../org/apache/kafka/streams/KafkaStreams.java     | 22 +---
 .../integration/EmitOnChangeIntegrationTest.java   |  2 +-
 ...amsUncaughtExceptionHandlerIntegrationTest.java | 61 +++---
 .../processor/internals/StreamThreadTest.java      |  4 +-
 4 files changed, 11 insertions(+), 78 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index a923c8e983..3a61f05de1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -155,9 +155,6 @@ public class KafkaStreams implements AutoCloseable {

     private static final String JMX_PREFIX = "kafka.streams";

-    private static final Set<Class<? extends Throwable>> EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS =
-        new HashSet<>(Arrays.asList(IllegalStateException.class, IllegalArgumentException.class));
-
     // processId is expected to be unique across JVMs and to be used
     // in userData of the subscription request to allow assignor be aware
     // of the co-location of stream thread's consumers. It is for internal
@@ -515,25 +512,10 @@ public class KafkaStreams implements AutoCloseable {
         }
     }

-    private boolean wrappedExceptionIsIn(final Throwable throwable, final Set<Class<? extends Throwable>> exceptionsOfInterest) {
-        return throwable.getCause() != null && exceptionsOfInterest.contains(throwable.getCause().getClass());
-    }
-
-    private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse getActionForThrowable(final Throwable throwable,
-                                                                                                final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
-        final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action;
-        if (wrappedExceptionIsIn(throwable, EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS)) {
-            action = SHUTDOWN_CLIENT;
-        } else {
-            action = streamsUncaughtExceptionHandler.handle(throwable);
-        }
-        return action;
-    }
-
     private void handleStreamsUncaughtException(final Throwable throwable,
                                                 final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler,
                                                 final boolean skipThreadReplacement) {
-        final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = getActionForThrowable(throwable, streamsUncaughtExceptionHandler);
+        final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = streamsUncaughtExceptionHandler.handle(throwable);
         if (oldHandler) {
             log.warn("Stream's new uncaught exception handler is set as well as the deprecated old handler." +
                 "The old handler will be ignored as long as a new handler is set.");
@@ -549,7 +531,7 @@ public class KafkaStreams implements AutoCloseable {
                 break;
             case SHUTDOWN_CLIENT:
                 log.error("Encountered the following exception during processing " +
-                    "and Kafka Streams opted to " + action + "." +
+                    "and the registered exception handler opted to " + action + "." +
                     " The streams client is going to shut down now. ", throwable);
                 closeToError();
                 break;
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
index 27958730cf..f41c95a6bb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
@@ -111,7 +111,7 @@ public class EmitOnChangeIntegrationTest {
             .toStream()
             .map((key, value) -> {
                 if (shouldThrow.compareAndSet(true, false)) {
-                    throw new RuntimeException("Kaboom");
+                    throw new IllegalStateException("Kaboom");
                 } else {
                     return new
[kafka] branch trunk updated (309e0f986e -> 188b2bf280)
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git

    from 309e0f986e KAFKA-10199: Add PAUSE in state updater (#12386)
     add 188b2bf280 Revert "KAFKA-12887 Skip some RuntimeExceptions from exception handler (#11228)" (#12421)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/kafka/streams/KafkaStreams.java     | 22 +---
 .../integration/EmitOnChangeIntegrationTest.java   |  2 +-
 ...amsUncaughtExceptionHandlerIntegrationTest.java | 61 +++---
 .../processor/internals/StreamThreadTest.java      |  4 +-
 4 files changed, 11 insertions(+), 78 deletions(-)
[kafka-site] branch asf-site updated: Add Qudosoft to powered-by (#406)
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git

The following commit(s) were added to refs/heads/asf-site by this push:
     new 139b0afa Add Qudosoft to powered-by (#406)

139b0afa is described below

commit 139b0afa4290b730d488105349796952f5661025
Author: StephanZimmermann
AuthorDate: Tue Jul 19 15:58:32 2022 +0200

    Add Qudosoft to powered-by (#406)

    rebased and merged conflicts
---
 images/powered-by/qudosoft_wortbildmarke.png | Bin 0 -> 17009 bytes
 powered-by.html                              |   5 +
 2 files changed, 5 insertions(+)

diff --git a/images/powered-by/qudosoft_wortbildmarke.png b/images/powered-by/qudosoft_wortbildmarke.png
new file mode 100644
index ..e714959b
Binary files /dev/null and b/images/powered-by/qudosoft_wortbildmarke.png differ
diff --git a/powered-by.html b/powered-by.html
index 8e03f89b..e266a8c5 100644
--- a/powered-by.html
+++ b/powered-by.html
@@ -720,6 +720,11 @@
     "logo": "covage.png",
     "logoBgColor": "#ff",
     "description": "Covage is an infrastructure operator designing, deploying and operating high speed open access networks. At the very heart of our IT platform, Kafka is ensuring propagating our business workflows' events among all applications."
+  }, {
+    "link": "https://www.qudosoft.de/",
+    "logo": "qudosoft_wortbildmarke.png",
+    "logoBgColor": "#ff",
+    "description": "At Qudosoft, as part of the bigger tech network organization behind Germany based KLiNGEL group, we build a big scale e-commerce plattform called Next Level Commerce (NLC). NLC is based on the principle of customer-oriented verticalization which allows us maximum autonomy for our teams. With our micro services architecture we strife for high flexibility and scalability. Using Kafka for processing event streams supports inter-connecting these services in exactly th [...]
 }];
[kafka-site] branch asf-site updated: KAFKA-13868: Add ASF links including privacy policy to the footer (#421)
This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git

The following commit(s) were added to refs/heads/asf-site by this push:
     new 3d409c1e KAFKA-13868: Add ASF links including privacy policy to the footer (#421)

3d409c1e is described below

commit 3d409c1eff0092b53de0a21298fe89e5e07dfd74
Author: Divij Vaidya
AuthorDate: Tue Jul 19 14:23:18 2022 +0200

    KAFKA-13868: Add ASF links including privacy policy to the footer (#421)

    Reviewers: Mickael Maison, Luke Chen
---
 css/styles.css       |  1 +
 includes/_footer.htm | 10 +-
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/css/styles.css b/css/styles.css
index 1b0030c6..697ad892 100644
--- a/css/styles.css
+++ b/css/styles.css
@@ -901,6 +901,7 @@ tr:nth-child(odd) {
     color: #88;
     padding: 2rem 0;
     background: white;
+    text-align: center;
 }
 .footer__legal {
     margin: 0 2rem;
diff --git a/includes/_footer.htm b/includes/_footer.htm
index fbb9f81b..7e80e7d9 100644
--- a/includes/_footer.htm
+++ b/includes/_footer.htm
@@ -4,9 +4,17 @@
-        The contents of this website are 2017 <a href="https://www.apache.org/" target="_blank">Apache Software Foundation</a> under the terms of the <a href="https://www.apache.org/licenses/LICENSE-2.0.html" target="_blank">Apache License v2</a>.
+        The contents of this website are 2022 <a href="https://www.apache.org/" target="_blank">Apache Software Foundation</a> under the terms of the <a href="https://www.apache.org/licenses/LICENSE-2.0.html" target="_blank">Apache License v2</a>.
         Apache Kafka, Kafka, and the Kafka logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.
+        <a href="https://kafka.apache.org/project-security" target="_blank" rel="noreferrer">Security</a> |
+        <a href="https://www.apache.org/foundation/sponsorship.html" target="_blank" rel="noreferrer">Donate</a> |
+        <a href="https://www.apache.org/foundation/thanks.html" target="_blank" rel="noreferrer">Thanks</a> |
+        <a href="https://apache.org/events/current-event" target="_blank" rel="noreferrer">Events</a> |
+        <a href="https://apache.org/licenses/" target="_blank" rel="noreferrer">License</a> |
+        <a href="https://privacy.apache.org/policies/privacy-policy-public.html" target="_blank" rel="noreferrer">Privacy</a>
+        <a href="http://www.apache.org">