[kafka] branch 3.2 updated: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance (#12349)

2022-07-19 Thread showuon
This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
 new d8541b20a1 KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare 
in Cooperative rebalance (#12349)
d8541b20a1 is described below

commit d8541b20a106f22736947e0c2f293833f3c3873b
Author: Shawn 
AuthorDate: Wed Jul 20 10:03:43 2022 +0800

KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative 
rebalance (#12349)

In KAFKA-13310, we tried to fix a issue that consumer#poll(duration) will 
be returned after the provided duration. It's because if rebalance needed, 
we'll try to commit current offset first before rebalance synchronously. And if 
the offset committing takes too long, the consumer#poll will spend more time 
than provided duration. To fix that, we change commit sync with commit async 
before rebalance (i.e. onPrepareJoin).

However, in this ticket, we found the async commit will keep sending a new 
commit request during each Consumer#poll, because the offset commit never 
completes in time. The impact is that the existing consumer will be kicked out 
of the group after rebalance timeout without joining the group. That is, 
suppose we have consumer A in group G, and now consumer B joined the group, 
after the rebalance, only consumer B in the group.

Besides, there's also another bug found during fixing this bug. Before 
KAFKA-13310, we commitOffset sync with rebalanceTimeout, which will retry when 
retriable error until timeout. After KAFKA-13310, we thought we have retry, but 
we'll retry after partitions revoking. That is, even though the retried offset 
commit successfully, it still causes some partitions offsets un-committed, and 
after rebalance, other consumers will consume overlapping records.

Reviewers: RivenSun , Luke Chen 
---
 .../consumer/internals/AbstractCoordinator.java|   5 +-
 .../consumer/internals/ConsumerCoordinator.java|  73 ++---
 .../internals/AbstractCoordinatorTest.java |   2 +-
 .../internals/ConsumerCoordinatorTest.java | 120 -
 .../runtime/distributed/WorkerCoordinator.java |   2 +-
 .../kafka/api/AbstractConsumerTest.scala   |  11 +-
 .../kafka/api/PlaintextConsumerTest.scala  |  87 +++
 7 files changed, 251 insertions(+), 49 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 5fe8a6a0e1..4d71482562 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -187,11 +187,12 @@ public abstract class AbstractCoordinator implements 
Closeable {
 /**
  * Invoked prior to each group join or rejoin. This is typically used to 
perform any
  * cleanup from the previous generation (such as committing offsets for 
the consumer)
+ * @param timer Timer bounding how long this method can block
  * @param generation The previous generation or -1 if there was none
  * @param memberId The identifier of this member in the previous group or 
"" if there was none
  * @return true If onJoinPrepare async commit succeeded, false otherwise
  */
-protected abstract boolean onJoinPrepare(int generation, String memberId);
+protected abstract boolean onJoinPrepare(Timer timer, int generation, 
String memberId);
 
 /**
  * Invoked when the leader is elected. This is used by the leader to 
perform the assignment
@@ -426,7 +427,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
 // exception, in which case upon retry we should not retry 
onJoinPrepare either.
 needsJoinPrepare = false;
 // return false when onJoinPrepare is waiting for committing 
offset
-if (!onJoinPrepare(generation.generationId, 
generation.memberId)) {
+if (!onJoinPrepare(timer, generation.generationId, 
generation.memberId)) {
 needsJoinPrepare = true;
 //should not initiateJoinGroup if needsJoinPrepare still 
is true
 return false;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index b853ff99e8..9838e7dc8f 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -141,6 +141,12 @@ public final class ConsumerCoordinator extends 

[kafka] branch trunk updated: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance (#12349)

2022-07-19 Thread showuon
This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new eee40200df KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare 
in Cooperative rebalance (#12349)
eee40200df is described below

commit eee40200df5c68963c0b534f95b5154bfd60e290
Author: Shawn 
AuthorDate: Wed Jul 20 10:03:43 2022 +0800

KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative 
rebalance (#12349)

In KAFKA-13310, we tried to fix a issue that consumer#poll(duration) will 
be returned after the provided duration. It's because if rebalance needed, 
we'll try to commit current offset first before rebalance synchronously. And if 
the offset committing takes too long, the consumer#poll will spend more time 
than provided duration. To fix that, we change commit sync with commit async 
before rebalance (i.e. onPrepareJoin).

However, in this ticket, we found the async commit will keep sending a new 
commit request during each Consumer#poll, because the offset commit never 
completes in time. The impact is that the existing consumer will be kicked out 
of the group after rebalance timeout without joining the group. That is, 
suppose we have consumer A in group G, and now consumer B joined the group, 
after the rebalance, only consumer B in the group.

Besides, there's also another bug found during fixing this bug. Before 
KAFKA-13310, we commitOffset sync with rebalanceTimeout, which will retry when 
retriable error until timeout. After KAFKA-13310, we thought we have retry, but 
we'll retry after partitions revoking. That is, even though the retried offset 
commit successfully, it still causes some partitions offsets un-committed, and 
after rebalance, other consumers will consume overlapping records.

Reviewers: RivenSun , Luke Chen 
---
 .../consumer/internals/AbstractCoordinator.java|   5 +-
 .../consumer/internals/ConsumerCoordinator.java|  73 ++---
 .../internals/AbstractCoordinatorTest.java |   2 +-
 .../internals/ConsumerCoordinatorTest.java | 120 -
 .../runtime/distributed/WorkerCoordinator.java |   2 +-
 .../kafka/api/AbstractConsumerTest.scala   |  11 +-
 .../kafka/api/PlaintextConsumerTest.scala  |  87 +++
 7 files changed, 251 insertions(+), 49 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index c9ad797ebe..d2ece9efc5 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -187,11 +187,12 @@ public abstract class AbstractCoordinator implements 
Closeable {
 /**
  * Invoked prior to each group join or rejoin. This is typically used to 
perform any
  * cleanup from the previous generation (such as committing offsets for 
the consumer)
+ * @param timer Timer bounding how long this method can block
  * @param generation The previous generation or -1 if there was none
  * @param memberId The identifier of this member in the previous group or 
"" if there was none
  * @return true If onJoinPrepare async commit succeeded, false otherwise
  */
-protected abstract boolean onJoinPrepare(int generation, String memberId);
+protected abstract boolean onJoinPrepare(Timer timer, int generation, 
String memberId);
 
 /**
  * Invoked when the leader is elected. This is used by the leader to 
perform the assignment
@@ -426,7 +427,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
 // exception, in which case upon retry we should not retry 
onJoinPrepare either.
 needsJoinPrepare = false;
 // return false when onJoinPrepare is waiting for committing 
offset
-if (!onJoinPrepare(generation.generationId, 
generation.memberId)) {
+if (!onJoinPrepare(timer, generation.generationId, 
generation.memberId)) {
 needsJoinPrepare = true;
 //should not initiateJoinGroup if needsJoinPrepare still 
is true
 return false;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index b853ff99e8..9838e7dc8f 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -141,6 +141,12 @@ public final class ConsumerCoordinator extends 

[kafka] branch trunk updated: KAFKA-12699: Override the default handler for stream threads if the stream's handler is used (#12324)

2022-07-19 Thread ableegoldman
This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new b62d8b975c KAFKA-12699: Override the default handler for stream 
threads if the stream's handler is used (#12324)
b62d8b975c is described below

commit b62d8b975cf97b5c1328b9b03a05fa09b07cf13a
Author: Walker Carlson <18128741+wcarls...@users.noreply.github.com>
AuthorDate: Tue Jul 19 15:35:26 2022 -0500

KAFKA-12699: Override the default handler for stream threads if the 
stream's handler is used (#12324)

Override the default handler for stream threads if the stream's handler is 
used. We do no want the java default handler triggering when a thread is 
replaced.

Reviewers: Anna Sophie Blee-Goldman 
---
 streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java   | 7 +++
 .../StreamsUncaughtExceptionHandlerIntegrationTest.java| 6 ++
 2 files changed, 13 insertions(+)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 3a61f05de1..05d99dd172 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -471,6 +471,13 @@ public class KafkaStreams implements AutoCloseable {
 exception -> handleStreamsUncaughtException(exception, 
userStreamsUncaughtExceptionHandler, false)
 );
 }
+processStreamThread(thread -> 
thread.setUncaughtExceptionHandler((t, e) -> { }
+));
+
+if (globalStreamThread != null) {
+globalStreamThread.setUncaughtExceptionHandler((t, e) -> { 
}
+);
+}
 } else {
 throw new IllegalStateException("Can only set 
UncaughtExceptionHandler before calling start(). " +
 "Current state is: " + state);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
index be98e8d9fc..4af333a65a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
@@ -176,6 +176,12 @@ public class 
StreamsUncaughtExceptionHandlerIntegrationTest {
 testReplaceThreads(2);
 }
 
+@Test
+public void shouldReplaceThreadsWithoutJavaHandler() throws 
InterruptedException {
+Thread.setDefaultUncaughtExceptionHandler((t, e) -> fail("exception 
thrown"));
+testReplaceThreads(2);
+}
+
 @Test
 public void shouldReplaceSingleThread() throws InterruptedException {
 testReplaceThreads(1);



[kafka-site] branch asf-site updated: Add AllegroGraph to powerdy-by (#409)

2022-07-19 Thread bbejeck
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/kafka-site.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new 329c1137 Add AllegroGraph to powerdy-by (#409)
329c1137 is described below

commit 329c113741ad4a10a4f82fac5371316ae07036c9
Author: gty <3747082+gt...@users.noreply.github.com>
AuthorDate: Wed Jul 20 03:16:52 2022 +1000

Add AllegroGraph to powerdy-by (#409)

Co-authored-by: Bill Bejeck 
---
 images/powered-by/allegrograph-franz-logo.png | Bin 0 -> 1420406 bytes
 powered-by.html   |   7 ++-
 2 files changed, 6 insertions(+), 1 deletion(-)

diff --git a/images/powered-by/allegrograph-franz-logo.png 
b/images/powered-by/allegrograph-franz-logo.png
new file mode 100644
index ..5693894e
Binary files /dev/null and b/images/powered-by/allegrograph-franz-logo.png 
differ
diff --git a/powered-by.html b/powered-by.html
index e266a8c5..cddae01e 100644
--- a/powered-by.html
+++ b/powered-by.html
@@ -710,6 +710,11 @@
 "logo": "atruvia_logo_online_rgb.png",
 "logoBgColor": "#d4f2f5",
 "description": "At Atruvia we use Apache Kafka to share events within 
the modern banking platform."
+}, {
+ "link": "https://allegrograph.com/;,
+ "logo": "allegrograph-franz-logo.png",
+ "logoBgColor": "#ff",
+ "description": "AllegroGraph and Kafka are used together as an 
Entity Event Knowledge Graph platform in diverse settings such as call centers, 
hospitals, insurance companies, aviation organizations and financial firms. By 
coupling AllegroGraph with Kafka, users can create a real-time decision engine 
that produces real-time event streams based on computations that trigger 
specific actions. AllegroGraph accepts incoming events, executes instant 
queries and analytics on the new data  [...]
 }, {
 "link": "http://www.atguigu.com/;,
 "logo": "atguigu.png",
@@ -720,7 +725,7 @@
 "logo": "covage.png",
 "logoBgColor": "#ff",
 "description": "Covage is an infrastructure operator designing, 
deploying and operating high speed open access networks. At the very heart of 
our IT platform, Kafka is ensuring propagating our business workflows' events 
among all applications."
-   }, {
+ }, {
 "link": "https://www.qudosoft.de/;,
 "logo": "qudosoft_wortbildmarke.png",
 "logoBgColor": "#ff",



[kafka] branch 3.2 updated: Revert "KAFKA-12887 Skip some RuntimeExceptions from exception handler (#11228)" (#12421)

2022-07-19 Thread mjsax
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
 new 56a136d820 Revert "KAFKA-12887 Skip some RuntimeExceptions from 
exception handler (#11228)" (#12421)
56a136d820 is described below

commit 56a136d8203f2d2cf90752ebd37b59850ea60b2a
Author: Walker Carlson <18128741+wcarls...@users.noreply.github.com>
AuthorDate: Tue Jul 19 11:17:46 2022 -0500

Revert "KAFKA-12887 Skip some RuntimeExceptions from exception handler 
(#11228)" (#12421)

This reverts commit 4835c64f

Reviewers: Matthias J. Sax 
---
 .../org/apache/kafka/streams/KafkaStreams.java | 22 +---
 ...amsUncaughtExceptionHandlerIntegrationTest.java | 61 +++---
 .../processor/internals/StreamThreadTest.java  |  4 +-
 3 files changed, 10 insertions(+), 77 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 4333bbb3cd..82e3a5bc78 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -154,9 +154,6 @@ public class KafkaStreams implements AutoCloseable {
 
 private static final String JMX_PREFIX = "kafka.streams";
 
-private static final Set> 
EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS =
-new HashSet<>(Arrays.asList(IllegalStateException.class, 
IllegalArgumentException.class));
-
 // processId is expected to be unique across JVMs and to be used
 // in userData of the subscription request to allow assignor be aware
 // of the co-location of stream thread's consumers. It is for internal
@@ -514,25 +511,10 @@ public class KafkaStreams implements AutoCloseable {
 }
 }
 
-private boolean wrappedExceptionIsIn(final Throwable throwable, final 
Set> exceptionsOfInterest) {
-return throwable.getCause() != null && 
exceptionsOfInterest.contains(throwable.getCause().getClass());
-}
-
-private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse 
getActionForThrowable(final Throwable throwable,
-   
 final StreamsUncaughtExceptionHandler 
streamsUncaughtExceptionHandler) {
-final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse 
action;
-if (wrappedExceptionIsIn(throwable, 
EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS)) {
-action = SHUTDOWN_CLIENT;
-} else {
-action = streamsUncaughtExceptionHandler.handle(throwable);
-}
-return action;
-}
-
 private void handleStreamsUncaughtException(final Throwable throwable,
 final 
StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler,
 final boolean 
skipThreadReplacement) {
-final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse 
action = getActionForThrowable(throwable, streamsUncaughtExceptionHandler);
+final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse 
action = streamsUncaughtExceptionHandler.handle(throwable);
 if (oldHandler) {
 log.warn("Stream's new uncaught exception handler is set as well 
as the deprecated old handler." +
 "The old handler will be ignored as long as a new handler 
is set.");
@@ -548,7 +530,7 @@ public class KafkaStreams implements AutoCloseable {
 break;
 case SHUTDOWN_CLIENT:
 log.error("Encountered the following exception during 
processing " +
-"and Kafka Streams opted to " + action + "." +
+"and the registered exception handler opted to " + 
action + "." +
 " The streams client is going to shut down now. ", 
throwable);
 closeToError();
 break;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
index 0f42d3546f..761d6fbb87 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
@@ -90,9 +90,6 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
 }
 
 public static final Duration DEFAULT_DURATION = Duration.ofSeconds(30);
-private static final AtomicBoolean THROW_ERROR = new AtomicBoolean(true);
-private static final AtomicBoolean THROW_ILLEGAL_STATE_EXCEPTION = new 
AtomicBoolean(false);
-

[kafka-site] branch asf-site updated: Adding La Redoute as powered by Kafka (#315)

2022-07-19 Thread bbejeck
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/kafka-site.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new f20d9e3b Adding La Redoute as powered by Kafka (#315)
f20d9e3b is described below

commit f20d9e3bb490b91eeac9cbf7be2eb389e5eed4cd
Author: Antoine Craske 
AuthorDate: Tue Jul 19 17:53:07 2022 +0100

Adding La Redoute as powered by Kafka (#315)

* Add files via upload

* Update powered-by.html

Co-authored-by: Bill Bejeck 
---
 images/powered-by/laredoute-logo.svg | 52 
 1 file changed, 52 insertions(+)

diff --git a/images/powered-by/laredoute-logo.svg 
b/images/powered-by/laredoute-logo.svg
new file mode 100644
index ..a208e093
--- /dev/null
+++ b/images/powered-by/laredoute-logo.svg
@@ -0,0 +1,52 @@
+
+
+http://www.w3.org/TR/2001/REC-SVG-20010904/DTD/svg10.dtd;>
+http://www.w3.org/2000/svg; 
xmlns:xlink="http://www.w3.org/1999/xlink; x="0px" y="0px" width="116px" 
height="19px" viewBox="0 0 116 19" enable-background="new 0 0 116 19" 
xml:space="preserve">
+
+   
+   
+   
+   
+   
+   
+   
+   
+   
+   
+   
+   
+   
+   
+   
+   
+   
+   
+   
+   
+   
+   
+   
+
+
\ No newline at end of file



[kafka] branch trunk updated: KAFKA-10199: Add RESUME in state updater (#12387)

2022-07-19 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 693e283802 KAFKA-10199: Add RESUME in state updater (#12387)
693e283802 is described below

commit 693e283802590b724ef441d5bf7acb6eeced91c5
Author: Guozhang Wang 
AuthorDate: Tue Jul 19 09:44:10 2022 -0700

KAFKA-10199: Add RESUME in state updater (#12387)

* Need to check enforceRestoreActive / transitToUpdateStandby when resuming 
a paused task.
* Do not expose another getResumedTasks since I think its caller only need 
the getPausedTasks.

Reviewers: Bruno Cadonna 
---
 .../processor/internals/DefaultStateUpdater.java   |  36 -
 .../streams/processor/internals/StateUpdater.java  |  13 ++
 .../streams/processor/internals/TaskAndAction.java |  10 +-
 .../internals/DefaultStateUpdaterTest.java | 158 -
 .../processor/internals/TaskAndActionTest.java |  20 +++
 5 files changed, 229 insertions(+), 8 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index 08959bee00..7e7ec2a6f7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -128,6 +128,9 @@ public class DefaultStateUpdater implements StateUpdater {
 case PAUSE:
 pauseTask(taskAndAction.getTaskId());
 break;
+case RESUME:
+resumeTask(taskAndAction.getTaskId());
+break;
 }
 }
 } finally {
@@ -249,7 +252,7 @@ public class DefaultStateUpdater implements StateUpdater {
 final Task existingTask = updatingTasks.putIfAbsent(task.id(), 
task);
 if (existingTask != null) {
 throw new IllegalStateException((existingTask.isActive() ? 
"Active" : "Standby") + " task " + task.id() + " already exist, " +
-"should not try to add another " + (task.isActive() ? 
"Active" : "Standby") + " task with the same id. " + BUG_ERROR_MESSAGE);
+"should not try to add another " + (task.isActive() ? 
"active" : "standby") + " task with the same id. " + BUG_ERROR_MESSAGE);
 }
 
 if (task.isActive()) {
@@ -304,6 +307,26 @@ public class DefaultStateUpdater implements StateUpdater {
 }
 }
 
+private void resumeTask(final TaskId taskId) {
+final Task task = pausedTasks.get(taskId);
+if (task != null) {
+updatingTasks.put(taskId, task);
+pausedTasks.remove(taskId);
+
+if (task.isActive()) {
+log.debug("Stateful active task " + task.id() + " was 
resumed to the updating tasks of the state updater");
+changelogReader.enforceRestoreActive();
+} else {
+log.debug("Standby task " + task.id() + " was resumed to 
the updating tasks of the state updater");
+if (updatingTasks.size() == 1) {
+changelogReader.transitToUpdateStandby();
+}
+}
+} else {
+log.debug("Task " + taskId + " was not resumed since it is not 
paused.");
+}
+}
+
 private boolean isStateless(final Task task) {
 return task.changelogPartitions().isEmpty() && task.isActive();
 }
@@ -451,6 +474,17 @@ public class DefaultStateUpdater implements StateUpdater {
 }
 }
 
+@Override
+public void resume(final TaskId taskId) {
+tasksAndActionsLock.lock();
+try {
+tasksAndActions.add(TaskAndAction.createResumeTask(taskId));
+tasksAndActionsCondition.signalAll();
+} finally {
+tasksAndActionsLock.unlock();
+}
+}
+
 @Override
 public Set drainRestoredActiveTasks(final Duration timeout) {
 final long timeoutMs = timeout.toMillis();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
index 516e47436b..69d521b600 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
@@ -107,6 +107,19 @@ public interface StateUpdater {
  */
 void pause(final TaskId taskId);
 
+/**
+ * Resume 

[kafka] branch 3.3 updated: Revert "KAFKA-12887 Skip some RuntimeExceptions from exception handler (#11228)" (#12421)

2022-07-19 Thread mjsax
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
 new 45009ef382 Revert "KAFKA-12887 Skip some RuntimeExceptions from 
exception handler (#11228)" (#12421)
45009ef382 is described below

commit 45009ef382145fb6e33c5ebec03600b37a1474c0
Author: Walker Carlson <18128741+wcarls...@users.noreply.github.com>
AuthorDate: Tue Jul 19 11:17:46 2022 -0500

Revert "KAFKA-12887 Skip some RuntimeExceptions from exception handler 
(#11228)" (#12421)

This reverts commit 4835c64f

Reviewers: Matthias J. Sax 
---
 .../org/apache/kafka/streams/KafkaStreams.java | 22 +---
 .../integration/EmitOnChangeIntegrationTest.java   |  2 +-
 ...amsUncaughtExceptionHandlerIntegrationTest.java | 61 +++---
 .../processor/internals/StreamThreadTest.java  |  4 +-
 4 files changed, 11 insertions(+), 78 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index a923c8e983..3a61f05de1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -155,9 +155,6 @@ public class KafkaStreams implements AutoCloseable {
 
 private static final String JMX_PREFIX = "kafka.streams";
 
-private static final Set> 
EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS =
-new HashSet<>(Arrays.asList(IllegalStateException.class, 
IllegalArgumentException.class));
-
 // processId is expected to be unique across JVMs and to be used
 // in userData of the subscription request to allow assignor be aware
 // of the co-location of stream thread's consumers. It is for internal
@@ -515,25 +512,10 @@ public class KafkaStreams implements AutoCloseable {
 }
 }
 
-private boolean wrappedExceptionIsIn(final Throwable throwable, final 
Set> exceptionsOfInterest) {
-return throwable.getCause() != null && 
exceptionsOfInterest.contains(throwable.getCause().getClass());
-}
-
-private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse 
getActionForThrowable(final Throwable throwable,
-   
 final StreamsUncaughtExceptionHandler 
streamsUncaughtExceptionHandler) {
-final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse 
action;
-if (wrappedExceptionIsIn(throwable, 
EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS)) {
-action = SHUTDOWN_CLIENT;
-} else {
-action = streamsUncaughtExceptionHandler.handle(throwable);
-}
-return action;
-}
-
 private void handleStreamsUncaughtException(final Throwable throwable,
 final 
StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler,
 final boolean 
skipThreadReplacement) {
-final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse 
action = getActionForThrowable(throwable, streamsUncaughtExceptionHandler);
+final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse 
action = streamsUncaughtExceptionHandler.handle(throwable);
 if (oldHandler) {
 log.warn("Stream's new uncaught exception handler is set as well 
as the deprecated old handler." +
 "The old handler will be ignored as long as a new handler 
is set.");
@@ -549,7 +531,7 @@ public class KafkaStreams implements AutoCloseable {
 break;
 case SHUTDOWN_CLIENT:
 log.error("Encountered the following exception during 
processing " +
-"and Kafka Streams opted to " + action + "." +
+"and the registered exception handler opted to " + 
action + "." +
 " The streams client is going to shut down now. ", 
throwable);
 closeToError();
 break;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
index 27958730cf..f41c95a6bb 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
@@ -111,7 +111,7 @@ public class EmitOnChangeIntegrationTest {
 .toStream()
 .map((key, value) -> {
 if (shouldThrow.compareAndSet(true, false)) {
-throw new RuntimeException("Kaboom");
+throw new IllegalStateException("Kaboom");
 } else {
 return new 

[kafka] branch trunk updated (309e0f986e -> 188b2bf280)

2022-07-19 Thread mjsax
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 309e0f986e KAFKA-10199: Add PAUSE in state updater (#12386)
 add 188b2bf280 Revert "KAFKA-12887 Skip some RuntimeExceptions from 
exception handler (#11228)" (#12421)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/kafka/streams/KafkaStreams.java | 22 +---
 .../integration/EmitOnChangeIntegrationTest.java   |  2 +-
 ...amsUncaughtExceptionHandlerIntegrationTest.java | 61 +++---
 .../processor/internals/StreamThreadTest.java  |  4 +-
 4 files changed, 11 insertions(+), 78 deletions(-)



[kafka-site] branch asf-site updated: Add Qudosoft to powered-by (#406)

2022-07-19 Thread bbejeck
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/kafka-site.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new 139b0afa Add Qudosoft to powered-by (#406)
139b0afa is described below

commit 139b0afa4290b730d488105349796952f5661025
Author: StephanZimmermann 
AuthorDate: Tue Jul 19 15:58:32 2022 +0200

Add Qudosoft to powered-by (#406)

rebased and merged conflicts
---
 images/powered-by/qudosoft_wortbildmarke.png | Bin 0 -> 17009 bytes
 powered-by.html  |   5 +
 2 files changed, 5 insertions(+)

diff --git a/images/powered-by/qudosoft_wortbildmarke.png 
b/images/powered-by/qudosoft_wortbildmarke.png
new file mode 100644
index ..e714959b
Binary files /dev/null and b/images/powered-by/qudosoft_wortbildmarke.png differ
diff --git a/powered-by.html b/powered-by.html
index 8e03f89b..e266a8c5 100644
--- a/powered-by.html
+++ b/powered-by.html
@@ -720,6 +720,11 @@
 "logo": "covage.png",
 "logoBgColor": "#ff",
 "description": "Covage is an infrastructure operator designing, 
deploying and operating high speed open access networks. At the very heart of 
our IT platform, Kafka is ensuring propagating our business workflows' events 
among all applications."
+   }, {
+"link": "https://www.qudosoft.de/;,
+"logo": "qudosoft_wortbildmarke.png",
+"logoBgColor": "#ff",
+"description": "At Qudosoft, as part of the bigger tech network 
organization behind Germany based KLiNGEL group, we build a big scale 
e-commerce plattform called Next Level Commerce (NLC). NLC is based on the 
principle of customer-oriented verticalization which allows us maximum autonomy 
for our teams. With our micro services architecture we strife for high 
flexibility and scalability. Using Kafka for processing event streams supports 
inter-connecting these services in exactly th [...]
 }];
 
 



[kafka-site] branch asf-site updated: KAFKA-13868: Add ASF links including privacy policy to the footer (#421)

2022-07-19 Thread mimaison
This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/kafka-site.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new 3d409c1e KAFKA-13868: Add ASF links including privacy policy to the 
footer (#421)
3d409c1e is described below

commit 3d409c1eff0092b53de0a21298fe89e5e07dfd74
Author: Divij Vaidya 
AuthorDate: Tue Jul 19 14:23:18 2022 +0200

KAFKA-13868: Add ASF links including privacy policy to the footer (#421)


Reviewers: Mickael Maison , Luke Chen 

---
 css/styles.css   |  1 +
 includes/_footer.htm | 10 +-
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/css/styles.css b/css/styles.css
index 1b0030c6..697ad892 100644
--- a/css/styles.css
+++ b/css/styles.css
@@ -901,6 +901,7 @@ tr:nth-child(odd) {
 color: #88;
 padding: 2rem 0;
 background: white;
+text-align: center;
 }
 .footer__legal {
 margin: 0 2rem;
diff --git a/includes/_footer.htm b/includes/_footer.htm
index fbb9f81b..7e80e7d9 100644
--- a/includes/_footer.htm
+++ b/includes/_footer.htm
@@ -4,9 +4,17 @@



-   The 
contents of this website are  2017 https://www.apache.org/; 
target="_blank">Apache Software Foundation under the terms of the https://www.apache.org/licenses/LICENSE-2.0.html; target="_blank">Apache 
License v2.
+   The 
contents of this website are  2022 https://www.apache.org/; 
target="_blank">Apache Software Foundation under the terms of the https://www.apache.org/licenses/LICENSE-2.0.html; target="_blank">Apache 
License v2.
Apache 
Kafka, Kafka, and the Kafka logo are either registered trademarks or trademarks 
of The Apache Software Foundation
in 
the United States and other countries.
+   
+   https://kafka.apache.org/project-security; target="_blank" 
rel="noreferrer">Security|
+   https://www.apache.org/foundation/sponsorship.html; target="_blank" 
rel="noreferrer">Donate|
+   https://www.apache.org/foundation/thanks.html; target="_blank" 
rel="noreferrer">Thanks|
+   https://apache.org/events/current-event; target="_blank" 
rel="noreferrer">Events|
+   https://apache.org/licenses/; target="_blank" 
rel="noreferrer">License|
+   https://privacy.apache.org/policies/privacy-policy-public.html; 
target="_blank" rel="noreferrer">Privacy
+   

http://www.apache.org;>