[kafka-site] branch asf-site updated: MINOR: update Kafka Streams state.dir doc (#536)

2023-08-06 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/kafka-site.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new 1ac4a1d1 MINOR: update Kafka Streams state.dir doc (#536)
1ac4a1d1 is described below

commit 1ac4a1d151491b339d0116dbd45a47bca4911103
Author: Matthias J. Sax 
AuthorDate: Sun Aug 6 10:20:32 2023 -0700

MINOR: update Kafka Streams state.dir doc (#536)

The default state directory was changed in the 2.8.0 release (cf. KAFKA-10604)

Reviewers: Guozhang Wang 
---
 28/streams/developer-guide/config-streams.html | 2 +-
 30/streams/developer-guide/config-streams.html | 2 +-
 31/streams/developer-guide/config-streams.html | 2 +-
 32/streams/developer-guide/config-streams.html | 2 +-
 33/streams/developer-guide/config-streams.html | 2 +-
 34/streams/developer-guide/config-streams.html | 2 +-
 35/streams/developer-guide/config-streams.html | 2 +-
 7 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/28/streams/developer-guide/config-streams.html 
b/28/streams/developer-guide/config-streams.html
index 5b89ef4d..706e456e 100644
--- a/28/streams/developer-guide/config-streams.html
+++ b/28/streams/developer-guide/config-streams.html
@@ -320,7 +320,7 @@
   state.dir
 High
 Directory location for state stores.
-/tmp/kafka-streams
+/${java.io.tmpdir}/kafka-streams
   
   task.timeout.ms
 Medium
diff --git a/30/streams/developer-guide/config-streams.html 
b/30/streams/developer-guide/config-streams.html
index 2fa3088f..51706f74 100644
--- a/30/streams/developer-guide/config-streams.html
+++ b/30/streams/developer-guide/config-streams.html
@@ -335,7 +335,7 @@ settings.put(... , ...);
   state.dir
 High
 Directory location for state stores.
-/tmp/kafka-streams
+/${java.io.tmpdir}/kafka-streams
   
   task.timeout.ms
 Medium
diff --git a/31/streams/developer-guide/config-streams.html 
b/31/streams/developer-guide/config-streams.html
index 318f9cb0..1ce575fa 100644
--- a/31/streams/developer-guide/config-streams.html
+++ b/31/streams/developer-guide/config-streams.html
@@ -407,7 +407,7 @@ 
streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG), 1);
   state.dir
 High
 Directory location for state stores.
-/tmp/kafka-streams
+/${java.io.tmpdir}/kafka-streams
   
   task.timeout.ms
 Medium
diff --git a/32/streams/developer-guide/config-streams.html 
b/32/streams/developer-guide/config-streams.html
index a99d90da..65279e61 100644
--- a/32/streams/developer-guide/config-streams.html
+++ b/32/streams/developer-guide/config-streams.html
@@ -415,7 +415,7 @@ 
streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG), 1);
   state.dir
 High
 Directory location for state stores.
-/tmp/kafka-streams
+/${java.io.tmpdir}/kafka-streams
   
   task.timeout.ms
 Medium
diff --git a/33/streams/developer-guide/config-streams.html 
b/33/streams/developer-guide/config-streams.html
index a99d90da..65279e61 100644
--- a/33/streams/developer-guide/config-streams.html
+++ b/33/streams/developer-guide/config-streams.html
@@ -415,7 +415,7 @@ 
streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG), 1);
   state.dir
 High
 Directory location for state stores.
-/tmp/kafka-streams
+/${java.io.tmpdir}/kafka-streams
   
   task.timeout.ms
 Medium
diff --git a/34/streams/developer-guide/config-streams.html 
b/34/streams/developer-guide/config-streams.html
index bccf97eb..5aaf52c8 100644
--- a/34/streams/developer-guide/config-streams.html
+++ b/34/streams/developer-guide/config-streams.html
@@ -415,7 +415,7 @@ 
streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG), 1);
   state.dir
 High
 Directory location for state stores.
-/tmp/kafka-streams
+/${java.io.tmpdir}/kafka-streams
   
   task.timeout.ms
 Medium
diff --git a/35/streams/developer-guide/config-streams.html 
b/35/streams/developer-guide/config-streams.html
index b1512df2..a38e8b3d 100644
--- a/35/streams/developer-guide/config-streams.html
+++ b/35/streams/developer-guide/config-streams.html
@@ -415,7 +415,7 @@ 
streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
   state.dir
 High
 Directory location for state stores.
-/tmp/kafka-streams
+/${java.io.tmpdir}/kafka-streams
   
   task.timeout.ms
 Medium
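
For context, a minimal sketch of pinning state.dir explicitly in a Streams application, since the default now resolves under ${java.io.tmpdir} (changed in 2.8.0) and temp directories may be wiped on reboot. The application id, bootstrap server, and target path below are illustrative assumptions, not values from the commit:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class StateDirExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "state-dir-demo");    // illustrative
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
            // Pin state stores to a durable location instead of the JVM temp dir:
            props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
            StreamsConfig config = new StreamsConfig(props);
            System.out.println(config.getString(StreamsConfig.STATE_DIR_CONFIG));
        }
    }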



[kafka] branch trunk updated: MINOR: update Kafka Streams state.dir doc (#14155)

2023-08-06 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 7a2e11cae73 MINOR: update Kafka Streams state.dir doc (#14155)
7a2e11cae73 is described below

commit 7a2e11cae739f3391f61e2e29b148d3a3ebea8b3
Author: Matthias J. Sax 
AuthorDate: Sun Aug 6 10:20:08 2023 -0700

MINOR: update Kafka Streams state.dir doc (#14155)

The default state directory was changed in the 2.8.0 release (cf. KAFKA-10604)

Reviewers: Guozhang Wang 
---
 docs/streams/developer-guide/config-streams.html | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/streams/developer-guide/config-streams.html 
b/docs/streams/developer-guide/config-streams.html
index b1512df24b5..a38e8b3d3b7 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -415,7 +415,7 @@ 
streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
   state.dir
 High
 Directory location for state stores.
-/tmp/kafka-streams
+/${java.io.tmpdir}/kafka-streams
   
   task.timeout.ms
 Medium



[kafka] branch trunk updated: KAFKA-15106: Fix AbstractStickyAssignor isBalanced predicate (#13920)

2023-08-03 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new e0b7499103d KAFKA-15106: Fix AbstractStickyAssignor isBalanced predicate (#13920)
e0b7499103d is described below

commit e0b7499103df9222140cdbf7047494d92913987e
Author: flashmouse 
AuthorDate: Fri Aug 4 02:17:08 2023 +0800

KAFKA-15106: Fix AbstractStickyAssignor isBalanced predicate (#13920)

In 3.5.0, AbstractStickyAssignor may run a useless loop in 
performReassignments because isBalanced has a trivial mistake, resulting in 
a rebalance timeout in some situations.

Co-authored-by: lixy 
Reviewers: Ritika Reddy, Philip Nee, Kirk True, Guozhang Wang
---
 .../consumer/internals/AbstractStickyAssignor.java |  5 +-
 .../internals/AbstractStickyAssignorTest.java  | 86 ++
 2 files changed, 89 insertions(+), 2 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
index 1bde792d598..0823752d159 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
@@ -158,10 +158,11 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
 // generation amongst
 for (final TopicPartition tp : memberData.partitions) {
 if (allTopics.contains(tp.topic())) {
-String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer);
+String otherConsumer = allPreviousPartitionsToOwner.get(tp);
 if (otherConsumer == null) {
 // this partition is not owned by other consumer in the same generation
 ownedPartitions.add(tp);
+allPreviousPartitionsToOwner.put(tp, consumer);
 } else {
 final int otherMemberGeneration = subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION);
 
@@ -1172,7 +1173,7 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
 if (!currentAssignment.get(consumer).contains(topicPartition)) {
 String otherConsumer = allPartitions.get(topicPartition);
 int otherConsumerPartitionCount = currentAssignment.get(otherConsumer).size();
-if (consumerPartitionCount < otherConsumerPartitionCount) {
+if (consumerPartitionCount + 1 < otherConsumerPartitionCount) {
 log.debug("{} can be moved from consumer {} to consumer {} for a more balanced assignment.",
 topicPartition, otherConsumer, consumer);
 return false;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
index cdb0142c49b..71d188f3cd7 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
 import org.apache.kafka.clients.consumer.StickyAssignor;
@@ -724,6 +725,91 @@ public abstract class AbstractStickyAssignorTest {
 assignor.assignPartitions(partitionsPerTopic, subscriptions);
 }
 
+@Timeout(90)
+@ParameterizedTest(name = TEST_NAME_WITH_CONSUMER_RACK)
+@ValueSource(booleans = {false, true})
+public void testAssignmentAndGroupWithNonEqualSubscriptionNotTimeout(boolean hasConsumerRack) {
+initializeRacks(hasConsumerRack ? RackConfig.BROKER_AND_CONSUMER_RACK : RackConfig.NO_CONSUMER_RACK);
+int topicCount = hasConsumerRack ? 50 : 100;
+int partitionCount = 2_00;
+int consumerCount = 5_00;
+
+List<String> topics = new ArrayList<>();
+Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+for (int i = 0; i < topicCount; i++) {
+String topicName = getTopicName(i, topicCount);
+topics.add(topicName);
+partitionsPerTopic.put(topicName, partitionInfos(topicName, partitionCount));
+}
+for (int i = 0; i < consumerCoun
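
To make the isBalanced fix concrete, here is a self-contained sketch (hypothetical class and method names, not the actual Kafka code) of why the corrected condition is consumerPartitionCount + 1 < otherConsumerPartitionCount: moving a partition only improves balance when the other consumer owns at least two more partitions; otherwise the move merely flips the imbalance and the reassignment loop never converges.

    import java.util.Map;

    final class BalanceCheckSketch {
        // True if moving one partition from `other` to `consumer` improves balance.
        static boolean moveImprovesBalance(Map<String, Integer> counts,
                                           String consumer, String other) {
            int mine = counts.get(consumer);
            int theirs = counts.get(other);
            // The buggy condition was `mine < theirs`, which still "improves" when
            // theirs == mine + 1, so partitions ping-pong between consumers forever.
            return mine + 1 < theirs;
        }

        public static void main(String[] args) {
            Map<String, Integer> counts = Map.of("c1", 2, "c2", 3);
            // c2 owns only one more partition than c1, so a move is pointless:
            System.out.println(moveImprovesBalance(counts, "c1", "c2")); // false
        }
    }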

[kafka] branch trunk updated (c4ad09e47d7 -> 750a3893081)

2023-04-12 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from c4ad09e47d7 MINOR: Add more KRaft reassignment tests (#13521)
 add 750a3893081 MINOR: Follow-up on failing streams test, and fix 
StoreChangelogReader (#13523)

No new revisions were added by this update.

Summary of changes:
 .../processor/internals/DefaultStateUpdater.java   |  2 +-
 .../streams/processor/internals/ReadOnlyTask.java  |  2 +-
 .../streams/processor/internals/StandbyTask.java   |  6 +++-
 .../streams/processor/internals/StateUpdater.java  |  6 +++-
 .../processor/internals/StoreChangelogReader.java  | 37 ++
 .../streams/processor/internals/StreamTask.java| 14 
 .../kafka/streams/processor/internals/Task.java|  2 +-
 .../integration/PauseResumeIntegrationTest.java| 32 ++-
 .../internals/DefaultStateUpdaterTest.java |  4 +--
 .../processor/internals/StandbyTaskTest.java   |  4 +--
 .../internals/StoreChangelogReaderTest.java| 28 +---
 .../processor/internals/StreamTaskTest.java|  6 ++--
 .../processor/internals/TaskManagerTest.java   |  2 +-
 .../internals/metrics/TaskMetricsTest.java | 15 ++---
 14 files changed, 103 insertions(+), 57 deletions(-)



[kafka] branch trunk updated (2117c4bce8f -> f02f5f8c8a8)

2023-04-06 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 2117c4bce8f Minor: fix ReadOnlyTaskTest (#13519)
 add f02f5f8c8a8 MINOR: fix stream failing tests (#13512)

No new revisions were added by this update.

Summary of changes:
 .../streams/processor/internals/StoreChangelogReader.java | 15 ---
 .../processor/internals/metrics/TaskMetricsTest.java  |  2 ++
 2 files changed, 6 insertions(+), 11 deletions(-)



[kafka] branch trunk updated (4f34ce1b491 -> 2117c4bce8f)

2023-04-06 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 4f34ce1b491 KAFKA-14376: Add ConfigProvider to make use of environment 
variables KIP-887 (#12992)
 add 2117c4bce8f Minor: fix ReadOnlyTaskTest (#13519)

No new revisions were added by this update.

Summary of changes:
 .../apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java| 4 
 1 file changed, 4 insertions(+)



[kafka] branch 3.4 updated: KAFKA-14172: Should clear cache when active recycled from standby (#13369)

2023-04-05 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
 new 2dd3713b2a1 KAFKA-14172: Should clear cache when active recycled from 
standby (#13369)
2dd3713b2a1 is described below

commit 2dd3713b2a183bb904651c0022c5d953970b4ad3
Author: Guozhang Wang 
AuthorDate: Wed Apr 5 16:05:11 2023 -0700

KAFKA-14172: Should clear cache when active recycled from standby (#13369)

This fix is inspired by #12540.

1. Added a clearCache function for CachedStateStore, which is triggered 
upon recycling a state manager.
2. Added the integration test inherited from #12540.
3. Improved some log4j entries.
4. Found and fixed a minor issue with the log4j prefix.

Reviewers: Lucas Brutschy, Matthias J. Sax
---
 .../processor/internals/ActiveTaskCreator.java |   4 +-
 .../processor/internals/ProcessorStateManager.java |  20 +-
 .../processor/internals/StandbyTaskCreator.java|   2 +-
 .../processor/internals/StoreChangelogReader.java  |   2 +-
 .../streams/state/internals/CachedStateStore.java  |  10 +
 .../state/internals/CachingKeyValueStore.java  |  12 +
 .../state/internals/CachingSessionStore.java   |   5 +
 .../state/internals/CachingWindowStore.java|   5 +
 .../kafka/streams/state/internals/NamedCache.java  |   7 +
 .../kafka/streams/state/internals/ThreadCache.java |   8 +
 .../internals/TimeOrderedCachingWindowStore.java   |   5 +
 .../streams/state/internals/WrappedStateStore.java |   8 +
 ...tandbyTaskEOSMultiRebalanceIntegrationTest.java | 300 +
 .../internals/ProcessorStateManagerTest.java   |  29 ++
 14 files changed, 411 insertions(+), 6 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index 581175e2c97..7f423c84069 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -226,7 +226,7 @@ class ActiveTaskCreator {
 }
 
 standbyTask.prepareRecycle();
-standbyTask.stateMgr.transitionTaskType(Task.TaskType.ACTIVE);
+standbyTask.stateMgr.transitionTaskType(Task.TaskType.ACTIVE, 
getLogContext(standbyTask.id));
 
 final RecordCollector recordCollector = 
createRecordCollector(standbyTask.id, getLogContext(standbyTask.id), 
standbyTask.topology);
 final StreamTask task = new StreamTask(
@@ -324,7 +324,7 @@ class ActiveTaskCreator {
 
 private LogContext getLogContext(final TaskId taskId) {
 final String threadIdPrefix = String.format("stream-thread [%s] ", 
Thread.currentThread().getName());
-final String logPrefix = threadIdPrefix + String.format("%s [%s] ", 
"task", taskId);
+final String logPrefix = threadIdPrefix + String.format("%s [%s] ", 
"stream-task", taskId);
 return new LogContext(logPrefix);
 }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 6a66e428ea5..746bb02db29 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -588,14 +588,30 @@ public class ProcessorStateManager implements 
StateManager {
 final List allChangelogs = 
getAllChangelogTopicPartitions();
 changelogReader.unregister(allChangelogs);
 }
+
+// when the state manager is recycled to be reused, future writes may
+// bypass its store's caching layer if they are from restoration, hence
+// we need to clear the state store's caches just in case
+// See KAFKA-14172 for details
+if (!stores.isEmpty()) {
+log.debug("Clearing all store caches registered in the state 
manager: {}", stores);
+for (final StateStoreMetadata metadata : stores.values()) {
+final StateStore store = metadata.stateStore;
+
+if (store instanceof CachedStateStore) {
+((CachedStateStore) store).clearCache();
+}
+log.trace("Cleared cache {}", store.name());
+}
+}
 }
 
-void transitionTaskType(final TaskType newType) {
+void transitionTaskType(final TaskType newType, final LogContext 
logContext) {
 if (taskType.equals(newType)) {
 throw new IllegalStateException("Tried to recycle state for task 
type conver

[kafka] branch trunk updated (653baa66948 -> b2ee6df1c4c)

2023-04-05 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 653baa66948 KAFKA-10199: Add task updater metrics, part 2 (#13300)
 add b2ee6df1c4c KAFKA-14172: Should clear cache when active recycled from 
standby (#13369)

No new revisions were added by this update.

Summary of changes:
 .../processor/internals/ActiveTaskCreator.java |   4 +-
 .../processor/internals/ProcessorStateManager.java |  20 +-
 .../processor/internals/StandbyTaskCreator.java|   2 +-
 .../processor/internals/StoreChangelogReader.java  |   2 +-
 .../streams/state/internals/CachedStateStore.java  |  10 +
 .../state/internals/CachingKeyValueStore.java  |  12 +
 .../state/internals/CachingSessionStore.java   |   5 +
 .../state/internals/CachingWindowStore.java|   5 +
 .../kafka/streams/state/internals/NamedCache.java  |   7 +
 .../kafka/streams/state/internals/ThreadCache.java |   8 +
 .../internals/TimeOrderedCachingWindowStore.java   |   5 +
 .../streams/state/internals/WrappedStateStore.java |   8 +
 ...tandbyTaskEOSMultiRebalanceIntegrationTest.java | 300 +
 .../internals/ProcessorStateManagerTest.java   |  29 ++
 14 files changed, 411 insertions(+), 6 deletions(-)
 create mode 100644 
streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java



[kafka] branch trunk updated (6d36db1c78f -> beb0be5fe45)

2023-04-04 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 6d36db1c78f KAFKA-14765 and KAFKA-14776: Support for SCRAM at 
bootstrap with integration tests (#13374)
 add beb0be5fe45 KAFKA-14533: Do not interrupt state-updater thread during 
shutdown (#13318)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/kafka/streams/KafkaStreams.java |  2 +-
 .../streams/processor/internals/ClientUtils.java   | 36 ++
 .../processor/internals/DefaultStateUpdater.java   | 57 +-
 .../internals/StreamsPartitionAssignor.java| 17 ---
 .../streams/processor/internals/TaskManager.java   |  2 +
 .../SmokeTestDriverIntegrationTest.java|  3 +-
 ...ghAvailabilityStreamsPartitionAssignorTest.java | 39 +++
 .../RackAwarenessStreamsPartitionAssignorTest.java | 35 +++--
 .../internals/StreamsPartitionAssignorTest.java| 19 +++-
 .../internals/assignment/AssignmentTestUtils.java  | 19 +++-
 10 files changed, 112 insertions(+), 117 deletions(-)



[kafka] branch trunk updated (f8d0fc835bf -> 5c0e4aa6764)

2023-03-29 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from f8d0fc835bf MINOR: Remove addOne to fix build (#13469)
 add 5c0e4aa6764 KAFKA-14468: Committed API (#13380)

No new revisions were added by this update.

Summary of changes:
 .../consumer/internals/CommitRequestManager.java   | 364 ++---
 .../internals/DefaultBackgroundThread.java |   3 +-
 .../consumer/internals/PrototypeAsyncConsumer.java |  79 +++--
 .../internals/events/ApplicationEvent.java |   2 +-
 .../events/ApplicationEventProcessor.java  |  22 +-
 .../events/OffsetFetchApplicationEvent.java|  43 +++
 .../internals/CommitRequestManagerTest.java| 304 +++--
 .../internals/PrototypeAsyncConsumerTest.java  |  86 -
 .../kafka/api/BaseAsyncConsumerTest.scala  |   2 +-
 9 files changed, 781 insertions(+), 124 deletions(-)
 create mode 100644 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/OffsetFetchApplicationEvent.java



[kafka] branch trunk updated (7438f100cf4 -> 31440b00f3e)

2023-03-27 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 7438f100cf4 KAFKA-14774 the removed listeners should not be 
reconfigurable (#13326)
 add 31440b00f3e KAFKA-14848: KafkaConsumer incorrectly passes 
locally-scoped serializers to FetchConfig (#13452)

No new revisions were added by this update.

Summary of changes:
 .../kafka/clients/consumer/KafkaConsumer.java  |  5 +-
 .../clients/consumer/internals/FetchConfig.java| 10 ++-
 .../consumer/internals/FetchConfigTest.java| 92 ++
 3 files changed, 102 insertions(+), 5 deletions(-)
 create mode 100644 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchConfigTest.java



[kafka] branch trunk updated: KAFKA-14365: Extract common logic from Fetcher (#13425)

2023-03-24 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new a3252629a37 KAFKA-14365: Extract common logic from Fetcher (#13425)
a3252629a37 is described below

commit a3252629a37344be1f1f86a77cf685f9c147be4d
Author: Kirk True 
AuthorDate: Fri Mar 24 14:33:13 2023 -0700

KAFKA-14365: Extract common logic from Fetcher (#13425)

* KAFKA-14365: Extract common logic from Fetcher

Extract logic from Fetcher into AbstractFetch.

Also introduce FetchConfig as a more concise way to delineate state from
incoming configuration.

Formalized the defaults in CommonClientConfigs and ConsumerConfig to be
accessible elsewhere.

* Removed overridden methods in favor of synchronizing where needed

Reviewers: Guozhang Wang 
---
 checkstyle/suppressions.xml|  14 +-
 .../apache/kafka/clients/CommonClientConfigs.java  |   1 +
 .../kafka/clients/consumer/ConsumerConfig.java |  14 +-
 .../kafka/clients/consumer/KafkaConsumer.java  |  25 +-
 .../clients/consumer/internals/AbstractFetch.java  | 791 +
 .../clients/consumer/internals/CompletedFetch.java |  25 +-
 .../clients/consumer/internals/FetchConfig.java| 124 
 .../consumer/internals/FetchMetricsManager.java|  15 +-
 .../kafka/clients/consumer/internals/Fetcher.java  | 727 +--
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  21 +-
 .../consumer/internals/CompletedFetchTest.java |  34 +-
 .../clients/consumer/internals/FetcherTest.java|  54 +-
 .../consumer/internals/OffsetFetcherTest.java  |  18 +-
 13 files changed, 1042 insertions(+), 821 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 93ae7c30d0f..5c95c8d4284 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -46,7 +46,7 @@
 
 
+  
files="(AbstractFetch|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManager|TransactionManagerTest|KafkaAdminClient|NetworkClient|Admin|KafkaRaftClient|KafkaRaftClientTest|RaftClientTestContext).java"/>
 
 
 
-
 
 
 
 
+  
files="(KafkaConsumer|ConsumerCoordinator|AbstractFetch|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaRaftClient|KafkaRaftClientTest).java"/>
 
 
@@ -87,13 +85,13 @@
   
files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>
 
 
+  
files="(AbstractFetch|ConsumerCoordinator|OffsetFetcher|KafkaProducer|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords|FetchSessionHandler).java"/>
 
 
 
 
+  
files="(ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|KafkaRaftClient|Authorizer|FetchSessionHandler).java"/>
 
 
@@ -108,7 +106,7 @@
 
 
 
+  
files="(Sender|Fetcher|OffsetFetcher|KafkaConsumer|Metrics|RequestResponse|TransactionManager|KafkaAdminClient|Message|KafkaProducer)Test.java"/>
 
 
@@ -117,7 +115,7 @@
   files="MockAdminClient.java"/>
 
 
+  files="(OffsetFetcher|RequestResponse)Test.java"/>
 
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index d88aa0a6a1c..ee190df50e1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -74,6 +74,7 @@ public class CommonClientConfigs {
 
 public static final String CLIENT_RACK_CONFIG = "client.rack";
 public static final String CLIENT_RACK_DOC = "A rack identifier for this 
client. This can be any string value which indicates where this client is 
physically located. It corresponds with the broker config 'broker.rack'";
+public static final String DEFAULT_CLIENT_RACK = "";
 
 public static final String RECONNECT_BACKOFF_MS_CONFIG = 
"reconnect.backoff.ms";
 public static final String RECONNECT_BACKOFF_MS_DOC = "The base amount of 
time to wait before attempting to reconnect to a given host. T
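
As a rough illustration of the FetchConfig idea described above (field names here are assumptions, not the real class), the pattern is to snapshot the relevant consumer settings once into an immutable holder so the fetch path never re-reads mutable configuration:

    // Hypothetical, simplified stand-in for the real FetchConfig.
    public final class FetchConfigSketch {
        final int fetchMaxBytes;
        final int maxPollRecords;
        final String clientRack;

        FetchConfigSketch(int fetchMaxBytes, int maxPollRecords, String clientRack) {
            this.fetchMaxBytes = fetchMaxBytes;
            this.maxPollRecords = maxPollRecords;
            this.clientRack = clientRack;
        }

        public static void main(String[] args) {
            // Values mirror common consumer defaults, captured once at construction.
            FetchConfigSketch config = new FetchConfigSketch(50 * 1024 * 1024, 500, "");
            System.out.println("fetch.max.bytes=" + config.fetchMaxBytes);
        }
    }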

[kafka] branch trunk updated (e07cc127e12 -> f79c2a6e041)

2023-03-23 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from e07cc127e12 MINOR: Fix remote_kraft -> isolated_kraft in kafkatest 
(#13439)
 add f79c2a6e041 MINOR: Incorrect/canonical use of constants in AdminClientConfig and StreamsConfigTest (#13427)

No new revisions were added by this update.

Summary of changes:
 .../main/java/org/apache/kafka/clients/admin/AdminClientConfig.java   | 4 ++--
 streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)



[kafka] branch trunk updated (df5850274d3 -> 6fae237638e)

2023-03-20 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from df5850274d3 MINOR: Expand use of PartitionAssignment (#13402)
 add 6fae237638e MINOR: Use JUnit-5 extension to enforce strict stubbing 
(#13347)

No new revisions were added by this update.

Summary of changes:
 build.gradle| 1 +
 .../streams/state/internals/metrics/RocksDBMetricsRecorderTest.java | 6 ++
 2 files changed, 7 insertions(+)



[kafka] branch trunk updated: KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring (#13301)

2023-03-10 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new c74c0f2facd KAFKA-14758: Extract inner classes from Fetcher for reuse 
in refactoring (#13301)
c74c0f2facd is described below

commit c74c0f2facde2b392ab745144d6ad520575ab9ef
Author: Kirk True 
AuthorDate: Fri Mar 10 10:17:14 2023 -0800

KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring 
(#13301)

The Fetcher class is used internally by the KafkaConsumer to fetch records 
from the brokers. There is ongoing work to create a new consumer implementation 
with a significantly refactored threading model. The threading refactor work 
requires a similarly refactored Fetcher.

This task refactors Fetcher by extracting its inner classes into top-level 
classes (though still in the internals package) so that they can be 
referenced by the forthcoming refactored fetch logic.

Reviewers: Philip Nee, Guozhang Wang
---
 .../kafka/clients/consumer/KafkaConsumer.java  |   4 +-
 .../clients/consumer/internals/CompletedFetch.java | 365 ++
 .../consumer/internals/ConsumerMetrics.java|   4 +-
 .../consumer/internals/FetchMetricsAggregator.java |  95 
 .../consumer/internals/FetchMetricsManager.java| 203 
 ...ricsRegistry.java => FetchMetricsRegistry.java} |   8 +-
 .../kafka/clients/consumer/internals/Fetcher.java  | 557 ++---
 .../clients/consumer/internals/SensorBuilder.java  | 115 +
 .../consumer/internals/CompletedFetchTest.java | 304 +++
 .../internals/FetchMetricsManagerTest.java | 171 +++
 .../clients/consumer/internals/FetcherTest.java|   4 +-
 .../consumer/internals/OffsetFetcherTest.java  |   2 +-
 12 files changed, 1300 insertions(+), 532 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 3f966121d69..11ac675cb4b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -28,7 +28,7 @@ import 
org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.clients.consumer.internals.Fetch;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
-import org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry;
+import org.apache.kafka.clients.consumer.internals.FetchMetricsRegistry;
 import org.apache.kafka.clients.consumer.internals.KafkaConsumerMetrics;
 import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.OffsetFetcher;
@@ -737,7 +737,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 this.metadata.bootstrap(addresses);
 String metricGrpPrefix = "consumer";
 
-FetcherMetricsRegistry metricsRegistry = new 
FetcherMetricsRegistry(Collections.singleton(CLIENT_ID_METRIC_TAG), 
metricGrpPrefix);
+FetchMetricsRegistry metricsRegistry = new 
FetchMetricsRegistry(Collections.singleton(CLIENT_ID_METRIC_TAG), 
metricGrpPrefix);
 ChannelBuilder channelBuilder = 
ClientUtils.createChannelBuilder(config, time, logContext);
 this.isolationLevel = IsolationLevel.valueOf(
 
config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
new file mode 100644
index 000..6a11b846810
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clie

[kafka] branch trunk updated: KAFKA-14753: Improve kafka producer example (#13354)

2023-03-07 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 4527e54647a KAFKA-14753: Improve kafka producer example (#13354)
4527e54647a is described below

commit 4527e54647a0b8457f7f2b5d804eb65dc4d9d817
Author: Philip Nee 
AuthorDate: Tue Mar 7 16:25:49 2023 -0800

KAFKA-14753: Improve kafka producer example (#13354)

Reviewers: Guozhang Wang 
---
 .../src/main/java/kafka/examples/Producer.java | 65 +++---
 1 file changed, 44 insertions(+), 21 deletions(-)

diff --git a/examples/src/main/java/kafka/examples/Producer.java 
b/examples/src/main/java/kafka/examples/Producer.java
index e649a7862c9..e85fa16060e 100644
--- a/examples/src/main/java/kafka/examples/Producer.java
+++ b/examples/src/main/java/kafka/examples/Producer.java
@@ -27,7 +27,13 @@ import 
org.apache.kafka.common.serialization.StringSerializer;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
+/**
+ * Demo producer that demonstrates two modes of KafkaProducer.
+ * If the user uses the async mode: the messages will be printed to stdout upon successful completion.
+ * If the user uses the sync mode (isAsync = false): each send loop will block until completion.
+ */
 public class Producer extends Thread {
 private final KafkaProducer<Integer, String> producer;
 private final String topic;
@@ -54,8 +60,8 @@ public class Producer extends Thread {
 props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
 }
 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
-
 producer = new KafkaProducer<>(props);
+
 this.topic = topic;
 this.isAsync = isAsync;
 this.numRecords = numRecords;
@@ -70,28 +76,45 @@ public class Producer extends Thread {
 public void run() {
 int messageKey = 0;
 int recordsSent = 0;
-while (recordsSent < numRecords) {
-String messageStr = "Message_" + messageKey;
-long startTime = System.currentTimeMillis();
-if (isAsync) { // Send asynchronously
-producer.send(new ProducerRecord<>(topic,
-messageKey,
-messageStr), new DemoCallBack(startTime, messageKey, 
messageStr));
-} else { // Send synchronously
-try {
-producer.send(new ProducerRecord<>(topic,
-messageKey,
-messageStr)).get();
-System.out.println("Sent message: (" + messageKey + ", " + 
messageStr + ")");
-} catch (InterruptedException | ExecutionException e) {
-e.printStackTrace();
-}
+try {
+while (recordsSent < numRecords) {
+final long currentTimeMs = System.currentTimeMillis();
+produceOnce(messageKey, recordsSent, currentTimeMs);
+messageKey += 2;
+recordsSent += 1;
 }
-messageKey += 2;
-recordsSent += 1;
+} catch (Exception e) {
+System.out.println("Producer encountered exception:" + e);
+} finally {
+System.out.println("Producer sent " + numRecords + " records 
successfully");
+this.producer.close();
+latch.countDown();
 }
-System.out.println("Producer sent " + numRecords + " records 
successfully");
-latch.countDown();
+}
+
+private void produceOnce(final int messageKey, final int recordsSent, final long currentTimeMs) throws ExecutionException, InterruptedException {
+String messageStr = "Message_" + messageKey;
+
+if (isAsync) { // Send asynchronously
+sendAsync(messageKey, messageStr, currentTimeMs);
+return;
+}
+Future<RecordMetadata> future = send(messageKey, messageStr);
+future.get();
+System.out.println("Sent message: (" + messageKey + ", " + messageStr 
+ ")");
+}
+
+private void sendAsync(final int messageKey, final String messageStr, final long currentTimeMs) {
+this.producer.send(new ProducerRecord<>(topic,
+messageKey,
+messageStr),
+new DemoCallBack(currentTimeMs, messageKey, messageStr));
+}
+
+private Future<RecordMetadata> send(final int messageKey, final String messageStr) {
+return producer.send(new ProducerRecord<>(topic,
+messageKey,
+messageStr));
 }
 }
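
A compact, self-contained sketch of the two send modes the refactored example separates; the topic name and broker address are illustrative assumptions:

    import java.util.Properties;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.IntegerSerializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class SendModesSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            try (KafkaProducer<Integer, String> producer = new KafkaProducer<>(props)) {
                // Async mode: send() returns immediately; completion arrives via the callback.
                producer.send(new ProducerRecord<>("demo-topic", 1, "Message_1"),
                        (metadata, exception) -> {
                            if (exception != null) exception.printStackTrace();
                            else System.out.println("async ack, offset " + metadata.offset());
                        });
                // Sync mode: block on the returned Future until the broker acknowledges.
                Future<RecordMetadata> future =
                        producer.send(new ProducerRecord<>("demo-topic", 2, "Message_2"));
                System.out.println("sync ack, offset " + future.get().offset());
            }
        }
    }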
 



[kafka] branch trunk updated (6fbe4d85a22 -> b19ae7857b0)

2023-03-07 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 6fbe4d85a22 KAFKA-14761 Adding integration test for the prototype 
consumer (#13303)
 add b19ae7857b0 KAFKA-14752: Improving the existing consumer examples 
(#13353)

No new revisions were added by this update.

Summary of changes:
 .../src/main/java/kafka/examples/Consumer.java | 29 +++---
 .../src/main/java/kafka/examples/Producer.java |  2 +-
 2 files changed, 26 insertions(+), 5 deletions(-)



[kafka] branch trunk updated (77215eded7b -> 6fbe4d85a22)

2023-03-07 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 77215eded7b KAFKA-14792: Race condition in LazyIndex.get() (#13359)
 add 6fbe4d85a22 KAFKA-14761 Adding integration test for the prototype 
consumer (#13303)

No new revisions were added by this update.

Summary of changes:
 .../consumer/internals/DefaultEventHandler.java|   1 +
 .../consumer/internals/NetworkClientDelegate.java  |  18 ++--
 .../consumer/internals/PrototypeAsyncConsumer.java |  74 +++--
 .../events/ApplicationEventProcessor.java  |   5 +-
 .../consumer/internals/events/EventHandler.java|   3 +-
 .../internals/PrototypeAsyncConsumerTest.java  | 116 +
 .../kafka/api/AbstractConsumerTest.scala   |  10 +-
 .../kafka/api/BaseAsyncConsumerTest.scala  |  47 +
 .../integration/kafka/api/ConsumerBounceTest.scala |  14 +--
 .../kafka/api/IntegrationTestHarness.scala |  21 +++-
 .../kafka/api/PlaintextConsumerTest.scala  |  10 +-
 11 files changed, 234 insertions(+), 85 deletions(-)
 create mode 100644 
core/src/test/scala/integration/kafka/api/BaseAsyncConsumerTest.scala



[kafka] branch trunk updated (38c409cf33c -> a6d8988179d)

2023-03-03 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 38c409cf33c KAFKA-14084: SCRAM support in KRaft. (#13114)
 add a6d8988179d MINOR: Clarify docs for Streams config 
max.warmup.replicas. (#13082)

No new revisions were added by this update.

Summary of changes:
 docs/streams/developer-guide/config-streams.html   | 18 ++
 .../java/org/apache/kafka/streams/StreamsConfig.java   |  6 +-
 2 files changed, 19 insertions(+), 5 deletions(-)



[kafka] branch trunk updated (71fa008b456 -> 47450ee064b)

2023-03-03 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 71fa008b456 KAFKA-14745: Cache the ReplicationPolicy instance in 
MirrorConnectorConfig (#13328)
 add 47450ee064b MINOR: update RocksDBMetricsRecorder test to JUnit5 and 
fix memory leak (#13336)

No new revisions were added by this update.

Summary of changes:
 .../metrics/RocksDBMetricsRecorderTest.java  | 20 
 1 file changed, 12 insertions(+), 8 deletions(-)



[kafka] branch trunk updated (3d9a03cfe82 -> 58429532497)

2023-03-01 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 3d9a03cfe82 MINOR: fix rerun-tests for unit test (#13288)
 add 58429532497 MINOR: Fix flaky tests in DefaultStateUpdaterTest (#13319)

No new revisions were added by this update.

Summary of changes:
 .../processor/internals/DefaultStateUpdater.java   |  6 +++---
 .../internals/DefaultStateUpdaterTest.java | 24 --
 .../org/apache/kafka/test/StreamsTestUtils.java|  3 +--
 3 files changed, 17 insertions(+), 16 deletions(-)



[kafka] branch trunk updated (d479d129e0b -> 3d9a03cfe82)

2023-03-01 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from d479d129e0b KAFKA-13999: Add ProducerCount metrics (KIP-847) (#13078)
 add 3d9a03cfe82 MINOR: fix rerun-tests for unit test (#13288)

No new revisions were added by this update.

Summary of changes:
 build.gradle | 5 +
 1 file changed, 5 insertions(+)



[kafka] branch trunk updated: KAFKA-12639: Exit upon expired timer to prevent tight looping (#13190)

2023-02-28 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new f7f376f6c16 KAFKA-12639: Exit upon expired timer to prevent tight 
looping (#13190)
f7f376f6c16 is described below

commit f7f376f6c162717e60e143b05fbd12ea2f347e3c
Author: Philip Nee 
AuthorDate: Tue Feb 28 17:36:37 2023 -0800

KAFKA-12639: Exit upon expired timer to prevent tight looping (#13190)

In AbstractCoordinator#joinGroupIfNeeded, the joinGroup request will be 
retried without proper backoff due to an expired timer. This is an uncommon 
scenario that possibly only appears during testing, but it makes sense to 
force the client to drive the join group via poll.

Reviewers: Guozhang Wang 
---
 .../consumer/internals/AbstractCoordinator.java| 12 +--
 .../internals/AbstractCoordinatorTest.java | 39 ++
 .../internals/ConsumerCoordinatorTest.java |  3 +-
 3 files changed, 50 insertions(+), 4 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 558ce2b1169..fc01b14ab08 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -501,13 +501,19 @@ public abstract class AbstractCoordinator implements Closeable {
 }
 
 if (exception instanceof UnknownMemberIdException ||
-exception instanceof IllegalGenerationException ||
-exception instanceof RebalanceInProgressException ||
-exception instanceof MemberIdRequiredException)
+exception instanceof IllegalGenerationException ||
+exception instanceof RebalanceInProgressException ||
+exception instanceof MemberIdRequiredException)
 continue;
 else if (!future.isRetriable())
 throw exception;
 
+// We need to return upon an expired timer, in case client.poll
+// returns immediately and the time has elapsed.
+if (timer.isExpired()) {
+return false;
+}
+
 timer.sleep(rebalanceConfig.retryBackoffMs);
 }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 908dd7e4485..0a2b1e7ef8b 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -1561,6 +1561,45 @@ public class AbstractCoordinatorTest {
 }
 }
 
+@Test
+public void testBackoffAndRetryUponRetriableError() {
+this.mockTime = new MockTime();
+long currentTimeMs = System.currentTimeMillis();
+this.mockTime.setCurrentTimeMs(System.currentTimeMillis());
+
+setupCoordinator(); // note: uses 100ms backoff
+mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
+coordinator.ensureCoordinatorReady(mockTime.timer(0));
+
+// Retriable Exception
+
mockClient.prepareResponse(joinGroupResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS));
+mockClient.prepareResponse(joinGroupResponse(Errors.NONE)); // Retry 
w/o error
+mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
+coordinator.joinGroupIfNeeded(mockTime.timer(REQUEST_TIMEOUT_MS));
+
+assertEquals(100, mockTime.milliseconds() - currentTimeMs, 1);
+}
+
+@Test
+public void testReturnUponRetriableErrorAndExpiredTimer() throws 
InterruptedException {
+setupCoordinator();
+mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
+coordinator.ensureCoordinatorReady(mockTime.timer(0));
+ExecutorService executor = Executors.newFixedThreadPool(1);
+Timer t = mockTime.timer(500);
+try {
+Future<Boolean> attempt = executor.submit(() -> coordinator.joinGroupIfNeeded(t));
+mockTime.sleep(500);
+
mockClient.prepareResponse(joinGroupResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS));
+assertFalse(attempt.get());
+} catch (Exception e) {
+fail();
+} finally {
+executor.shutdownNow();
+executor.awaitTermination(1000, TimeUnit.MILLISECONDS);
+}
+}
+
 private AtomicBoolean prepareFirstHeartb
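
The essence of the fix, as a self-contained sketch that uses plain System.currentTimeMillis and a hypothetical Attempt interface in place of Kafka's Timer and join-group machinery: after a retriable failure, check the deadline before backing off, because an already-expired timer otherwise sleeps for zero milliseconds and retries in a tight loop.

    public class RetryDeadlineSketch {
        interface Attempt { boolean run(); } // hypothetical stand-in for one join attempt

        static boolean retryUntil(long deadlineMs, long backoffMs, Attempt attempt)
                throws InterruptedException {
            while (true) {
                if (attempt.run()) return true;
                // Return upon an expired deadline instead of sleeping again; the real
                // Timer.sleep() clamps to the time remaining, so an expired timer
                // slept 0 ms and the loop spun without backoff.
                if (System.currentTimeMillis() >= deadlineMs) return false;
                Thread.sleep(backoffMs); // retry backoff
            }
        }

        public static void main(String[] args) throws InterruptedException {
            long deadline = System.currentTimeMillis() + 300;
            System.out.println(retryUntil(deadline, 100, () -> false)); // false after ~300 ms
        }
    }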

[kafka] branch trunk updated: MINOR: update docs of 'replica.socket.receive.buffer.bytes' (#13308)

2023-02-27 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 30795674615 MINOR: update docs of 
'replica.socket.receive.buffer.bytes' (#13308)
30795674615 is described below

commit 30795674615180af43377c79d106c559102e2522
Author: Chia-Ping Tsai 
AuthorDate: Tue Feb 28 03:29:44 2023 +0800

MINOR: update docs of 'replica.socket.receive.buffer.bytes' (#13308)

Reviewers: Guozhang Wang 
---
 core/src/main/scala/kafka/server/KafkaConfig.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 1130cb5a12a..6f1aa52e8fa 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -888,7 +888,7 @@ object KafkaConfig {
   val ReplicaLagTimeMaxMsDoc = "If a follower hasn't sent any fetch requests 
or hasn't consumed up to the leaders log end offset for at least this time," +
   " the leader will remove the follower from isr"
   val ReplicaSocketTimeoutMsDoc = "The socket timeout for network requests. 
Its value should be at least replica.fetch.wait.max.ms"
-  val ReplicaSocketReceiveBufferBytesDoc = "The socket receive buffer for 
network requests"
+  val ReplicaSocketReceiveBufferBytesDoc = "The socket receive buffer for 
network requests to the leader for replicating data"
   val ReplicaFetchMaxBytesDoc = "The number of bytes of messages to attempt to 
fetch for each partition. This is not an absolute maximum, " +
 "if the first record batch in the first non-empty partition of the fetch 
is larger than this value, the record batch will still be returned " +
 "to ensure that progress can be made. The maximum record batch size 
accepted by the broker is defined via " +
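
For operators, a minimal server.properties sketch of this setting; the value shown is the shipped default (64 KiB), written out explicitly for illustration:

    replica.socket.receive.buffer.bytes=65536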



[kafka] branch trunk updated (8d32a0f2463 -> 62431dca700)

2023-02-24 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 8d32a0f2463 [KAFKA-14685] Refactor logic to handle 
OFFSET_MOVED_TO_TIERED_STORAGE error (#13206)
 add 62431dca700 KAFKA-14468: Implement CommitRequestManager to manage the 
commit and autocommit requests (#13021)

No new revisions were added by this update.

Summary of changes:
 .../consumer/internals/CommitRequestManager.java   | 238 +
 .../internals/DefaultBackgroundThread.java |  86 +---
 .../clients/consumer/internals/GroupState.java |  89 
 .../consumer/internals/NetworkClientDelegate.java  |  16 +-
 .../consumer/internals/PrototypeAsyncConsumer.java |  37 ++--
 .../clients/consumer/internals/RequestManager.java |   4 +
 .../internals/events/ApplicationEvent.java |   2 +-
 .../events/ApplicationEventProcessor.java  |  47 +++-
 .../internals/events/CommitApplicationEvent.java   |  64 ++
 ...kgroundEvent.java => PollApplicationEvent.java} |  10 +-
 .../internals/CommitRequestManagerTest.java| 139 
 .../internals/DefaultBackgroundThreadTest.java |  41 +++-
 12 files changed, 700 insertions(+), 73 deletions(-)
 create mode 100644 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
 create mode 100644 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/GroupState.java
 create mode 100644 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java
 copy 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/{ErrorBackgroundEvent.java
 => PollApplicationEvent.java} (79%)
 create mode 100644 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java



[kafka] branch trunk updated: KAFKA-10199: Add task updater metrics, part 1 (#13228)

2023-02-24 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 2fad1652942 KAFKA-10199: Add task updater metrics, part 1 (#13228)
2fad1652942 is described below

commit 2fad1652942226454a44038f2350642817f9f74b
Author: Guozhang Wang 
AuthorDate: Fri Feb 24 10:25:11 2023 -0800

KAFKA-10199: Add task updater metrics, part 1 (#13228)

* Moved the pausing-tasks logic out of the commit-interval loop to the 
top-level loop, similar to resuming tasks.
* Added thread-level restoration metrics.
* Related unit tests.

Reviewers: Lucas Brutschy, Matthias J. Sax
---
 .../processor/internals/ChangelogReader.java   |  10 +-
 .../processor/internals/DefaultStateUpdater.java   | 230 +++--
 .../processor/internals/StoreChangelogReader.java  |  94 +
 .../streams/processor/internals/StreamThread.java  |   5 +-
 .../streams/processor/internals/TaskManager.java   |   4 +-
 .../internals/metrics/StreamsMetricsImpl.java  |   5 +
 .../internals/DefaultStateUpdaterTest.java | 151 --
 .../processor/internals/MockChangelogReader.java   |   8 +-
 .../processor/internals/StreamThreadTest.java  |   2 +-
 9 files changed, 427 insertions(+), 82 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
index 03199d294ca..1cf8ef628da 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
@@ -28,8 +28,10 @@ import java.util.Set;
 public interface ChangelogReader extends ChangelogRegister {
 /**
  * Restore all registered state stores by reading from their changelogs
+ *
+ * @return the total number of records restored in this call
  */
-void restore(final Map<TaskId, Task> tasks);
+long restore(final Map<TaskId, Task> tasks);
 
 /**
  * Transit to restore active changelogs mode
@@ -41,6 +43,12 @@ public interface ChangelogReader extends ChangelogRegister {
  */
 void transitToUpdateStandby();
 
+/**
+ * @return true if the reader is in restoring active changelog mode;
+ * false if the reader is in updating standby changelog mode
+ */
+boolean isRestoringActive();
+
 /**
  * @return the changelog partitions that have been completed restoring
  */
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index ae6618c304f..5e912c99a5b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -16,8 +16,15 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
@@ -32,7 +39,9 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -51,6 +60,10 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_DESCRIPTION;
+import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATIO_DESCRIPTION;
+import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_ID_TAG;
+
 public class DefaultStateUpdater implements StateUpdater {
 
 private final static String BUG_ERROR_MESSAGE = "This indicates a bug. " +
@@ -59,14 +72,20 @@ public class DefaultStateUpdater implements StateUpdater {
 private class StateUpdaterThread extends Thread {
 
 private final ChangelogReader changelogReader;
+private final StateUpdaterMetrics updaterMetrics;
 private final AtomicBoolean isRun
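
As a rough sketch of what "thread-level restoration metrics" means mechanically (the sensor, metric, and group names below are assumptions; the real ones live in StreamsMetricsImpl), sensors are registered against the common client Metrics registry and recorded with, for example, the count returned by restore():

    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Rate;
    import org.apache.kafka.common.metrics.stats.WindowedCount;

    public class RestoreMetricsSketch {
        public static void main(String[] args) {
            Metrics metrics = new Metrics();
            Sensor restoreSensor = metrics.sensor("records-restored"); // hypothetical name
            restoreSensor.add(metrics.metricName("restore-total", "state-updater-metrics",
                    "total number of records restored"), new WindowedCount());
            restoreSensor.add(metrics.metricName("restore-rate", "state-updater-metrics",
                    "records restored per second"), new Rate());
            restoreSensor.record(100.0); // e.g. the record count from one restore() pass
            metrics.close();
        }
    }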

[kafka] branch trunk updated: KAFKA-14299: Fix pause and resume with state updater (#13025)

2023-02-21 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 0fc029c6a47 KAFKA-14299: Fix pause and resume with state updater 
(#13025)
0fc029c6a47 is described below

commit 0fc029c6a47a7a930a2b078569de1173cdb547a4
Author: Lucas Brutschy 
AuthorDate: Tue Feb 21 19:17:09 2023 +0100

KAFKA-14299: Fix pause and resume with state updater (#13025)

* Fixes required to make the PauseResumeIntegrationTest pass. It was not 
enabled, and it did not pass on the state-updater code path.

* Make sure no progress is made on paused topologies. The state updater ran 
one round of polls on the restore consumer before realizing that a newly 
added task was already in the paused state when it was added.

* Wake up state updater when tasks are being resumed. If a task is resumed, 
it may be necessary to wake up the state updater from waiting on the 
tasksAndActions condition.

* Make sure that allTasks methods also return the tasks that are currently 
being restored.

* Enable PauseResumeIntegrationTest and upgrade it to JUnit5.

Reviewers: Bruno Cadonna, Guozhang Wang
---
 .../org/apache/kafka/streams/KafkaStreams.java |   1 +
 .../processor/internals/DefaultStateUpdater.java   |  65 ++
 .../streams/processor/internals/ReadOnlyTask.java  |   2 +-
 .../streams/processor/internals/StateUpdater.java  |   5 ++
 .../streams/processor/internals/StreamThread.java  |   7 +-
 .../streams/processor/internals/TaskManager.java   |  35 +++-
 .../KafkaStreamsNamedTopologyWrapper.java  |   2 +
 .../integration/PauseResumeIntegrationTest.java| 100 -
 .../internals/DefaultStateUpdaterTest.java |   1 +
 .../processor/internals/ReadOnlyTaskTest.java  |   1 +
 .../processor/internals/StreamThreadTest.java  |   4 +-
 .../processor/internals/TaskManagerTest.java   |  32 +++
 12 files changed, 189 insertions(+), 66 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 03b778ab6ec..c05e4c6c1ac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -1751,6 +1751,7 @@ public class KafkaStreams implements AutoCloseable {
 } else {
 topologyMetadata.resumeTopology(UNNAMED_TOPOLOGY);
 }
+threads.forEach(StreamThread::signalResume);
 }
 
 /**
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index d96593c5011..ae6618c304f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -116,6 +116,7 @@ public class DefaultStateUpdater implements StateUpdater {
 
 private void runOnce() throws InterruptedException {
 performActionsOnTasks();
+resumeTasks();
 restoreTasks();
 checkAllUpdatingTaskStates(time.milliseconds());
 waitIfAllChangelogsCompletelyRead();
@@ -140,6 +141,16 @@ public class DefaultStateUpdater implements StateUpdater {
 }
 }
 
+private void resumeTasks() {
+if (isTopologyResumed.compareAndSet(true, false)) {
+for (final Task task : pausedTasks.values()) {
+if (!topologyMetadata.isPaused(task.id().topologyName())) {
+resumeTask(task);
+}
+}
+}
+}
+
 private void restoreTasks() {
 try {
 changelogReader.restore(updatingTasks);
@@ -229,7 +240,7 @@ public class DefaultStateUpdater implements StateUpdater {
 if (isRunning.get() && changelogReader.allChangelogsCompleted()) {
 tasksAndActionsLock.lock();
 try {
-while (tasksAndActions.isEmpty()) {
+while (tasksAndActions.isEmpty() && !isTopologyResumed.get()) {
 tasksAndActionsCondition.await();
 }
 } finally {
@@ -258,21 +269,39 @@ public class DefaultStateUpdater implements StateUpdater {
 }
 
 private void addTask(final Task task) {
+final TaskId taskId = task.id();
+
+Task existingTask = pausedTasks.get(taskId);
+if (existingTask != null) {
+throw new IllegalStateException(
+(existingTask.isActive() ? "Active" : "

[kafka] branch trunk updated (38e43116222 -> 7e149990bdd)

2023-02-17 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 38e43116222 MINOR: Fix PluginInfoTest for Connect (#13266)
 add 7e149990bdd KAFKA-14717 KafkaStreams can't get running if the rebalance happens be… (#13248)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/kafka/streams/KafkaStreams.java |  6 +++-
 .../streams/processor/internals/StreamThread.java  |  4 +++
 .../apache/kafka/streams/KafkaStreamsWrapper.java  |  6 
 .../integration/AdjustStreamThreadCountTest.java   | 34 ++
 4 files changed, 49 insertions(+), 1 deletion(-)



[kafka] branch trunk updated: KAFKA-14253 - More informative logging (#13253)

2023-02-16 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 82d5720aae7 KAFKA-14253 - More informative logging (#13253)
82d5720aae7 is described below

commit 82d5720aae78c9e17606c8345dfc208557f9a8f2
Author: Philip Nee 
AuthorDate: Thu Feb 16 16:54:50 2023 -0800

KAFKA-14253 - More informative logging (#13253)

Includes 2 requirements from the ticket:
* Include the number of members in the group (i.e., "15 members participating" and "to 15 clients as")
* Sort the member ids (to help compare the membership and assignment across 
rebalances)

    Reviewers: Guozhang Wang 
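
For illustration, a small standalone sketch of the sorted, counted log line
(hypothetical client data; the real code lives in StreamsPartitionAssignor, see
the diff below):

    import java.util.Map;
    import java.util.stream.Collectors;

    public class SortedRebalanceLog {
        public static void main(String[] args) {
            // Map.of gives no ordering guarantee, which is exactly why the
            // entries are sorted before joining them into the log message.
            Map<String, String> clientStates = Map.of(
                "client-2", "[task_0_1]",
                "client-1", "[task_0_0]");
            String line = clientStates.entrySet().stream()
                .sorted(Map.Entry.comparingByKey())
                .map(e -> e.getKey() + ": " + e.getValue())
                .collect(Collectors.joining(System.lineSeparator()));
            System.out.printf("%d members participating in this rebalance:%n%s%n",
                clientStates.size(), line);
        }
    }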
---
 .../internals/StreamsPartitionAssignor.java| 22 ++
 1 file changed, 14 insertions(+), 8 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 1875f57b649..46c1e41e6c1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -76,9 +76,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
-
+import static java.util.Map.Entry.comparingByKey;
 import static java.util.UUID.randomUUID;
-
 import static org.apache.kafka.common.utils.Utils.filterMap;
 import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchCommittedOffsets;
 import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsetsFuture;
@@ -619,10 +618,12 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
 final boolean lagComputationSuccessful =
 populateClientStatesMap(clientStates, clientMetadataMap, 
taskForPartition, changelogTopics);
 
-log.info("All members participating in this rebalance: \n{}.",
- clientStates.entrySet().stream()
- .map(entry -> entry.getKey() + ": " + 
entry.getValue().consumers())
- .collect(Collectors.joining(Utils.NL)));
+log.info("{} members participating in this rebalance: \n{}.",
+clientStates.size(),
+clientStates.entrySet().stream()
+.sorted(comparingByKey())
+.map(entry -> entry.getKey() + ": " + 
entry.getValue().consumers())
+.collect(Collectors.joining(Utils.NL)));
 
 final Set allTasks = partitionsForTask.keySet();
 statefulTasks.addAll(changelogTopics.statefulTaskIds());
@@ -637,8 +638,13 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf

statefulTasks,

assignmentConfigs);
 
-log.info("Assigned tasks {} including stateful {} to clients as: 
\n{}.",
-allTasks, statefulTasks, clientStates.entrySet().stream()
+log.info("{} assigned tasks {} including stateful {} to {} clients as: 
\n{}.",
+allTasks.size(),
+allTasks,
+statefulTasks,
+clientStates.size(),
+clientStates.entrySet().stream()
+.sorted(comparingByKey())
 .map(entry -> entry.getKey() + "=" + 
entry.getValue().currentAssignment())
 .collect(Collectors.joining(Utils.NL)));
 



[kafka] branch trunk updated: KAFKA-14650: Synchronize access to tasks inside task manager (#13167)

2023-02-09 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 083e11a22ca KAFKA-14650: Synchronize access to tasks inside task 
manager (#13167)
083e11a22ca is described below

commit 083e11a22ca9966ed010acdd5705351ab4300b52
Author: Guozhang Wang 
AuthorDate: Thu Feb 9 10:33:19 2023 -0800

KAFKA-14650: Synchronize access to tasks inside task manager (#13167)

1. The major fix: synchronize access to tasks inside the task manager; this 
fixes a regression introduced in #12397.
2. Clarify the names of StreamThread functions that may be triggered from 
outside the StreamThread.
3. Minor cleanups.

Reviewers: Lucas Brutschy 
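
For illustration, a minimal sketch of the locking pattern (a hypothetical
registry, not the real Tasks class): mutate the shared map only under a lock,
and hand out defensive read-only snapshots to other threads.

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class TaskRegistrySketch<K, V> {
        private final Map<K, V> tasks = new HashMap<>();

        public synchronized void add(K id, V task) {
            tasks.put(id, task);
        }

        public synchronized void remove(K id) {
            tasks.remove(id);
        }

        // Snapshot copy: safe to iterate from threads other than the owner,
        // even while the owner keeps mutating the underlying map.
        public synchronized Set<V> readOnlyAllTasks() {
            return Collections.unmodifiableSet(new HashSet<>(tasks.values()));
        }
    }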
---
 .../org/apache/kafka/streams/KafkaStreams.java | 16 ++---
 .../streams/processor/internals/StreamThread.java  | 24 
 .../streams/processor/internals/TaskManager.java   |  6 +
 .../kafka/streams/processor/internals/Tasks.java   | 22 +-
 .../KafkaStreamsNamedTopologyWrapper.java  |  2 +-
 .../internals/StreamThreadStateStoreProvider.java  |  2 +-
 .../org/apache/kafka/streams/KafkaStreamsTest.java |  5 ++---
 .../processor/internals/StreamThreadTest.java  | 26 +++---
 .../StreamThreadStateStoreProviderTest.java|  4 ++--
 9 files changed, 59 insertions(+), 48 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index bc3a1243ce2..ee9f57b0680 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -1826,7 +1826,7 @@ public class KafkaStreams implements AutoCloseable {
  */
 public Map> allLocalStorePartitionLags() {
 final List allTasks = new ArrayList<>();
-processStreamThread(thread -> allTasks.addAll(thread.allTasks().values()));
+processStreamThread(thread -> allTasks.addAll(thread.readyOnlyAllTasks()));
 return allLocalStorePartitionLags(allTasks);
 }
 
@@ -1917,21 +1917,19 @@ public class KafkaStreams implements AutoCloseable {
 );
 } else {
 for (final StreamThread thread : threads) {
-final Map tasks = thread.allTasks();
-for (final Entry entry : tasks.entrySet()) {
+final Set tasks = thread.readyOnlyAllTasks();
+for (final Task task : tasks) {
 
-final TaskId taskId = entry.getKey();
+final TaskId taskId = task.id();
 final int partition = taskId.partition();
-if (request.isAllPartitions()
-|| request.getPartitions().contains(partition)) {
-final Task task = entry.getValue();
+if (request.isAllPartitions() || 
request.getPartitions().contains(partition)) {
 final StateStore store = task.getStore(storeName);
 if (store != null) {
 final StreamThread.State state = thread.state();
 final boolean active = task.isActive();
 if (request.isRequireActive()
-&& (state != StreamThread.State.RUNNING
-|| !active)) {
+&& (state != StreamThread.State.RUNNING || 
!active)) {
+
 result.addResult(
 partition,
 QueryResult.forFailure(
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 2d433eb3c82..d4be6a83af9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -60,7 +60,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -1269,16 +1268,23 @@ public class StreamThread extends Thread {
 );
 }
 
-public Map activeTaskMap() {
-return taskManager.activeTaskMap();
-}
-
-public List activeTasks() {
-return taskManager.activeTaskIterable();
+/**
+ * Get the set of current active tasks of the thread.
+ * Note that the returned set may be used by threads other than the StreamThread itself,
+ * and hence needs to be read-only
+ */
+public Set readOnlyActiveTasks() {
+ 

[kafka] branch trunk updated: KAFKA-10575: Add onRestoreSuspended to StateRestoreListener (#13179)

2023-02-07 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 788793dee6f KAFKA-10575: Add onRestoreSuspended to StateRestoreListener (#13179)
788793dee6f is described below

commit 788793dee6fa5d7ba5cb7d756b72c7d043dc8789
Author: Guozhang Wang 
AuthorDate: Tue Feb 7 11:33:09 2023 -0800

KAFKA-10575: Add onRestoreSuspended to StateRestoreListener (#13179)

1. Add the new API (default impl is empty) to StateRestoreListener.
2. Update related unit tests

Reviewers: Lucas Brutschy , Matthias J. 
Sax 
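
For illustration, a possible stateless listener using the new callback; the
method signatures follow the interface shown in the diff below, and such a
listener would be registered once per client via
KafkaStreams#setGlobalStateRestoreListener.

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.processor.StateRestoreListener;

    public class LoggingRestoreListener implements StateRestoreListener {
        @Override
        public void onRestoreStart(TopicPartition tp, String storeName,
                                   long startingOffset, long endingOffset) {
            System.out.printf("restore of %s (%s) started at %d%n", storeName, tp, startingOffset);
        }

        @Override
        public void onBatchRestored(TopicPartition tp, String storeName,
                                    long batchEndOffset, long numRestored) { }

        @Override
        public void onRestoreEnd(TopicPartition tp, String storeName, long totalRestored) {
            System.out.printf("restore of %s (%s) finished, %d records%n", storeName, tp, totalRestored);
        }

        // The new callback: restoration stops because the task migrated away,
        // not because it completed.
        @Override
        public void onRestoreSuspended(TopicPartition tp, String storeName, long totalRestored) {
            System.out.printf("restore of %s (%s) suspended after %d records%n", storeName, tp, totalRestored);
        }
    }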
---
 .../streams/processor/StateRestoreListener.java|  22 
 .../processor/internals/StoreChangelogReader.java  |  15 +++
 .../internals/StoreChangelogReaderTest.java| 133 +++--
 .../kafka/test/MockStateRestoreListener.java   |  10 ++
 4 files changed, 173 insertions(+), 7 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
index 6ba794f187b..006cc58cd43 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
@@ -22,13 +22,16 @@ import org.apache.kafka.common.TopicPartition;
 /**
  * Class for listening to various states of the restoration process of a 
StateStore.
  *
+ * 
  * When calling {@link 
org.apache.kafka.streams.KafkaStreams#setGlobalStateRestoreListener(StateRestoreListener)}
  * the passed instance is expected to be stateless since the {@code 
StateRestoreListener} is shared
  * across all {@link 
org.apache.kafka.streams.processor.internals.StreamThread} instances.
  *
+ * 
  * Users desiring stateful operations will need to provide synchronization 
internally in
  * the {@code StateRestorerListener} implementation.
  *
+ * 
  * Note that this listener is only registered at the per-client level and 
users can base on the {@code storeName}
  * parameter to define specific monitoring for different {@link StateStore}s. 
There is another
  * {@link StateRestoreCallback} interface which is registered via the
@@ -37,6 +40,12 @@ import org.apache.kafka.common.TopicPartition;
  * These two interfaces serve different restoration purposes and users should 
not try to implement both of them in a single
  * class during state store registration.
  *
+ * 
+ * Also note that the update process of standby tasks is not monitored via this interface, since a standby task does
+ * not actually restore state, but keeps updating its state from the changelogs written by the active task,
+ * a process which never finishes.
+ *
+ * 
  * Incremental updates are exposed so users can estimate how much progress has 
been made.
  */
 public interface StateRestoreListener {
@@ -85,4 +94,17 @@ public interface StateRestoreListener {
   final String storeName,
   final long totalRestored);
 
+/**
+ * Method called when restoring the {@link StateStore} is suspended due to 
the task being migrated out of the host.
+ * If the migrated task is recycled or re-assigned back to the current 
host, another
+ * {@link #onRestoreStart(TopicPartition, String, long, long)} would be 
called.
+ *
+ * @param topicPartition the {@link TopicPartition} containing the values 
to restore
+ * @param storeName the name of the store whose restoration is suspended
+ * @param totalRestored the total number of records restored for this 
TopicPartition before being paused
+ */
+default void onRestoreSuspended(final TopicPartition topicPartition,
+final String storeName,
+final long totalRestored) {
+}
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 874f1993c19..be580f3575c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -986,8 +986,23 @@ public class StoreChangelogReader implements 
ChangelogReader {
 for (final TopicPartition partition : revokedChangelogs) {
 final ChangelogMetadata changelogMetadata = 
changelogs.remove(partition);
 if (changelogMetadata != null) {
+// if the changelog is still in REGISTERED, it means it has not been initialized
+// and has not started restoring yet, and hence we should not try to remove the
+// changelog partition
 if (!changelogMetadata.state().equals(ChangelogState.REGISTERED)) {

[kafka] branch trunk updated (a3cf8b54e03 -> 3cf13064cc9)

2023-02-02 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from a3cf8b54e03 KAFKA-14455: Kafka Connect create and update REST APIs 
should surface failures while writing to the config topic (#12984)
 add 3cf13064cc9 Replace EasyMock and PowerMock with Mockito - 
TimeOrderedWindowStoreTest (#12777)

No new revisions were added by this update.

Summary of changes:
 .../internals/TimeOrderedWindowStoreTest.java  | 102 ++---
 1 file changed, 46 insertions(+), 56 deletions(-)



[kafka] branch trunk updated (6c98544a964 -> eb7f490159c)

2023-01-31 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 6c98544a964 KAFKA-14491: [2/N] Refactor RocksDB store open iterator 
management (#13142)
 add eb7f490159c chore: Fix scaladoc warnings (#13164)

No new revisions were added by this update.

Summary of changes:
 build.gradle   |  4 +++
 .../kafka/streams/scala/kstream/Branched.scala |  4 +--
 .../streams/scala/kstream/BranchedKStream.scala| 12 +
 .../kafka/streams/scala/kstream/Consumed.scala |  6 ++---
 .../kafka/streams/scala/kstream/Grouped.scala  |  8 +++---
 .../kafka/streams/scala/kstream/Joined.scala   |  8 +++---
 .../kafka/streams/scala/kstream/KStream.scala  | 14 +-
 .../kafka/streams/scala/kstream/KTable.scala   | 10 +---
 .../kafka/streams/scala/kstream/Materialized.scala | 30 --
 .../kafka/streams/scala/kstream/Produced.scala |  6 ++---
 .../streams/scala/kstream/Repartitioned.scala  |  6 ++---
 .../kafka/streams/scala/kstream/StreamJoined.scala | 12 -
 12 files changed, 66 insertions(+), 54 deletions(-)



[kafka] branch trunk updated (bc1ce9f0f1b -> 1d0585563b4)

2023-01-24 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from bc1ce9f0f1b KAFKA-14623: OAuth's HttpAccessTokenRetriever potentially 
leaks secrets in logging (#13119)
 add 1d0585563b4 MINOR: fix flaky DefaultStateUpdaterTest (#13160)

No new revisions were added by this update.

Summary of changes:
 streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)



[kafka] branch trunk updated (123e0e9ca9e -> ca80502ebe9)

2023-01-20 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 123e0e9ca9e MINOR: fix warnings in Streams javadocs (#13132)
 add ca80502ebe9 Update outdated documentation. (#13139)

No new revisions were added by this update.

Summary of changes:
 .../clients/consumer/internals/SubscriptionState.java   | 17 -
 1 file changed, 8 insertions(+), 9 deletions(-)



[kafka] branch trunk updated: Kafka Streams Threading P3: TaskManager Impl (#12754)

2022-10-14 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 55a3a95b7a1 Kafka Streams Threading P3: TaskManager Impl (#12754)
55a3a95b7a1 is described below

commit 55a3a95b7a1f61c2a93a6341d3e091f68b048234
Author: Guozhang Wang 
AuthorDate: Fri Oct 14 16:10:57 2022 -0700

Kafka Streams Threading P3: TaskManager Impl (#12754)

0. Add name to task executors.
1. DefaultTaskManager implementation, for interacting with the TaskExecutors 
and supporting the add/remove/lock APIs.
2. Related unit tests.
---
 .../internals/tasks/DefaultTaskExecutor.java   |  13 +-
 .../internals/tasks/DefaultTaskManager.java| 246 +
 .../processor/internals/tasks/TaskExecutor.java|   7 +-
 .../internals/tasks/TaskExecutorCreator.java   |  24 ++
 .../internals/tasks/DefaultTaskExecutorTest.java   |   2 +-
 .../internals/tasks/DefaultTaskManagerTest.java| 189 
 6 files changed, 476 insertions(+), 5 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java
index 5ad0950fc51..c03384f4daa 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.internals.DefaultStateUpdater;
 import org.apache.kafka.streams.processor.internals.ReadOnlyTask;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.slf4j.Logger;
@@ -45,7 +44,7 @@ public class DefaultTaskExecutor implements TaskExecutor {
 super(name);
 final String logPrefix = String.format("%s ", name);
 final LogContext logContext = new LogContext(logPrefix);
-log = logContext.logger(DefaultStateUpdater.class);
+log = logContext.logger(DefaultTaskExecutor.class);
 }
 
 @Override
@@ -102,6 +101,7 @@ public class DefaultTaskExecutor implements TaskExecutor {
 }
 
 private final Time time;
+private final String name;
 private final TaskManager taskManager;
 
 private StreamTask currentTask = null;
@@ -109,15 +109,22 @@ public class DefaultTaskExecutor implements TaskExecutor {
 private CountDownLatch shutdownGate;
 
 public DefaultTaskExecutor(final TaskManager taskManager,
+   final String name,
final Time time) {
 this.time = time;
+this.name = name;
 this.taskManager = taskManager;
 }
 
+@Override
+public String name() {
+return name;
+}
+
 @Override
 public void start() {
 if (taskExecutorThread == null) {
-taskExecutorThread = new TaskExecutorThread("task-executor");
+taskExecutorThread = new TaskExecutorThread(name);
 taskExecutorThread.start();
 shutdownGate = new CountDownLatch(1);
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
new file mode 100644
index 000..3f97de85ceb
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.tasks;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.utils.LogContext;
+i

[kafka] branch trunk updated (78b4ba7d85a -> dfb59296654)

2022-10-14 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 78b4ba7d85a MINOR; Add kraft controller log level in log4j prop 
(#12707)
 add dfb59296654 Kafka Streams Threading P2: Skeleton TaskExecutor Impl 
(#12744)

No new revisions were added by this update.

Summary of changes:
 .../internals/tasks/DefaultTaskExecutor.java   | 158 +
 .../processor/internals/tasks/TaskExecutor.java|  15 +-
 .../processor/internals/tasks/TaskManager.java |   5 +-
 .../internals/tasks/DefaultTaskExecutorTest.java   | 107 ++
 4 files changed, 281 insertions(+), 4 deletions(-)
 create mode 100644 
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java
 create mode 100644 
streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java



[kafka] branch trunk updated: Kafka Streams Threading P1: Add Interface for new TaskManager and TaskExecutor (#12737)

2022-10-12 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 69059b5f288 Kafka Streams Threading P1: Add Interface for new 
TaskManager and TaskExecutor (#12737)
69059b5f288 is described below

commit 69059b5f288ef5b6496953ba1d77b2570d8b3142
Author: Guozhang Wang 
AuthorDate: Wed Oct 12 16:33:13 2022 -0700

Kafka Streams Threading P1: Add Interface for new TaskManager and 
TaskExecutor (#12737)

The interfaces (and their future impls) are added under the 
processor/internals/tasks package, to distinguish them from the existing old classes:

1. TaskExecutor is the interface for a processor thread. It takes at most 
one task at a time from the task manager to process. When asked by the task 
manager to un-assign the current processing task, it stops processing and 
gives the task back to the task manager.
2. TaskManager schedules all the active tasks to assign to TaskExecutors. 
Specifically: 1) when a task executor asks it for an unassigned task to process 
(assignNextTask), it returns an available task based on its scheduling 
algorithm; 2) when the task manager decides to commit (all) tasks, or when a 
rebalance event requires it to modify the maintained active tasks (via 
onAssignment), it will lock all the tasks that are going to be closed / 
committed, asking the TaskExecutor to gi [...]

Reviewers: John Roesler , Anna Sophie Blee-Goldman 
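
For illustration, a conceptual single-threaded sketch of that handshake
(hypothetical names; the real interfaces below are asynchronous and hand back
KafkaFutures):

    import java.util.ArrayDeque;
    import java.util.Deque;

    public class ExecutorHandshakeSketch {
        private final Deque<String> unassignedTasks = new ArrayDeque<>();
        private String currentTask; // at most one task processed at a time

        // TaskManager side: schedule a task for processing.
        public void add(String taskId) {
            unassignedTasks.add(taskId);
        }

        // TaskExecutor side: take at most one task to process.
        public void assignNextTask() {
            if (currentTask == null) {
                currentTask = unassignedTasks.poll();
            }
        }

        // TaskManager side: reclaim the task, e.g. ahead of a commit or a
        // rebalance; the real executor flushes the task before handing it back.
        public String unassign() {
            String reclaimed = currentTask;
            currentTask = null;
            return reclaimed;
        }
    }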

---
 .../processor/internals/tasks/TaskExecutor.java|  57 
 .../processor/internals/tasks/TaskManager.java | 100 +
 2 files changed, 157 insertions(+)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskExecutor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskExecutor.java
new file mode 100644
index 000..ead1fb8179e
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskExecutor.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.tasks;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.streams.processor.internals.ReadOnlyTask;
+import org.apache.kafka.streams.processor.internals.StreamTask;
+
+import java.time.Duration;
+
+public interface TaskExecutor {
+
+/**
+ * Starts the task processor.
+ */
+void start();
+
+/**
+ * Shuts down the task processor.
+ *
+ * @param timeout duration how long to wait until the task processor is shut down
+ *
+ * @throws org.apache.kafka.streams.errors.StreamsException if the task processor thread cannot shut down within the timeout
+ */
+void shutdown(final Duration timeout);
+
+/**
+ * Get the current assigned processing task. The task returned is 
read-only and cannot be modified.
+ *
+ * @return the current processing task
+ */
+ReadOnlyTask currentTask();
+
+/**
+ * Unassign the current processing task from the task processor and give it back to the task manager.
+ *
+ * The paused task must be flushed since it may be committed or closed by 
the task manager next.
+ *
+ * This method does not block, instead a future is returned.
+ */
+KafkaFuture unassign();
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java
new file mode 100644
index 000..e9929714aca
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "L

[kafka] branch trunk updated: Fix ByteBufferSerializer#serialize(String, ByteBuffer) not roundtrip input with ByteBufferDeserializer#deserialize(String, byte[]) (#12704)

2022-09-30 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 496ae054c2d Fix ByteBufferSerializer#serialize(String, ByteBuffer) not 
roundtrip input with ByteBufferDeserializer#deserialize(String, byte[]) (#12704)
496ae054c2d is described below

commit 496ae054c2d43c0905167745bfb2f4a0725e9fc2
Author: LinShunKang 
AuthorDate: Fri Sep 30 21:45:18 2022 +0800

Fix ByteBufferSerializer#serialize(String, ByteBuffer) not roundtrip input 
with ByteBufferDeserializer#deserialize(String, byte[]) (#12704)

Reviewers: Guozhang Wang 
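
For illustration, a minimal round-trip sketch of the fixed behavior
(hypothetical topic name), matching the updated test below:

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.serialization.ByteBufferDeserializer;
    import org.apache.kafka.common.serialization.ByteBufferSerializer;

    public class ByteBufferRoundTrip {
        public static void main(String[] args) {
            byte[] payload = "my string".getBytes();
            // Put data into an oversized buffer and do not flip; the
            // serializer flips internally before copying.
            ByteBuffer buffer = ByteBuffer.allocate(16).put(payload);
            try (ByteBufferSerializer serializer = new ByteBufferSerializer();
                 ByteBufferDeserializer deserializer = new ByteBufferDeserializer()) {
                byte[] wire = serializer.serialize("topic", buffer);
                ByteBuffer back = deserializer.deserialize("topic", wire);
                System.out.println(wire.length == payload.length        // true
                        && back.remaining() == payload.length);         // true
            }
        }
    }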
---
 .../apache/kafka/common/serialization/ByteBufferSerializer.java   | 8 +++-
 .../org/apache/kafka/common/serialization/SerializationTest.java  | 4 +++-
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java
 
b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java
index 5987688759e..06b66a62cb0 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java
@@ -21,8 +21,7 @@ import org.apache.kafka.common.utils.Utils;
 import java.nio.ByteBuffer;
 
 /**
- * ByteBufferSerializer will not change ByteBuffer's mark, position and limit.
- * And do not need to flip before call serialize(String, ByteBuffer). 
For example:
+ * There is no need to flip the buffer before calling serialize(String, ByteBuffer). For example:
  *
  * 
  * 
@@ -48,8 +47,7 @@ public class ByteBufferSerializer implements 
Serializer {
 }
 }
 
-final ByteBuffer copyData = data.asReadOnlyBuffer();
-copyData.flip();
-return Utils.toArray(copyData);
+data.flip();
+return Utils.toArray(data);
 }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
 
b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
index eb1fee3943f..a0b67a03d42 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
@@ -50,7 +50,9 @@ public class SerializationTest {
 put(Float.class, Arrays.asList(5678567.12312f, -5678567.12341f));
 put(Double.class, Arrays.asList(5678567.12312d, -5678567.12341d));
 put(byte[].class, Arrays.asList("my string".getBytes()));
-put(ByteBuffer.class, 
Arrays.asList(ByteBuffer.allocate(10).put("my string".getBytes(;
+put(ByteBuffer.class, Arrays.asList(ByteBuffer.wrap("my 
string".getBytes()),
+ByteBuffer.allocate(10).put("my string".getBytes()),
+ByteBuffer.allocateDirect(10).put("my 
string".getBytes(;
 put(Bytes.class, Arrays.asList(new Bytes("my string".getBytes(;
 put(UUID.class, Arrays.asList(UUID.randomUUID()));
 }



[kafka] branch trunk updated: KAFKA-4852: Fix ByteBufferSerializer#serialize(String, ByteBuffer) not compatible with offsets (#12683)

2022-09-29 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 51dbd175b08 KAFKA-4852: Fix ByteBufferSerializer#serialize(String, 
ByteBuffer) not compatible with offsets (#12683)
51dbd175b08 is described below

commit 51dbd175b08e78aeca03d6752847aa5f23c98659
Author: LinShunKang 
AuthorDate: Fri Sep 30 01:59:47 2022 +0800

KAFKA-4852: Fix ByteBufferSerializer#serialize(String, ByteBuffer) not 
compatible with offsets (#12683)

Reviewers: Guozhang Wang 
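
For illustration, a minimal usage sketch of the fixed behavior (hypothetical
topic name), mirroring the new testByteBufferSerializer test below: a buffer
larger than its payload now serializes to exactly the written bytes, with no
trailing zeros from the unused capacity.

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.serialization.ByteBufferSerializer;

    public class OversizedBufferExample {
        public static void main(String[] args) {
            byte[] bytes = "Hello".getBytes();
            // Capacity is one byte larger than the payload; the position
            // marks the end of the written data, and the serializer flips
            // a read-only copy instead of rewinding to the full limit.
            ByteBuffer buffer = ByteBuffer.allocate(bytes.length + 1).put(bytes);
            try (ByteBufferSerializer serializer = new ByteBufferSerializer()) {
                byte[] out = serializer.serialize("topic", buffer);
                System.out.println(out.length); // 5, not 6
            }
        }
    }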
---
 .../common/serialization/ByteBufferSerializer.java | 31 --
 .../common/serialization/SerializationTest.java| 19 +
 2 files changed, 42 insertions(+), 8 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java
 
b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java
index 9fb12544e0f..5987688759e 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java
@@ -16,25 +16,40 @@
  */
 package org.apache.kafka.common.serialization;
 
+import org.apache.kafka.common.utils.Utils;
+
 import java.nio.ByteBuffer;
 
+/**
+ * ByteBufferSerializer will not change ByteBuffer's mark, position and limit.
+ * And do not need to flip before call serialize(String, ByteBuffer). 
For example:
+ *
+ * 
+ * 
+ * ByteBufferSerializer serializer = ...; // Create Serializer
+ * ByteBuffer buffer = ...;   // Allocate ByteBuffer
+ * buffer.put(data);  // Put data into buffer, do not need 
to flip
+ * serializer.serialize(topic, buffer);   // Serialize buffer
+ * 
+ * 
+ */
 public class ByteBufferSerializer implements Serializer<ByteBuffer> {
+@Override
 public byte[] serialize(String topic, ByteBuffer data) {
-if (data == null)
+if (data == null) {
 return null;
-
-data.rewind();
+}
 
 if (data.hasArray()) {
-byte[] arr = data.array();
+final byte[] arr = data.array();
 if (data.arrayOffset() == 0 && arr.length == data.remaining()) {
 return arr;
 }
 }
 
-byte[] ret = new byte[data.remaining()];
-data.get(ret, 0, ret.length);
-data.rewind();
-return ret;
+final ByteBuffer copyData = data.asReadOnlyBuffer();
+copyData.flip();
+return Utils.toArray(copyData);
 }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
 
b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
index 85c09dd17ae..eb1fee3943f 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
@@ -31,6 +31,8 @@ import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.Stack;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -368,4 +370,21 @@ public class SerializationTest {
 
 return Serdes.serdeFrom(serializer, deserializer);
 }
+
+@Test
+public void testByteBufferSerializer() {
+final byte[] bytes = "Hello".getBytes(UTF_8);
+final ByteBuffer heapBuffer0 = ByteBuffer.allocate(bytes.length + 
1).put(bytes);
+final ByteBuffer heapBuffer1 = 
ByteBuffer.allocate(bytes.length).put(bytes);
+final ByteBuffer heapBuffer2 = ByteBuffer.wrap(bytes);
+final ByteBuffer directBuffer0 = 
ByteBuffer.allocateDirect(bytes.length + 1).put(bytes);
+final ByteBuffer directBuffer1 = 
ByteBuffer.allocateDirect(bytes.length).put(bytes);
+try (final ByteBufferSerializer serializer = new 
ByteBufferSerializer()) {
+assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer0));
+assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer1));
+assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer2));
+assertArrayEquals(bytes, serializer.serialize(topic, 
directBuffer0));
+assertArrayEquals(bytes, serializer.serialize(topic, 
directBuffer1));
+}
+}
 }



[kafka] branch trunk updated (d2f900b055d -> d62a42df2e2)

2022-09-28 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from d2f900b055d MINOR: Small update docs/design.html grammar and typo 
(#12691)
 add d62a42df2e2 KAFKA-10199: Integrate Topology Pause/Resume with 
StateUpdater (#12659)

No new revisions were added by this update.

Summary of changes:
 .../processor/internals/DefaultStateUpdater.java   | 109 -
 .../streams/processor/internals/StateUpdater.java  |  42 ++--
 .../streams/processor/internals/StreamThread.java  |   5 +-
 .../streams/processor/internals/TaskAndAction.java |  16 +--
 .../internals/DefaultStateUpdaterTest.java |  75 +++---
 .../processor/internals/TaskAndActionTest.java |  40 
 .../org/apache/kafka/test/StreamsTestUtils.java|  19 
 7 files changed, 110 insertions(+), 196 deletions(-)



[kafka] branch trunk updated (7496e624341 -> b0ace180359)

2022-09-23 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 7496e624341 KAFKA-14259: BrokerRegistration#toString throws an 
exception, terminating metadata replay (#12681)
 add b0ace180359 KAFKA-14239: Merge StateRestorationIntegrationTest into 
RestoreIntegrationTest (#12670)

No new revisions were added by this update.

Summary of changes:
 .../integration/RestoreIntegrationTest.java| 106 +---
 .../StateRestorationIntegrationTest.java   | 138 -
 2 files changed, 88 insertions(+), 156 deletions(-)
 delete mode 100644 
streams/src/test/java/org/apache/kafka/streams/integration/StateRestorationIntegrationTest.java



[kafka] branch trunk updated (7ec10ce19a -> 8380d2edf4)

2022-09-02 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 7ec10ce19a HOTFIX: fix PriorityQueue iteration to assign warmups in 
priority order (#12585)
 add 8380d2edf4 KAFKA-10199: Handle exceptions from state updater (#12519)

No new revisions were added by this update.

Summary of changes:
 .../processor/internals/DefaultStateUpdater.java   |  10 ++
 .../streams/processor/internals/TaskManager.java   | 191 +
 .../kafka/streams/processor/internals/Tasks.java   |  53 ++
 .../streams/processor/internals/TasksRegistry.java |   6 +-
 .../processor/internals/TaskManagerTest.java   |  95 +-
 5 files changed, 232 insertions(+), 123 deletions(-)



[kafka] branch trunk updated (8d665c42e23 -> 63e6fdc9c42)

2022-08-25 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 8d665c42e23 MINOR: Small cleanups in integration.kafka tests (#12480)
 add 63e6fdc9c42 MINOR: Improve javadocs for offset retention (#12552)

No new revisions were added by this update.

Summary of changes:
 core/src/main/scala/kafka/coordinator/group/OffsetConfig.scala | 8 ++--
 core/src/main/scala/kafka/server/KafkaConfig.scala | 7 +--
 2 files changed, 11 insertions(+), 4 deletions(-)



[kafka] branch trunk updated: KAFKA-10199: Remove tasks from state updater on revoked and lost partitions (#12547)

2022-08-22 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new add4ca6c7f KAFKA-10199: Remove tasks from state updater on revoked and 
lost partitions (#12547)
add4ca6c7f is described below

commit add4ca6c7f1b289477aa7d6e918f5d22b78088fe
Author: Bruno Cadonna 
AuthorDate: Mon Aug 22 20:50:50 2022 +0200

KAFKA-10199: Remove tasks from state updater on revoked and lost partitions 
(#12547)

Removes tasks from the state updater when the input partitions of the tasks 
are revoked or partitions are lost during a rebalance.

Reviewers: Guozhang Wang 
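
For illustration, a standalone sketch of the containment check added below
(hypothetical RestoringTask type; the real code works on TopicPartition sets):
an active restoring task is removed from the state updater only if all of its
input partitions were revoked.

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class RevocationSketch {
        static class RestoringTask {
            final String id;
            final boolean active;
            final Set<String> inputPartitions;
            RestoringTask(String id, boolean active, Set<String> inputPartitions) {
                this.id = id;
                this.active = active;
                this.inputPartitions = inputPartitions;
            }
        }

        // Mutates remainingRevokedPartitions, mirroring the loop in the diff.
        static Set<String> tasksToCloseClean(List<RestoringTask> restoringTasks,
                                             Set<String> remainingRevokedPartitions) {
            Set<String> pendingCloseClean = new HashSet<>();
            for (RestoringTask task : restoringTasks) {
                if (task.active
                        && remainingRevokedPartitions.containsAll(task.inputPartitions)) {
                    pendingCloseClean.add(task.id); // later closed clean, not dirty
                    remainingRevokedPartitions.removeAll(task.inputPartitions);
                }
            }
            return pendingCloseClean;
        }
    }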
---
 .../streams/processor/internals/TaskManager.java   |  42 +++-
 .../kafka/streams/processor/internals/Tasks.java   |  19 +++-
 .../processor/internals/TaskManagerTest.java   | 116 -
 3 files changed, 165 insertions(+), 12 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 03c36b0daf..478b783d68 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -465,7 +465,7 @@ public class TaskManager {
 standbyTasksToCreate.remove(taskId);
 } else {
 stateUpdater.remove(taskId);
-tasks.addPendingTaskToClose(taskId);
+tasks.addPendingTaskToCloseClean(taskId);
 }
 }
 }
@@ -692,7 +692,7 @@ public class TaskManager {
 
 taskExceptions.putIfAbsent(taskId, e);
 }
-} else if (tasks.removePendingTaskToClose(task.id())) {
+} else if (tasks.removePendingTaskToCloseClean(task.id())) {
 try {
 task.suspend();
 task.closeClean();
@@ -710,6 +710,8 @@ public class TaskManager {
 
 taskExceptions.putIfAbsent(task.id(), e);
 }
+} else if (tasks.removePendingTaskToCloseDirty(task.id())) {
+tasksToCloseDirty.add(task);
 } else if ((inputPartitions = 
tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) {
 task.updateInputPartitions(inputPartitions, 
topologyMetadata.nodeToSourceTopics(task.id()));
 stateUpdater.add(task);
@@ -755,6 +757,8 @@ public class TaskManager {
 }
 }
 
+removeRevokedTasksFromStateUpdater(remainingRevokedPartitions);
+
 if (!remainingRevokedPartitions.isEmpty()) {
 log.debug("The following revoked partitions {} are missing from 
the current task partitions. It could "
   + "potentially be due to race condition of consumer 
detecting the heartbeat failure, or the tasks " +
@@ -840,6 +844,20 @@ public class TaskManager {
 }
 }
 
+private void removeRevokedTasksFromStateUpdater(final Set 
remainingRevokedPartitions) {
+if (stateUpdater != null) {
+for (final Task restoringTask : stateUpdater.getTasks()) {
+if (restoringTask.isActive()) {
+if 
(remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) {
+tasks.addPendingTaskToCloseClean(restoringTask.id());
+stateUpdater.remove(restoringTask.id());
+
remainingRevokedPartitions.removeAll(restoringTask.inputPartitions());
+}
+}
+}
+}
+}
+
 private void prepareCommitAndAddOffsetsToMap(final Set 
tasksToPrepare,
  final Map> consumedOffsetsPerTask) {
 for (final Task task : tasksToPrepare) {
@@ -867,6 +885,15 @@ public class TaskManager {
 void handleLostAll() {
 log.debug("Closing lost active tasks as zombies.");
 
+closeRunningTasksDirty();
+removeLostTasksFromStateUpdater();
+
+if (processingMode == EXACTLY_ONCE_V2) {
+activeTaskCreator.reInitializeThreadProducer();
+}
+}
+
+private void closeRunningTasksDirty() {
 final Set allTask = tasks.allTasks();
 for (final Task task : allTask) {
 // Even though we've apparently dropped out of the group, we can 
continue safely to maintain our
@@ -875,9 +902,16 @@ public class TaskManager {
 closeTaskDirty(task);
 }
 }
+}
 
-if (processingMode == EXACTLY_ONCE_V2) {
-activeTaskCreator.reInitializeThreadProducer();
+private void removeLostTasksFromStateUpdater() {
+if (stateUpdater != null) {

[kafka] branch trunk updated: MINOR: Improve KafkaProducer Javadocs (#12537)

2022-08-19 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 5d32f24cc3 MINOR: Improve KafkaProducer Javadocs (#12537)
5d32f24cc3 is described below

commit 5d32f24cc3597760d3b846647e6a19dddc6e3d71
Author: Guozhang Wang 
AuthorDate: Fri Aug 19 10:09:48 2022 -0700

MINOR: Improve KafkaProducer Javadocs (#12537)

While reviewing KIP-588 and KIP-691 I went through the exception throwing 
behavior and wanted to improve the related javadocs a little bit.

Reviewers: John Roesler 
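
For illustration, a sketch of the consume-transform-produce step the javadoc
describes (hypothetical topic name; configuration, polling, and the earlier
initTransactions() call are elided):

    import java.util.Map;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.TimeoutException;

    public class TxnOffsetsSketch {
        static void commitWithOffsets(KafkaProducer<String, String> producer,
                                      KafkaConsumer<String, String> consumer,
                                      Map<TopicPartition, OffsetAndMetadata> offsets) {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("output-topic", "key", "value"));
            // Blocks until the group coordinator has received the request; the
            // offsets only count as committed once commitTransaction() succeeds.
            producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
            try {
                producer.commitTransaction();
            } catch (TimeoutException swallowed) {
                // The commit may still have reached the broker; per the
                // javadoc it is safe to retry the commit here.
                producer.commitTransaction();
            }
        }
    }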
---
 .../kafka/clients/producer/KafkaProducer.java  | 22 +++---
 1 file changed, 19 insertions(+), 3 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 2d5c8994b4..ec8b8725c8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -675,6 +675,11 @@ public class KafkaProducer implements Producer 
{
  * and should also not commit offsets manually (via {@link 
KafkaConsumer#commitSync(Map) sync} or
  * {@link KafkaConsumer#commitAsync(Map, OffsetCommitCallback) async} 
commits).
  *
+ * 
+ * This method is a blocking call that waits until the request has been 
received and acknowledged by the consumer group
+ * coordinator; but the offsets are not considered as committed until the 
transaction itself is successfully committed later (via
+ * the {@link #commitTransaction()} call).
+ *
  * @throws IllegalStateException if no transactional.id has been 
configured, no transaction has been started
  * @throws ProducerFencedException fatal error indicating another producer 
with the same transactional.id is active
  * @throws org.apache.kafka.common.errors.UnsupportedVersionException 
fatal error indicating the broker
@@ -685,6 +690,7 @@ public class KafkaProducer implements Producer {
  * transactional.id is not authorized, or the consumer group id is 
not authorized.
  * @throws org.apache.kafka.common.errors.InvalidProducerEpochException if 
the producer has attempted to produce with an old epoch
  * to the partition leader. See the exception for more details
+ * @throws TimeoutException if the time taken for sending the offsets has 
surpassed max.block.ms.
  * @throws KafkaException if the producer has encountered a previous fatal 
or abortable error, or for any
  * other unexpected error
  *
@@ -711,6 +717,11 @@ public class KafkaProducer implements Producer 
{
  * requires the brokers to be on version 2.5 or newer to understand.
  *
  * 
+ * This method is a blocking call that waits until the request has been 
received and acknowledged by the consumer group
+ * coordinator; but the offsets are not considered as committed until the 
transaction itself is successfully committed later (via
+ * the {@link #commitTransaction()} call).
+ *
+ * 
  * Note, that the consumer should have {@code enable.auto.commit=false} 
and should
  * also not commit offsets manually (via {@link 
KafkaConsumer#commitSync(Map) sync} or
  * {@link KafkaConsumer#commitAsync(Map, OffsetCommitCallback) async} 
commits).
@@ -735,7 +746,7 @@ public class KafkaProducer implements Producer {
  * to the partition leader. See the exception for more details
  * @throws KafkaException if the producer has encountered a previous fatal 
or abortable error, or for any
  * other unexpected error
- * @throws TimeoutException if the time taken for sending offsets has 
surpassed max.block.ms.
+ * @throws TimeoutException if the time taken for sending the offsets has 
surpassed max.block.ms.
  * @throws InterruptException if the thread is interrupted while blocked
  */
 public void sendOffsetsToTransaction(Map offsets,
@@ -765,7 +776,10 @@ public class KafkaProducer implements Producer 
{
  * Note that exceptions thrown by callbacks are ignored; the producer 
proceeds to commit the transaction in any case.
  * 
  * Note that this method will raise {@link TimeoutException} if the 
transaction cannot be committed before expiration
- * of {@code max.block.ms}. Additionally, it will raise {@link 
InterruptException} if interrupted.
+ * of {@code max.block.ms}, but this does not mean the request did not 
actually reach the broker. In fact, it only indicates
+ * that we cannot get the acknowledgement response in time, so it's up to 
the application's logic
+ * to decide how to handle timeouts.
+ * Additionally, it will raise {@link InterruptException} if interrupted.
  * It is safe to retry

[kafka] branch trunk updated: KAFKA-10199: Remove tasks from state updater on revocation (#12520)

2022-08-17 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new b47c4d8598 KAFKA-10199: Remove tasks from state updater on revocation 
(#12520)
b47c4d8598 is described below

commit b47c4d859805068de6a8fe8de3bda5e7a21132e2
Author: Bruno Cadonna 
AuthorDate: Wed Aug 17 20:13:34 2022 +0200

KAFKA-10199: Remove tasks from state updater on revocation (#12520)

Removes tasks from the state updater when the input partitions of the tasks 
are revoked during a rebalance.

Reviewers: Guozhang Wang 
---
 .../streams/processor/internals/TaskManager.java   | 16 +
 .../processor/internals/TaskManagerTest.java   | 81 ++
 2 files changed, 97 insertions(+)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 4bba28a3f3..bab05a5184 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -757,6 +757,8 @@ public class TaskManager {
 }
 }
 
+removeRevokedTasksFromStateUpdater(remainingRevokedPartitions);
+
 if (!remainingRevokedPartitions.isEmpty()) {
 log.debug("The following revoked partitions {} are missing from 
the current task partitions. It could "
   + "potentially be due to race condition of consumer 
detecting the heartbeat failure, or the tasks " +
@@ -842,6 +844,20 @@ public class TaskManager {
 }
 }
 
+private void removeRevokedTasksFromStateUpdater(final Set 
remainingRevokedPartitions) {
+if (stateUpdater != null) {
+for (final Task restoringTask : stateUpdater.getTasks()) {
+if (restoringTask.isActive()) {
+if 
(remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) {
+tasks.addPendingTaskToClose(restoringTask.id());
+stateUpdater.remove(restoringTask.id());
+
remainingRevokedPartitions.removeAll(restoringTask.inputPartitions());
+}
+}
+}
+}
+}
+
 private void prepareCommitAndAddOffsetsToMap(final Set 
tasksToPrepare,
  final Map> consumedOffsetsPerTask) {
 for (final Task task : tasksToPrepare) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 133541bfac..ff52ad5ae9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -751,6 +751,87 @@ public class TaskManagerTest {
 assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
 }
 
+@Test
+public void 
shouldRemoveStatefulTaskWithRevokedInputPartitionsFromStateUpdaterOnRevocation()
 {
+final StreamTask task = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+.inState(State.RESTORING)
+.withInputPartitions(taskId00Partitions).build();
+final TaskManager taskManager = setupForRevocation(mkSet(task), 
mkSet(task));
+
+taskManager.handleRevocation(taskId00Partitions);
+
+Mockito.verify(stateUpdater).remove(task.id());
+
+taskManager.tryToCompleteRestoration(time.milliseconds(), null);
+
+Mockito.verify(task).closeClean();
+}
+
+public void 
shouldRemoveMultipleStatefulTaskWithRevokedInputPartitionsFromStateUpdaterOnRevocation()
 {
+final StreamTask task1 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+.inState(State.RESTORING)
+.withInputPartitions(taskId00Partitions).build();
+final StreamTask task2 = statefulTask(taskId01, 
taskId01ChangelogPartitions)
+.inState(State.RESTORING)
+.withInputPartitions(taskId01Partitions).build();
+final TaskManager taskManager = setupForRevocation(mkSet(task1, 
task2), mkSet(task1, task2));
+
+taskManager.handleRevocation(union(HashSet::new, taskId00Partitions, 
taskId01Partitions));
+
+Mockito.verify(stateUpdater).remove(task1.id());
+Mockito.verify(stateUpdater).remove(task2.id());
+
+taskManager.tryToCompleteRestoration(time.milliseconds(), null);
+
+Mockito.verify(task1).closeClean();
+Mockito.verify(task2).closeClean();
+}
+
+@Test
+public void 
shouldNotRemoveStatefulTaskWithoutRevokedInputPartitionsFromS

[kafka] branch trunk updated: KAFKA-10199: Remove tasks from state updater on partition lost (#12521)

2022-08-17 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 9f20f89953 KAFKA-10199: Remove tasks from state updater on partition 
lost (#12521)
9f20f89953 is described below

commit 9f20f8995399d9e03f518f7b9c8be2bffb2fdcfc
Author: Bruno Cadonna 
AuthorDate: Wed Aug 17 20:12:30 2022 +0200

KAFKA-10199: Remove tasks from state updater on partition lost (#12521)

Removes tasks from the state updater when the input partitions of the tasks 
are lost during a rebalance.

Reviewers: Guozhang Wang 
---
 .../streams/processor/internals/TaskManager.java   | 26 ++--
 .../kafka/streams/processor/internals/Tasks.java   | 19 ++---
 .../processor/internals/TaskManagerTest.java   | 48 --
 3 files changed, 81 insertions(+), 12 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 03c36b0daf..4bba28a3f3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -465,7 +465,7 @@ public class TaskManager {
 standbyTasksToCreate.remove(taskId);
 } else {
 stateUpdater.remove(taskId);
-tasks.addPendingTaskToClose(taskId);
+tasks.addPendingTaskToCloseClean(taskId);
 }
 }
 }
@@ -692,7 +692,7 @@ public class TaskManager {
 
 taskExceptions.putIfAbsent(taskId, e);
 }
-} else if (tasks.removePendingTaskToClose(task.id())) {
+} else if (tasks.removePendingTaskToCloseClean(task.id())) {
 try {
 task.suspend();
 task.closeClean();
@@ -710,6 +710,8 @@ public class TaskManager {
 
 taskExceptions.putIfAbsent(task.id(), e);
 }
+} else if (tasks.removePendingTaskToCloseDirty(task.id())) {
+tasksToCloseDirty.add(task);
 } else if ((inputPartitions = 
tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) {
 task.updateInputPartitions(inputPartitions, 
topologyMetadata.nodeToSourceTopics(task.id()));
 stateUpdater.add(task);
@@ -867,6 +869,15 @@ public class TaskManager {
 void handleLostAll() {
 log.debug("Closing lost active tasks as zombies.");
 
+closeRunningTasksDirty();
+removeLostTasksFromStateUpdater();
+
+if (processingMode == EXACTLY_ONCE_V2) {
+activeTaskCreator.reInitializeThreadProducer();
+}
+}
+
+private void closeRunningTasksDirty() {
 final Set allTask = tasks.allTasks();
 for (final Task task : allTask) {
 // Even though we've apparently dropped out of the group, we can 
continue safely to maintain our
@@ -875,9 +886,16 @@ public class TaskManager {
 closeTaskDirty(task);
 }
 }
+}
 
-if (processingMode == EXACTLY_ONCE_V2) {
-activeTaskCreator.reInitializeThreadProducer();
+private void removeLostTasksFromStateUpdater() {
+if (stateUpdater != null) {
+for (final Task restoringTask : stateUpdater.getTasks()) {
+if (restoringTask.isActive()) {
+tasks.addPendingTaskToCloseDirty(restoringTask.id());
+stateUpdater.remove(restoringTask.id());
+}
+}
 }
 }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
index 9628b42d92..8178fe3691 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
@@ -55,7 +55,9 @@ class Tasks {
 private final Map> pendingTasksToRecycle = new 
HashMap<>();
 private final Map> 
pendingTasksToUpdateInputPartitions = new HashMap<>();
 private final Set pendingTasksToInit = new HashSet<>();
-private final Set pendingTasksToClose = new HashSet<>();
+private final Set pendingTasksToCloseClean = new HashSet<>();
+
+private final Set pendingTasksToCloseDirty = new HashSet<>();
 
 // TODO: convert to Stream/StandbyTask when we remove 
TaskManager#StateMachineTask with mocks
 private final Map activeTasksPerPartition = new 
HashMap<>();
@@ -111,12 +113,19 @@ class Tasks {
 pendingTasksToUpdateInputPartitions.put(taskId, inputPartitions);
 }
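
For context, a minimal toy model of the lost-all handling introduced above
(simplified stand-in types, not the real Streams internals): running tasks are
closed dirty, and active tasks still restoring inside the state updater are
marked pending-close-dirty so the follow-up drain can close them.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class LostAllSketch {
    final Map<String, Boolean> restoringTasks = new HashMap<>(); // taskId -> isActive; stands in for stateUpdater.getTasks()
    final Set<String> runningTasks = new HashSet<>();
    final Set<String> pendingTasksToCloseDirty = new HashSet<>();

    void handleLostAll() {
        // close every running task dirty (no commit -- we dropped out of the group)
        runningTasks.clear();
        // active tasks still inside the state updater cannot be closed in place;
        // remove them and remember to close them dirty once they are drained
        restoringTasks.entrySet().removeIf(e -> {
            if (e.getValue()) {
                pendingTasksToCloseDirty.add(e.getKey());
                return true;   // stands in for stateUpdater.remove(taskId)
            }
            return false;      // standby tasks are not lost on partition loss
        });
        // under EXACTLY_ONCE_V2 the thread producer would be re-initialized here
    }
}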
 

[kafka] branch trunk updated: KAFKA-10199: Handle task closure and recycling from state updater (#12466)

2022-08-15 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new dc72f6ec02 KAFKA-10199: Handle task closure and recycling from state 
updater (#12466)
dc72f6ec02 is described below

commit dc72f6ec02c7a7fbda083cd8a5a9f0081c7e58fd
Author: Guozhang Wang 
AuthorDate: Mon Aug 15 19:33:46 2022 -0700

KAFKA-10199: Handle task closure and recycling from state updater (#12466)

1. Within the thread's tryCompleteRestore function, drain the removed tasks
from the state updater and handle each accordingly: 1) recycling, 2) closing,
3) updating input partitions (a toy dispatch sketch follows the diff below).
2. Catch up on some unit test coverage from previous PRs.
3. Some minor cleanups around exception handling.

Reviewers: Bruno Cadonna 
---
 .../processor/internals/ActiveTaskCreator.java |   1 +
 .../streams/processor/internals/StandbyTask.java   |   2 +-
 .../processor/internals/StandbyTaskCreator.java|   1 +
 .../streams/processor/internals/StreamTask.java|   2 +-
 .../streams/processor/internals/TaskManager.java   | 311 +
 .../kafka/streams/processor/internals/Tasks.java   |  90 +++---
 .../internals/ProcessorTopologyFactories.java  |   1 -
 .../processor/internals/StandbyTaskTest.java   |   5 +-
 .../processor/internals/StreamTaskTest.java|  12 +-
 .../processor/internals/TaskManagerTest.java   | 178 +++-
 .../streams/processor/internals/TasksTest.java |  64 -
 11 files changed, 471 insertions(+), 196 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index 46455111db..d28d0d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -135,6 +135,7 @@ class ActiveTaskCreator {
 return threadProducer;
 }
 
+// TODO: convert to StreamTask when we remove TaskManager#StateMachineTask 
with mocks
 public Collection createTasks(final Consumer 
consumer,
 final Map> 
tasksToBeCreated) {
 final List createdTasks = new ArrayList<>();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index bb7aef1dcd..87f19c4b1f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -234,7 +234,7 @@ public class StandbyTask extends AbstractTask implements 
Task {
 closeTaskSensor.record();
 transitionTo(State.CLOSED);
 
-log.info("Closed and recycled state, and converted type to active");
+log.info("Closed and recycled state");
 }
 
 private void close(final boolean clean) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
index 2f48cdb67f..26a3a49af3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
@@ -67,6 +67,7 @@ class StandbyTaskCreator {
 );
 }
 
+// TODO: convert to StandbyTask when we remove 
TaskManager#StateMachineTask with mocks
 Collection createTasks(final Map> 
tasksToBeCreated) {
 final List createdTasks = new ArrayList<>();
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index f7bf8a5e74..8a30bcf6ca 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -568,7 +568,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
 closeTaskSensor.record();
 transitionTo(State.CLOSED);
 
-log.info("Closed and recycled state, and converted type to standby");
+log.info("Closed and recycled state");
 }
 
 /**
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index cfd20d2299..03c36b0daf 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/i
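
The drain-and-dispatch flow described in point 1 of the commit message can be
outlined as follows (a toy sketch with simplified stand-in names, not the
actual TaskManager code):

class DrainDispatchSketch {
    enum Pending { RECYCLE, CLOSE_CLEAN, CLOSE_DIRTY, UPDATE_INPUT_PARTITIONS }

    // called from the stream thread's tryCompleteRestore path for each task
    // drained from the state updater
    void handleDrainedTask(String taskId, Pending pending) {
        switch (pending) {
            case RECYCLE:
                // close the task but retain its state manager, then re-create
                // it as the opposite type (active <-> standby) in the creators
                break;
            case CLOSE_CLEAN:
                // task.suspend(); task.closeClean();
                break;
            case CLOSE_DIRTY:
                // task.closeDirty();
                break;
            case UPDATE_INPUT_PARTITIONS:
                // task.updateInputPartitions(...); then hand it back to the updater
                break;
        }
    }
}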

[kafka-site] branch asf-site updated: Update my entry to include PMC membership (#434)

2022-08-09 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/kafka-site.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new ac411462 Update my entry to include PMC membership (#434)
ac411462 is described below

commit ac411462d13c9f5cc150aa47888a1af7aabb59d3
Author: A. Sophie Blee-Goldman 
AuthorDate: Tue Aug 9 09:36:59 2022 -0700

Update my entry to include PMC membership (#434)

Reviewers: David Jacot , Guozhang Wang 

---
 committers.html | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/committers.html b/committers.html
index ce95457a..1855ed80 100644
--- a/committers.html
+++ b/committers.html
@@ -341,7 +341,7 @@
 
 
   Anna Sophie Blee-Goldman
-  Committer
+  Committer, and PMC member
   https://www.linkedin.com/in/ableegoldman/;>/in/ableegoldman
 
 



[kafka] branch trunk updated: HOTFIX / KAFKA-14130: Reduce RackAwarenesssTest to unit Test (#12476)

2022-08-03 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 5ceaa588ee HOTFIX / KAFKA-14130: Reduce RackAwarenesssTest to unit 
Test (#12476)
5ceaa588ee is described below

commit 5ceaa588ee224fe1a859e4212636aba22b22f540
Author: Guozhang Wang 
AuthorDate: Wed Aug 3 15:36:59 2022 -0700

HOTFIX / KAFKA-14130: Reduce RackAwarenesssTest to unit Test (#12476)

While working on KAFKA-13877, I felt it was overkill to introduce the whole
test class as an integration test, since all we need is to test the assignor
itself, which can be a unit test. Running this suite with 9+ instances takes a
long time and is still vulnerable to all kinds of timing-based flakiness. A
better choice is to reduce it to a unit test, similar to
HighAvailabilityStreamsPartitionAssignorTest, which just tests the behavior of
the assignor itself rather than creating m [...]

Since we mock everything, there's no flakiness anymore. Plus we greatly
reduced the test runtime (on my local machine, the old integration suite takes
about 35 secs to run, while the new one takes 20 ms on average).

Reviewers: Divij Vaidya , Dalibor Plavcic
---
 .../integration/RackAwarenessIntegrationTest.java  | 433 
 .../RackAwarenessStreamsPartitionAssignorTest.java | 576 +
 .../processor/internals/StreamTaskTest.java|   4 +-
 .../internals/assignment/AssignmentTestUtils.java  |   3 +
 4 files changed, 581 insertions(+), 435 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
deleted file mode 100644
index 7c93b769f5..00
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
+++ /dev/null
@@ -1,433 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.integration;
-
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.ThreadMetadata;
-import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.Repartitioned;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.test.TestUtils;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestInfo;
-import org.junit.jupiter.api.Timeout;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static java.util.Arrays.asList;
-import static java.util.Collections.singletonList;
-import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
-import static org.apache.kafka.test.TestUtils.waitForCondition;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-@Timeout(600)
-@Tag("integration")
-public class RackAwarenessIntegrationTest {
-private static final int NUM_BROKERS = 1;
-
-private static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS);
-
-private static final String

[kafka] branch trunk updated: KAFKA-13877: Fix flakiness in RackAwarenessIntegrationTest (#12468)

2022-08-03 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 3202459394 KAFKA-13877: Fix flakiness in RackAwarenessIntegrationTest 
(#12468)
3202459394 is described below

commit 32024593947f8bb497f2f5d392e0d1d892a16ff3
Author: Guozhang Wang 
AuthorDate: Wed Aug 3 09:17:38 2022 -0700

KAFKA-13877: Fix flakiness in RackAwarenessIntegrationTest (#12468)

In the current test, we check for tag distribution immediately after every
instance is in the RUNNING state. However, because of follow-up rebalances,
"everyone is now in the running state" does not mean that the cluster is
stable. In fact, a follow-up rebalance may occur, during which the local
thread metadata returns empty, causing the distribution verifier to fail; the
fix polls for the condition instead (the pattern is sketched after the diff
below).

Reviewers: Divij Vaidya , Luke Chen 
---
 .../kafka/streams/processor/internals/StreamTask.java  |  2 +-
 .../streams/integration/RackAwarenessIntegrationTest.java  | 14 +++---
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 07c4494225..f7bf8a5e74 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -1267,7 +1267,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
 final SourceNode source = topology.source(partition.topic());
 if (source == null) {
 throw new TopologyException(
-"Topic is unknown to the topology. " +
+"Topic " + partition.topic() + " is unknown to the 
topology. " +
 "This may happen if different KafkaStreams 
instances of the same application execute different Topologies. " +
 "Note that Topologies are only identical if 
all operators are added in the same order."
 );
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
index 677633e9b0..7c93b769f5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
@@ -55,8 +55,8 @@ import java.util.stream.Stream;
 import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
 import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @Timeout(600)
 @Tag("integration")
@@ -143,13 +143,13 @@ public class RackAwarenessIntegrationTest {
 createAndStart(clientTags2, clientTagKeys, numberOfStandbyReplicas);
 
 waitUntilAllKafkaStreamsClientsAreRunning();
-assertTrue(isIdealTaskDistributionReachedForTags(clientTagKeys));
+waitForCondition(() -> 
isIdealTaskDistributionReachedForTags(clientTagKeys), "not all tags are evenly 
distributed");
 
 stopKafkaStreamsInstanceWithIndex(0);
 
 waitUntilAllKafkaStreamsClientsAreRunning();
 
-assertTrue(isIdealTaskDistributionReachedForTags(clientTagKeys));
+waitForCondition(() -> 
isIdealTaskDistributionReachedForTags(clientTagKeys), "not all tags are evenly 
distributed");
 }
 
 @Test
@@ -165,7 +165,7 @@ public class RackAwarenessIntegrationTest {
 createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1B, 
TAG_VALUE_K8_CLUSTER_1), asList(TAG_ZONE, TAG_CLUSTER), 
numberOfStandbyReplicas);
 
 waitUntilAllKafkaStreamsClientsAreRunning();
-
assertTrue(isIdealTaskDistributionReachedForTags(singletonList(TAG_ZONE)));
+waitForCondition(() -> 
isIdealTaskDistributionReachedForTags(singletonList(TAG_ZONE)), "not all tags 
are evenly distributed");
 }
 
 @Test
@@ -186,7 +186,7 @@ public class RackAwarenessIntegrationTest {
 createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1C, 
TAG_VALUE_K8_CLUSTER_3), asList(TAG_ZONE, TAG_CLUSTER), 
numberOfStandbyReplicas);
 
 waitUntilAllKafkaStreamsClientsAreRunning();
-assertTrue(isIdealTaskDistributionReachedForTags(asList(TAG_ZONE, 
TAG_CLUSTER)));
+waitForCondition(() -> 
isIdealTaskDistributionReachedForTa
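
A minimal, self-contained sketch of the polling pattern the fix switches to;
TestUtils.waitForCondition is the real Kafka test helper, while the condition
lambda here is a trivial stand-in for the tag-distribution check.

import org.apache.kafka.test.TestUtils;

public class WaitForConditionSketch {
    public static void main(String[] args) throws InterruptedException {
        final long start = System.currentTimeMillis();
        // Poll until the condition holds (or the default timeout elapses)
        // instead of asserting it once, tolerating follow-up rebalances.
        TestUtils.waitForCondition(
            () -> System.currentTimeMillis() - start > 50,  // stand-in predicate
            "condition never became true"
        );
    }
}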

[kafka] branch trunk updated: Minor: enable index for emit final sliding window (#12461)

2022-07-29 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new f7ac5d3d00 Minor: enable index for emit final sliding window (#12461)
f7ac5d3d00 is described below

commit f7ac5d3d00f3cd3caa25c3003900bdb245d5252e
Author: Hao Li <1127478+lihao...@users.noreply.github.com>
AuthorDate: Fri Jul 29 14:47:25 2022 -0700

Minor: enable index for emit final sliding window (#12461)

Enable the index for the sliding-window emit-final case, as it is faster for
fetching windows for a particular key.

Reviewers: Guozhang Wang 
---
 .../kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
index 5ca6b911b7..587d2d5a87 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
@@ -233,7 +233,7 @@ public class SlidingWindowedKStreamImpl extends 
AbstractStream imple
 Duration.ofMillis(retentionPeriod),
 Duration.ofMillis(windows.timeDifferenceMs()),
 false,
-false
+true
 ) :
 Stores.persistentTimestampedWindowStore(
 materialized.storeName(),



[kafka] branch trunk updated: MINOR: Fix static mock usage in ThreadMetricsTest (#12454)

2022-07-28 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new a5d71e1550 MINOR: Fix static mock usage in ThreadMetricsTest (#12454)
a5d71e1550 is described below

commit a5d71e1550c5a2b670347c0c3bafb0b195bf916c
Author: Bruno Cadonna 
AuthorDate: Thu Jul 28 22:32:46 2022 +0200

MINOR: Fix static mock usage in ThreadMetricsTest (#12454)

Before this PR, the calls to the static methods on StreamsMetricsImpl were
plain calls rather than verifications on the mock. This was missed during the
switch from EasyMock to Mockito (the pattern is sketched after the diff
below).

Reviewers: Guozhang Wang 
---
 .../internals/metrics/ThreadMetricsTest.java   | 422 +++--
 1 file changed, 224 insertions(+), 198 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
index 6ed97ebf7c..3d2aaa20c8 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
@@ -24,12 +24,14 @@ import org.junit.Test;
 
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.Collections;
 import java.util.Map;
 import org.mockito.ArgumentCaptor;
+import org.mockito.MockedStatic;
 
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX;
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_SUFFIX;
@@ -54,17 +56,20 @@ public class ThreadMetricsTest {
 final String ratioDescription = "The fraction of time the thread spent 
on processing active tasks";
 when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, 
RecordingLevel.INFO)).thenReturn(expectedSensor);
 when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap);
-StreamsMetricsImpl.addValueMetricToSensor(
-expectedSensor,
-THREAD_LEVEL_GROUP,
-tagMap,
-operation,
-ratioDescription
-);
-
-final Sensor sensor = ThreadMetrics.processRatioSensor(THREAD_ID, 
streamsMetrics);
 
-assertThat(sensor, is(expectedSensor));
+try (final MockedStatic streamsMetricsStaticMock = 
mockStatic(StreamsMetricsImpl.class)) {
+final Sensor sensor = ThreadMetrics.processRatioSensor(THREAD_ID, 
streamsMetrics);
+streamsMetricsStaticMock.verify(
+() -> StreamsMetricsImpl.addValueMetricToSensor(
+expectedSensor,
+THREAD_LEVEL_GROUP,
+tagMap,
+operation,
+ratioDescription
+)
+);
+assertThat(sensor, is(expectedSensor));
+}
 }
 
 @Test
@@ -74,18 +79,21 @@ public class ThreadMetricsTest {
 final String maxDescription = "The maximum number of records processed 
within an iteration";
 when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, 
RecordingLevel.INFO)).thenReturn(expectedSensor);
 when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap);
-StreamsMetricsImpl.addAvgAndMaxToSensor(
-expectedSensor,
-THREAD_LEVEL_GROUP,
-tagMap,
-operation,
-avgDescription,
-maxDescription
-);
 
-final Sensor sensor = ThreadMetrics.processRecordsSensor(THREAD_ID, 
streamsMetrics);
-
-assertThat(sensor, is(expectedSensor));
+try (final MockedStatic streamsMetricsStaticMock = 
mockStatic(StreamsMetricsImpl.class)) {
+final Sensor sensor = 
ThreadMetrics.processRecordsSensor(THREAD_ID, streamsMetrics);
+streamsMetricsStaticMock.verify(
+() -> StreamsMetricsImpl.addAvgAndMaxToSensor(
+expectedSensor,
+THREAD_LEVEL_GROUP,
+tagMap,
+operation,
+avgDescription,
+maxDescription
+)
+);
+assertThat(sensor, is(expectedSensor));
+}
 }
 
 @Test
@@ -95,18 +103,21 @@ public class ThreadMetricsTest {
 final String maxLatencyDescription = "The maximum process latency";
 when(streamsMetrics.threadLevelSensor(THREAD_ID, operationLatency, 
RecordingLevel.INFO)).thenReturn(expectedSensor);
 when(streamsMetrics.threadLevel
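
A minimal, self-contained sketch of the static-mock pattern used in the fix
(assumes Mockito with the inline mock maker, which mockStatic requires;
MetricsUtil is a hypothetical stand-in for StreamsMetricsImpl):

import org.mockito.MockedStatic;
import static org.mockito.Mockito.mockStatic;

public class StaticMockSketch {
    // hypothetical utility standing in for StreamsMetricsImpl
    static class MetricsUtil {
        static void record(String name) { }
    }

    public static void main(String[] args) {
        try (MockedStatic<MetricsUtil> mocked = mockStatic(MetricsUtil.class)) {
            MetricsUtil.record("process-ratio");  // code under test
            // this is a real verification; before the fix above, the test
            // merely *called* the static method, which verified nothing
            mocked.verify(() -> MetricsUtil.record("process-ratio"));
        }
    }
}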

[kafka] branch trunk updated (9e74f91e56 -> 2724cc9920)

2022-07-28 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 9e74f91e56 KAFKA-14089: Only check for committed seqnos after 
disabling exactly-once support in Connect integration test (#12429)
 add 2724cc9920 KAFKA-10199: Bookkeep tasks during assignment for use with 
state updater (#12442)

No new revisions were added by this update.

Summary of changes:
 .../streams/processor/internals/StreamThread.java  |  17 ++-
 .../streams/processor/internals/TaskManager.java   | 155 ++---
 .../kafka/streams/processor/internals/Tasks.java   |  21 +++
 .../processor/internals/StreamThreadTest.java  |   4 +-
 .../processor/internals/TaskManagerTest.java   |  92 
 .../org/apache/kafka/test/StreamsTestUtils.java|  23 ---
 6 files changed, 212 insertions(+), 100 deletions(-)



[kafka] branch trunk updated: KAFKA-10199: Further refactor task lifecycle management (#12439)

2022-07-27 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 06f47c3b51 KAFKA-10199: Further refactor task lifecycle management 
(#12439)
06f47c3b51 is described below

commit 06f47c3b517f5c96c1bd981369843f3483947197
Author: Guozhang Wang 
AuthorDate: Wed Jul 27 17:29:05 2022 -0700

KAFKA-10199: Further refactor task lifecycle management (#12439)

1. Consolidate the task recycle procedure into a single function within the
task. The procedure now becomes: a) task.recycleStateAndConvert, at the end of
which the task is closed while its stateManager is retained and the manager
type has been converted; b) create the new task from the old task's fields and
the retained stateManager inside the creators.
2. Move the task-execution-related metadata into the corresponding
TaskExecutionMetadata class, including the task-idle-related metadata (e.g.
successfully processed tasks); this reduces the number of params needed for
TaskExecutor as well as Tasks.
3. Move the task-execution-related fields (embedded producer and consumer) and
the task creators out of Tasks and into TaskManager. Tasks is now only a
bookkeeping place without any task mutation logic.
4. When adding tests, I realized that we should not add a task to the state
updater right after creation, since it is not initialized yet, while the state
updater validates that the task's state is already restoring/running. So I
updated that logic while adding unit tests.

Reviewers: Bruno Cadonna 
---
 .../streams/processor/internals/AbstractTask.java  |   5 +-
 .../processor/internals/ActiveTaskCreator.java |  36 ++-
 .../streams/processor/internals/StandbyTask.java   |  51 +---
 .../processor/internals/StandbyTaskCreator.java|  32 ++-
 .../streams/processor/internals/StreamTask.java|  52 +---
 .../kafka/streams/processor/internals/Task.java|   5 +-
 .../processor/internals/TaskExecutionMetadata.java |  36 ++-
 .../streams/processor/internals/TaskExecutor.java  |  49 ++--
 .../streams/processor/internals/TaskManager.java   | 188 -
 .../kafka/streams/processor/internals/Tasks.java   | 203 --
 .../processor/internals/TopologyMetadata.java  |   4 +-
 .../processor/internals/StandbyTaskTest.java   |   6 +-
 .../processor/internals/StreamTaskTest.java|  12 +-
 .../processor/internals/StreamThreadTest.java  |   1 +
 .../internals/TaskExecutionMetadataTest.java   |  11 +-
 .../processor/internals/TaskExecutorTest.java  |   5 +-
 .../processor/internals/TaskManagerTest.java   | 293 +
 .../streams/processor/internals/TasksTest.java |  76 +-
 18 files changed, 512 insertions(+), 553 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 8ffbf9cd2e..a88d89fc33 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -43,10 +43,11 @@ public abstract class AbstractTask implements Task {
 private Task.State state = CREATED;
 private long deadlineMs = NO_DEADLINE;
 
-protected Set inputPartitions;
 protected final Logger log;
-protected final LogContext logContext;
 protected final String logPrefix;
+protected final LogContext logContext;
+
+protected Set inputPartitions;
 
 /**
  * If the checkpoint has not been loaded from the file yet (null), then we 
should not overwrite the checkpoint;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index ef21a51c99..46455111db 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -135,10 +135,8 @@ class ActiveTaskCreator {
 return threadProducer;
 }
 
-// TODO: change return type to `StreamTask`
 public Collection createTasks(final Consumer 
consumer,
- final Map> 
tasksToBeCreated) {
-// TODO: change type to `StreamTask`
+final Map> 
tasksToBeCreated) {
 final List createdTasks = new ArrayList<>();
 
 for (final Map.Entry> newTaskAndPartitions 
: tasksToBeCreated.entrySet()) {
@@ -211,13 +209,39 @@ class ActiveTaskCreator {
 );
 }
 
+/*
+ * TODO: we pass in the new input partitions to validate if they still 
match,
+ *   in the future we whe

[kafka] branch trunk updated (5e4ae06d12 -> 5a52601691)

2022-07-21 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 5e4ae06d12 MINOR: fix flaky test test_standby_tasks_rebalance (#12428)
 add 5a52601691 KAFKA-10199: Add tasks to state updater when they are 
created (#12427)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/kafka/streams/StreamsConfig.java|   3 +
 .../streams/processor/internals/AbstractTask.java  |   2 +-
 .../processor/internals/ProcessorStateManager.java |   3 +-
 .../streams/processor/internals/StreamThread.java  |   4 +-
 .../kafka/streams/processor/internals/Task.java|   2 +-
 .../streams/processor/internals/TaskManager.java   |  18 +-
 .../kafka/streams/processor/internals/Tasks.java   |  25 ++-
 .../internals/DefaultStateUpdaterTest.java | 223 ++---
 .../processor/internals/StreamThreadTest.java  |   6 +-
 .../processor/internals/TaskManagerTest.java   |  27 +--
 .../streams/processor/internals/TasksTest.java | 133 
 .../org/apache/kafka/test/StreamsTestUtils.java|  74 +++
 12 files changed, 381 insertions(+), 139 deletions(-)
 create mode 100644 
streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java



[kafka] branch trunk updated: MINOR: fix flaky test test_standby_tasks_rebalance (#12428)

2022-07-21 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 5e4ae06d12 MINOR: fix flaky test test_standby_tasks_rebalance (#12428)
5e4ae06d12 is described below

commit 5e4ae06d129f01d94ed27f73ea311a2adb0477e7
Author: Hao Li <1127478+lihao...@users.noreply.github.com>
AuthorDate: Thu Jul 21 12:12:29 2022 -0700

MINOR: fix flaky test test_standby_tasks_rebalance (#12428)

* Description
In this test, when the third processor joins, other rebalance scenarios can
sometimes occur, such as a follow-up JoinGroup request arriving before the
SyncGroup response has been received by one of the processors; the tasks
previously assigned to that processor are then lost during the new JoinGroup
request. This can result in standby task counts of 3, 1, 2. This PR relaxes
the expected assignment of 2, 2, 2 to a range of [1-3].

* Some background from Guozhang:
I talked to @hao Li offline and also inspected the code a bit, and the tl;dr
is that I think the code logic is correct (i.e. we do not really have a bug),
but we need to relax the test verification a little bit. The general idea
behind the subscription info is that:

When a client joins the group, its subscription tries to encode all its
currently assigned active and standby tasks, which are used as the previous
active and standby tasks by the assignor in order to achieve some stickiness.

When a client drops all its active/standby tasks due to errors, it does not
actually report an empty set in its subscription; instead it checks its local
state directory (see TaskManager#getTaskOffsetSums, which populates the task
offset sums). For an active task, the offset is "-2", a.k.a. LATEST_OFFSET;
for a standby task, it is an actual numerical offset (a toy illustration
follows the diff below).

So in this case, proc2, which drops all its active and standby tasks, would
still report all tasks that retain some local state, and since it previously
owned all six tasks (three active and three standby), it would report all six
as standbys. When that happens, the resulting assignment, as @hao Li verified,
is indeed the uneven one.

So I think the actual "issue" here is when proc2 is a bit late sending the
SyncGroup request, when the previous rebalance has already completed and a
follow-up rebalance has already been triggered; in that case, the resulting
uneven assignment is indeed expected. Such a scenario, though not common, is
still legitimate, since in practice all kinds of timing skew across instances
can happen. So I think we should just relax our verification here, i.e. just
make sure that each  [...]

Reviewers: Suhas Satish , Guozhang Wang 

---
 .../tests/streams/streams_standby_replica_test.py | 19 +--
 1 file changed, 9 insertions(+), 10 deletions(-)

diff --git a/tests/kafkatest/tests/streams/streams_standby_replica_test.py 
b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
index a8c07513c1..c0e5953f73 100644
--- a/tests/kafkatest/tests/streams/streams_standby_replica_test.py
+++ b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
@@ -73,9 +73,9 @@ class StreamsStandbyTask(BaseStreamsTest):
 
 processor_3.start()
 
-self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 
STANDBY_TASKS:2", processor_1.STDOUT_FILE)
-self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 
STANDBY_TASKS:2", processor_2.STDOUT_FILE)
-self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 
STANDBY_TASKS:2", processor_3.STDOUT_FILE)
+self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 
STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE)
+self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 
STANDBY_TASKS:[1-3]", processor_2.STDOUT_FILE)
+self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 
STANDBY_TASKS:[1-3]", processor_3.STDOUT_FILE)
 
 processor_1.stop()
 
@@ -93,9 +93,9 @@ class StreamsStandbyTask(BaseStreamsTest):
 
 processor_2.start()
 
-self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 
STANDBY_TASKS:2", processor_1.STDOUT_FILE)
-self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 
STANDBY_TASKS:2", processor_2.STDOUT_FILE)
-self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 
STANDBY_TASKS:2", processor_3.STDOUT_FILE, num_lines=2)
+self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 
STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE)
+self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 
STANDBY_TASKS:[1-3]", processor_2.STDOUT_FILE)
+self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 
STANDBY_TASKS:[1-3]", proce
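
A toy illustration of the offset-sum encoding described in the background note
(simplified types; the real logic lives in TaskManager#getTaskOffsetSums):

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class OffsetSumSketch {
    static final long LATEST_OFFSET = -2L;  // sentinel meaning "owned as active"

    // standbys report their actual changelog offset sums from local state;
    // actives report the sentinel; after an error, every task with state left
    // on disk is still reported, which is how proc2 ended up claiming six
    static Map<String, Long> taskOffsetSums(Set<String> activeTasks,
                                            Map<String, Long> localStateOffsetSums) {
        final Map<String, Long> sums = new HashMap<>(localStateOffsetSums);
        for (final String active : activeTasks) {
            sums.put(active, LATEST_OFFSET);
        }
        return sums;
    }
}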

[kafka] branch trunk updated: KAFKA-10199: Add RESUME in state updater (#12387)

2022-07-19 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 693e283802 KAFKA-10199: Add RESUME in state updater (#12387)
693e283802 is described below

commit 693e283802590b724ef441d5bf7acb6eeced91c5
Author: Guozhang Wang 
AuthorDate: Tue Jul 19 09:44:10 2022 -0700

KAFKA-10199: Add RESUME in state updater (#12387)

* Need to check enforceRestoreActive / transitToUpdateStandby when resuming a
paused task (see the sketch after the diff below).
* Do not expose a separate getResumedTasks, since its callers only need
getPausedTasks.

Reviewers: Bruno Cadonna 
---
 .../processor/internals/DefaultStateUpdater.java   |  36 -
 .../streams/processor/internals/StateUpdater.java  |  13 ++
 .../streams/processor/internals/TaskAndAction.java |  10 +-
 .../internals/DefaultStateUpdaterTest.java | 158 -
 .../processor/internals/TaskAndActionTest.java |  20 +++
 5 files changed, 229 insertions(+), 8 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index 08959bee00..7e7ec2a6f7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -128,6 +128,9 @@ public class DefaultStateUpdater implements StateUpdater {
 case PAUSE:
 pauseTask(taskAndAction.getTaskId());
 break;
+case RESUME:
+resumeTask(taskAndAction.getTaskId());
+break;
 }
 }
 } finally {
@@ -249,7 +252,7 @@ public class DefaultStateUpdater implements StateUpdater {
 final Task existingTask = updatingTasks.putIfAbsent(task.id(), 
task);
 if (existingTask != null) {
 throw new IllegalStateException((existingTask.isActive() ? 
"Active" : "Standby") + " task " + task.id() + " already exist, " +
-"should not try to add another " + (task.isActive() ? 
"Active" : "Standby") + " task with the same id. " + BUG_ERROR_MESSAGE);
+"should not try to add another " + (task.isActive() ? 
"active" : "standby") + " task with the same id. " + BUG_ERROR_MESSAGE);
 }
 
 if (task.isActive()) {
@@ -304,6 +307,26 @@ public class DefaultStateUpdater implements StateUpdater {
 }
 }
 
+private void resumeTask(final TaskId taskId) {
+final Task task = pausedTasks.get(taskId);
+if (task != null) {
+updatingTasks.put(taskId, task);
+pausedTasks.remove(taskId);
+
+if (task.isActive()) {
+log.debug("Stateful active task " + task.id() + " was 
resumed to the updating tasks of the state updater");
+changelogReader.enforceRestoreActive();
+} else {
+log.debug("Standby task " + task.id() + " was resumed to 
the updating tasks of the state updater");
+if (updatingTasks.size() == 1) {
+changelogReader.transitToUpdateStandby();
+}
+}
+} else {
+log.debug("Task " + taskId + " was not resumed since it is not 
paused.");
+}
+}
+
 private boolean isStateless(final Task task) {
 return task.changelogPartitions().isEmpty() && task.isActive();
 }
@@ -451,6 +474,17 @@ public class DefaultStateUpdater implements StateUpdater {
 }
 }
 
+@Override
+public void resume(final TaskId taskId) {
+tasksAndActionsLock.lock();
+try {
+tasksAndActions.add(TaskAndAction.createResumeTask(taskId));
+tasksAndActionsCondition.signalAll();
+} finally {
+tasksAndActionsLock.unlock();
+}
+}
+
 @Override
 public Set drainRestoredActiveTasks(final Duration timeout) {
 final long timeoutMs = timeout.toMillis();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
index 516e47436b..69d521b600 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
+++ 
b/streams/src/main/java/
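
A toy model of the resume bookkeeping this diff adds (simplified stand-in
types; the restore-mode switches are noted as comments):

import java.util.HashMap;
import java.util.Map;

class ResumeSketch {
    final Map<String, Boolean> updatingTasks = new HashMap<>(); // taskId -> isActive
    final Map<String, Boolean> pausedTasks = new HashMap<>();

    void resume(String taskId) {
        final Boolean isActive = pausedTasks.remove(taskId);
        if (isActive == null) {
            return;  // not paused -- nothing to do, mirroring the debug log above
        }
        updatingTasks.put(taskId, isActive);
        if (isActive) {
            // changelogReader.enforceRestoreActive() in the real updater
        } else if (updatingTasks.size() == 1) {
            // changelogReader.transitToUpdateStandby() in the real updater
        }
    }
}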

[kafka] branch trunk updated: KAFKA-10199: Add PAUSE in state updater (#12386)

2022-07-18 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 309e0f986e KAFKA-10199: Add PAUSE in state updater (#12386)
309e0f986e is described below

commit 309e0f986e97be966c797f7729eb1e94ef5400a9
Author: Guozhang Wang 
AuthorDate: Mon Jul 18 16:42:48 2022 -0700

KAFKA-10199: Add PAUSE in state updater (#12386)

* Add a pause action to the task updater.
* When removing a task, also check the paused tasks, not just the updating
tasks.
* I also realized we do not check whether a task with the same id was already
added, so I add that check in this PR as well (sketched after the diff below).

Reviewers: Bruno Cadonna 
---
 .../processor/internals/DefaultStateUpdater.java   |  62 -
 .../streams/processor/internals/StateUpdater.java  |  13 +
 .../processor/internals/StoreChangelogReader.java  |   2 +-
 .../streams/processor/internals/TaskAndAction.java |  10 +-
 .../internals/DefaultStateUpdaterTest.java | 282 -
 .../processor/internals/TaskAndActionTest.java |  20 ++
 6 files changed, 379 insertions(+), 10 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index 22fd48a4ab..08959bee00 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -83,7 +83,7 @@ public class DefaultStateUpdater implements StateUpdater {
 }
 
 public boolean onlyStandbyTasksLeft() {
-return !updatingTasks.isEmpty() && 
updatingTasks.values().stream().allMatch(t -> !t.isActive());
+return !updatingTasks.isEmpty() && 
updatingTasks.values().stream().noneMatch(Task::isActive);
 }
 
 @Override
@@ -125,6 +125,9 @@ public class DefaultStateUpdater implements StateUpdater {
 case REMOVE:
 removeTask(taskAndAction.getTaskId());
 break;
+case PAUSE:
+pauseTask(taskAndAction.getTaskId());
+break;
 }
 }
 } finally {
@@ -243,7 +246,12 @@ public class DefaultStateUpdater implements StateUpdater {
 addToRestoredTasks((StreamTask) task);
 log.debug("Stateless active task " + task.id() + " was added 
to the restored tasks of the state updater");
 } else {
-updatingTasks.put(task.id(), task);
+final Task existingTask = updatingTasks.putIfAbsent(task.id(), 
task);
+if (existingTask != null) {
+throw new IllegalStateException((existingTask.isActive() ? 
"Active" : "Standby") + " task " + task.id() + " already exist, " +
+"should not try to add another " + (task.isActive() ? 
"Active" : "Standby") + " task with the same id. " + BUG_ERROR_MESSAGE);
+}
+
 if (task.isActive()) {
 log.debug("Stateful active task " + task.id() + " was 
added to the updating tasks of the state updater");
 changelogReader.enforceRestoreActive();
@@ -257,8 +265,9 @@ public class DefaultStateUpdater implements StateUpdater {
 }
 
 private void removeTask(final TaskId taskId) {
-final Task task = updatingTasks.get(taskId);
-if (task != null) {
+final Task task;
+if (updatingTasks.containsKey(taskId)) {
+task = updatingTasks.get(taskId);
 task.maybeCheckpoint(true);
 final Collection changelogPartitions = 
task.changelogPartitions();
 changelogReader.unregister(changelogPartitions);
@@ -267,8 +276,31 @@ public class DefaultStateUpdater implements StateUpdater {
 transitToUpdateStandbysIfOnlyStandbysLeft();
 log.debug((task.isActive() ? "Active" : "Standby")
 + " task " + task.id() + " was removed from the updating 
tasks and added to the removed tasks.");
+} else if (pausedTasks.containsKey(taskId)) {
+task = pausedTasks.get(taskId);
+final Collection changelogPartitions = 
task.changelogPartitions();
+changelogReader.unregister(changelogPartitions);
+removedTasks.add(task);
+pausedTasks.remove(taskId);
+log.debug((task.isActive() ? "Act
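
The duplicate-id check from the third bullet boils down to a putIfAbsent guard
(a sketch with simplified types, not the real DefaultStateUpdater):

import java.util.HashMap;
import java.util.Map;

class DuplicateAddGuardSketch {
    final Map<String, String> updatingTasks = new HashMap<>();

    void addTask(String taskId, String task) {
        final String existing = updatingTasks.putIfAbsent(taskId, task);
        if (existing != null) {
            throw new IllegalStateException("task " + taskId + " already exists; "
                + "should not try to add another task with the same id");
        }
    }
}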

[kafka] branch trunk updated: KAFKA-13785: [10/N][emit final] more unit test for session store and disable cache for emit final sliding window (#12370)

2022-07-12 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new b5d4fa7645e KAFKA-13785: [10/N][emit final] more unit test for session 
store and disable cache for emit final sliding window (#12370)
b5d4fa7645e is described below

commit b5d4fa7645eb75d2030eb8cac78545a681686a39
Author: Hao Li <1127478+lihao...@users.noreply.github.com>
AuthorDate: Tue Jul 12 10:57:11 2022 -0700

KAFKA-13785: [10/N][emit final] more unit test for session store and 
disable cache for emit final sliding window (#12370)

1. Added more unit tests for RocksDBTimeOrderedSessionStore and
RocksDBTimeOrderedSessionSegmentedBytesStore
2. Disable the cache for sliding windows if the emit strategy is
ON_WINDOW_CLOSE

Reviewers: Matthias J. Sax , Guozhang Wang 

---
 .../internals/SessionWindowedKStreamImpl.java  |   3 +
 .../internals/SlidingWindowedKStreamImpl.java  |   4 +-
 .../state/internals/PrefixedSessionKeySchemas.java |  14 +--
 ...cksDBTimeOrderedSessionSegmentedBytesStore.java |   8 +-
 .../internals/RocksDBTimeOrderedSessionStore.java  |   8 +-
 ...ctDualSchemaRocksDBSegmentedBytesStoreTest.java | 138 +
 .../internals/AbstractSessionBytesStoreTest.java   | 124 ++
 .../state/internals/InMemorySessionStoreTest.java  |  41 +-
 .../state/internals/RocksDBSessionStoreTest.java   |  57 +
 9 files changed, 291 insertions(+), 106 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
index c3b05cb1182..8c60019fccb 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
@@ -289,7 +289,10 @@ public class SessionWindowedKStreamImpl extends 
AbstractStream imple
 // do not enable cache if the emit final strategy is used
 if (materialized.cachingEnabled() && emitStrategy.type() != 
EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
 builder.withCachingEnabled();
+} else {
+builder.withCachingDisabled();
 }
+
 return builder;
 }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
index 70c75b4c82e..5ca6b911b7c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
@@ -258,7 +258,9 @@ public class SlidingWindowedKStreamImpl extends 
AbstractStream imple
 } else {
 builder.withLoggingDisabled();
 }
-if (materialized.cachingEnabled()) {
+
+// do not enable cache if the emit final strategy is used
+if (materialized.cachingEnabled() && emitStrategy.type() != 
StrategyType.ON_WINDOW_CLOSE) {
 builder.withCachingEnabled();
 } else {
 builder.withCachingDisabled();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java
index 2ac25277ba8..3ce00bcb8a9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java
@@ -102,8 +102,8 @@ public class PrefixedSessionKeySchemas {
 @Override
 public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom,
  final Bytes binaryKeyTo,
- final long from,
- final long to,
+ final long 
earliestWindowEndTime,
+ final long 
latestWindowStartTime,
  final boolean forward) {
 return iterator -> {
 while (iterator.hasNext()) {
@@ -120,13 +120,13 @@ public class PrefixedSessionKeySchemas {
 
 // We can return false directly here since keys are sorted 
by end time and if
 // we get time smaller than `from`, there won't be time 
within range.
-if (!forward && endTime < from) {
+if (!forward && endTime < earliestWindowE
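
For readers unfamiliar with the emit-final feature these commits build on, a
hedged usage sketch (API names per KIP-825; the topic names are hypothetical
and exact signatures may vary by version). With ON_WINDOW_CLOSE only the final
result per window is emitted, and the store cache is disabled as the diff
above enforces.

import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class EmitFinalSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("input-topic");  // hypothetical topic
        // count per 5-minute sliding window, emitting only the final result
        KTable<Windowed<String>, Long> counts = stream
            .groupByKey()
            .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)))
            .emitStrategy(EmitStrategy.onWindowClose())
            .count();
        counts.toStream().to("output-topic");  // hypothetical topic
    }
}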

[kafka] branch trunk updated (38b08dfd338 -> 5a1bac2608c)

2022-07-07 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 38b08dfd338 MINOR: revert KIP-770 (#12383)
 add 5a1bac2608c KAFKA-13846: Follow up PR to address review comments 
(#12297)

No new revisions were added by this update.

Summary of changes:
 .../java/org/apache/kafka/common/metrics/Metrics.java | 12 +++-
 docs/upgrade.html | 15 ---
 2 files changed, 19 insertions(+), 8 deletions(-)



[kafka] branch trunk updated: HOTFIX: KIP-851, rename requireStable in ListConsumerGroupOffsetsOptions

2022-07-06 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new ca8135b242a HOTFIX: KIP-851, rename requireStable in 
ListConsumerGroupOffsetsOptions
ca8135b242a is described below

commit ca8135b242a5d5fd71e5edffbcb85ada2f5d9bd2
Author: Guozhang Wang 
AuthorDate: Wed Jul 6 22:00:31 2022 -0700

HOTFIX: KIP-851, rename requireStable in ListConsumerGroupOffsetsOptions
---
 .../src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java  | 2 +-
 .../org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 99fd98b443f..aad2610c94a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -3404,7 +3404,7 @@ public class KafkaAdminClient extends AdminClient {
final 
ListConsumerGroupOffsetsOptions options) {
 SimpleAdminApiFuture> future =
 ListConsumerGroupOffsetsHandler.newFuture(groupId);
-ListConsumerGroupOffsetsHandler handler = new 
ListConsumerGroupOffsetsHandler(groupId, options.topicPartitions(), 
options.shouldRequireStable(), logContext);
+ListConsumerGroupOffsetsHandler handler = new 
ListConsumerGroupOffsetsHandler(groupId, options.topicPartitions(), 
options.requireStable(), logContext);
 invokeDriver(handler, future, options.timeoutMs);
 return new 
ListConsumerGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId)));
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
index a06feaf3b38..292a47ef393 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
@@ -60,7 +60,7 @@ public class ListConsumerGroupOffsetsOptions extends 
AbstractOptions

[kafka] branch trunk updated (6495a0768ca -> 915c7812439)

2022-07-06 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 6495a0768ca KAFKA-14032; Dequeue time for forwarded requests is unset 
(#12360)
 add 915c7812439 KAFKA-10199: Remove main consumer from store changelog 
reader (#12337)

No new revisions were added by this update.

Summary of changes:
 .../kafka/clients/admin/KafkaAdminClient.java  |  2 +-
 .../admin/ListConsumerGroupOffsetsOptions.java | 13 
 .../internals/ListConsumerGroupOffsetsHandler.java | 15 -
 .../kafka/clients/admin/AdminClientTestUtils.java  |  7 +++
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 33 ++
 .../kafka/clients/admin/MockAdminClient.java   | 38 ++--
 .../processor/internals/DefaultStateUpdater.java   |  3 +-
 .../processor/internals/StoreChangelogReader.java  | 71 --
 .../streams/processor/internals/StreamThread.java  |  1 -
 .../internals/DefaultStateUpdaterTest.java |  1 +
 .../processor/internals/StandbyTaskTest.java   |  8 ++-
 .../internals/StoreChangelogReaderTest.java| 50 ++-
 .../processor/internals/StreamTaskTest.java| 27 +++-
 13 files changed, 188 insertions(+), 81 deletions(-)



[kafka] branch trunk updated: HOTFIX: Correct ordering of input buffer and enforced processing sensors (#12363)

2022-07-03 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new ae570f5953 HOTFIX: Correct ordering of input buffer and enforced 
processing sensors (#12363)
ae570f5953 is described below

commit ae570f59533ae941bbf5ab9ff5e739a5bd855fd6
Author: Guozhang Wang 
AuthorDate: Sun Jul 3 10:02:59 2022 -0700

HOTFIX: Correct ordering of input buffer and enforced processing sensors 
(#12363)

1. As titled, fix the constructor param ordering (the two swapped arguments
have the same type, so the compiler could not catch it; see the sketch after
the diff below).
2. Also add a few more log lines.

Reviewers: Matthias J. Sax , Sagar Rao 
, Hao Li <1127478+lihao...@users.noreply.github.com>
---
 .../src/main/java/org/apache/kafka/streams/KafkaStreams.java  |  7 ++-
 .../kafka/streams/processor/internals/PartitionGroup.java | 11 ++-
 .../apache/kafka/streams/processor/internals/StreamTask.java  |  4 +---
 .../kafka/streams/processor/internals/StreamThread.java   | 11 ---
 .../kafka/streams/processor/internals/PartitionGroupTest.java |  2 +-
 5 files changed, 22 insertions(+), 13 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 86ab83f67d..2c95aa85a4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -978,6 +978,10 @@ public class KafkaStreams implements AutoCloseable {
 }
 // Initially, all Stream Threads are created with 0 cache size and max 
buffer size and then resized here.
 resizeThreadCacheAndBufferMemory(numStreamThreads);
+if (numStreamThreads > 0) {
+log.info("Initializing {} StreamThread with cache size/max buffer 
size values as {} per thread.",
+numStreamThreads, getThreadCacheAndBufferMemoryString());
+}
 
 stateDirCleaner = setupStateDirCleaner();
 rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, applicationConfigs);
@@ -1143,7 +1147,8 @@ public class KafkaStreams implements AutoCloseable {
 + "for it to complete shutdown as this 
will result in deadlock.", streamThread.getName());
 }
 
resizeThreadCacheAndBufferMemory(getNumLiveStreamThreads());
-log.info("Resizing thread cache/max buffer size due to 
removal of thread {}, new cache size/max buffer size per thread is {}", 
streamThread.getName(), getThreadCacheAndBufferMemoryString());
+log.info("Resizing thread cache/max buffer size due to 
removal of thread {}, " +
+"new cache size/max buffer size per thread is {}", 
streamThread.getName(), getThreadCacheAndBufferMemoryString());
 if (groupInstanceID.isPresent() && 
callingThreadIsNotCurrentStreamThread) {
 final MemberToRemove memberToRemove = new 
MemberToRemove(groupInstanceID.get());
 final Collection membersToRemove = 
Collections.singletonList(memberToRemove);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 21d3cbfa3f..750699a1ec 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -64,7 +64,7 @@ public class PartitionGroup {
 private final Sensor enforcedProcessingSensor;
 private final long maxTaskIdleMs;
 private final Sensor recordLatenessSensor;
-private final Sensor totalBytesSensor;
+private final Sensor totalInputBufferBytesSensor;
 private final PriorityQueue nonEmptyQueuesByTime;
 
 private long streamTime;
@@ -93,8 +93,8 @@ public class PartitionGroup {
final Map partitionQueues,
final Function lagProvider,
final Sensor recordLatenessSensor,
+   final Sensor totalInputBufferBytesSensor,
final Sensor enforcedProcessingSensor,
-   final Sensor totalBytesSensor,
final long maxTaskIdleMs) {
 this.logger = logContext.logger(PartitionGroup.class);
 nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), 
Comparator.comparingLong(RecordQueue::headRecordTimestamp));
@@ -103,7 +103,7 @@ public class PartitionGroup {
 this.enforcedProcessingSensor = enforcedProcessingSensor;
 this.maxTaskIdleMs = maxTaskIdleMs;
 this.recordLatenessSensor 

[kafka-site] branch asf-site updated: Fix docs in quickstart.html (#419)

2022-07-03 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/kafka-site.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new 60d9b3aa Fix docs in quickstart.html (#419)
60d9b3aa is described below

commit 60d9b3aaa444aedf6646e484f587f8a170781d33
Author: JK-Wang <32212764+jk-w...@users.noreply.github.com>
AuthorDate: Mon Jul 4 00:12:01 2022 +0800

Fix docs in quickstart.html (#419)

Reviewers: Guozhang Wang 
---
 32/quickstart.html | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/32/quickstart.html b/32/quickstart.html
index 79a33f8f..0d9fca9b 100644
--- a/32/quickstart.html
+++ b/32/quickstart.html
@@ -183,7 +183,7 @@ This is my second event
 
 
 
- echo "plugin.path=lib/connect-file-{{fullDotVersion}}.jar"
+ echo "plugin.path=libs/connect-file-{{fullDotVersion}}.jar"
 
 
 Then, start by creating some seed data to test with:



[kafka] branch trunk updated (bad475166f -> a82a8e02ce)

2022-07-02 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from bad475166f MINOR: Add indent space after hyperlink in 
`docs/upgrade.html` (#12353)
 add a82a8e02ce MINOR: Fix static mock usage in TaskMetricsTest (#12373)

No new revisions were added by this update.

Summary of changes:
 .../internals/metrics/TaskMetricsTest.java | 266 -
 1 file changed, 149 insertions(+), 117 deletions(-)



[kafka] branch trunk updated: MINOR: Use mock time in DefaultStateUpdaterTest (#12344)

2022-06-29 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 3faa6cf6d0 MINOR: Use mock time in DefaultStateUpdaterTest (#12344)
3faa6cf6d0 is described below

commit 3faa6cf6d060887288fcf68adb8c3f1e2090b8ed
Author: Guozhang Wang 
AuthorDate: Wed Jun 29 12:33:00 2022 -0700

MINOR: Use mock time in DefaultStateUpdaterTest (#12344)

For most tests we need an auto-ticking mock timer so that the 
draining-with-timeout functions can make progress.
For tests that verify that a checkpoint is never written, we need a 
non-ticking timer so we can control exactly how much time elapses.

Reviewers: Bruno Cadonna 
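
For illustration, a minimal sketch of the difference, assuming the common 
test utility org.apache.kafka.common.utils.MockTime, whose constructor 
argument is the auto-tick step in milliseconds:

    import org.apache.kafka.common.utils.MockTime;

    // Every read of the clock advances it by 1 ms, so code that repeatedly
    // polls "has the deadline passed yet?" eventually makes progress.
    final MockTime time = new MockTime(1L);
    final long t0 = time.milliseconds();
    final long t1 = time.milliseconds();  // t1 == t0 + 1

    // With no auto-tick, the clock only moves via explicit time.sleep(ms),
    // which suits the "never checkpoint" tests.
    final MockTime fixed = new MockTime();
    fixed.sleep(10L);  // advances the mock clock by exactly 10 ms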
---
 .../processor/internals/DefaultStateUpdater.java   |  5 ++--
 .../internals/DefaultStateUpdaterTest.java | 34 --
 2 files changed, 22 insertions(+), 17 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index 0e84574c5c..886a37b314 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -356,8 +356,6 @@ public class DefaultStateUpdater implements StateUpdater {
 this.offsetResetter = offsetResetter;
 this.time = time;
 this.commitIntervalMs = 
config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
-// initialize the last commit as of now to prevent first commit 
happens immediately
-this.lastCommitMs = time.milliseconds();
 }
 
 public void start() {
@@ -365,6 +363,9 @@ public class DefaultStateUpdater implements StateUpdater {
 stateUpdaterThread = new StateUpdaterThread("state-updater", 
changelogReader, offsetResetter);
 stateUpdaterThread.start();
 shutdownGate = new CountDownLatch(1);
+
+// initialize the last commit as of now to prevent first commit 
happens immediately
+this.lastCommitMs = time.milliseconds();
 }
 }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
index 8bd81828f6..5e2d90de71 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -44,7 +45,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
 import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.apache.kafka.common.utils.Utils.sleep;
 import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.easymock.EasyMock.anyBoolean;
@@ -79,24 +79,25 @@ class DefaultStateUpdaterTest {
 private final static TaskId TASK_1_0 = new TaskId(1, 0);
 private final static TaskId TASK_1_1 = new TaskId(1, 1);
 
-private final StreamsConfig config = new 
StreamsConfig(configProps(COMMIT_INTERVAL));
+// need an auto-tick timer to work for draining with timeout
+private final Time time = new MockTime(1L);
+private final StreamsConfig config = new StreamsConfig(configProps());
 private final ChangelogReader changelogReader = 
mock(ChangelogReader.class);
 private final java.util.function.Consumer> 
offsetResetter = topicPartitions -> { };
-
-private DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, 
changelogReader, offsetResetter, Time.SYSTEM);
+private final DefaultStateUpdater stateUpdater = new 
DefaultStateUpdater(config, changelogReader, offsetResetter, time);
 
 @AfterEach
 public void tearDown() {
 stateUpdater.shutdown(Duration.ofMinutes(1));
 }
 
-private Properties configProps(final int commitInterval) {
+private Properties configProps() {
 return mkObjectProperties(mkMap(
 mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
 mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:2

[kafka] branch trunk updated: [9/N][Emit final] Emit final for session window aggregations (#12204)

2022-06-29 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new ababc4261b [9/N][Emit final] Emit final for session window 
aggregations (#12204)
ababc4261b is described below

commit ababc4261bfa03ee9d29ae7254ddd0ba988f826d
Author: Guozhang Wang 
AuthorDate: Wed Jun 29 09:22:37 2022 -0700

[9/N][Emit final] Emit final for session window aggregations (#12204)

* Add a new API for session windows to range-query session windows by end 
time (KIP related).
* Augment the session window aggregator with an emit strategy (see the usage 
sketch below).
* Minor: consolidated some duplicate classes.
* Test: unit tests on the session window aggregator.

Reviewers: Guozhang Wang 
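
A hedged usage sketch of the new emitStrategy hook on a session-windowed 
stream (assuming EmitStrategy.onWindowClose() as the factory introduced by 
this PR series; topic names are hypothetical):

    import java.time.Duration;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.EmitStrategy;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.SessionWindows;

    final StreamsBuilder builder = new StreamsBuilder();
    final KStream<String, String> stream = builder.stream("input-topic");
    stream.groupByKey()
          .windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1)))
          .emitStrategy(EmitStrategy.onWindowClose())  // emit only the final result per closed session
          .count()
          .toStream()
          .to("output-topic");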
---
 .../streams/kstream/SessionWindowedKStream.java|   5 +-
 .../kafka/streams/kstream/TimeWindowedKStream.java |   1 +
 ...bstractKStreamTimeWindowAggregateProcessor.java |  11 +-
 .../internals/CogroupedStreamAggregateBuilder.java |   2 +
 .../internals/KStreamSessionWindowAggregate.java   | 272 -
 .../kstream/internals/KStreamWindowAggregate.java  |   7 -
 .../kstream/internals/SessionTupleForwarder.java   |  56 -
 .../internals/SessionWindowedKStreamImpl.java  |  29 ++-
 .../kstream/internals/TimeWindowedKStreamImpl.java |  55 +++--
 .../internals/TimestampedTupleForwarder.java   |   3 +-
 .../internals/AbstractReadWriteDecorator.java  |   6 +
 .../apache/kafka/streams/state/SessionStore.java   |  13 +
 ...tractRocksDBTimeOrderedSegmentedBytesStore.java |   6 +-
 .../internals/ChangeLoggingSessionBytesStore.java  |  12 +-
 .../state/internals/InMemorySessionStore.java  |  21 +-
 .../state/internals/MeteredSessionStore.java   |  12 +
 .../state/internals/PrefixedSessionKeySchemas.java |  13 +-
 ...cksDBTimeOrderedSessionSegmentedBytesStore.java |  33 ++-
 .../internals/RocksDBTimeOrderedSessionStore.java  |   7 +
 .../streams/state/internals/SegmentIterator.java   |   2 +-
 .../state/internals/SegmentedBytesStore.java   |   4 +-
 .../streams/state/internals/SessionKeySchema.java  |   2 +-
 ...KStreamSessionWindowAggregateProcessorTest.java | 219 -
 .../internals/KStreamWindowAggregateTest.java  |   2 +-
 .../internals/SessionTupleForwarderTest.java   | 108 
 .../internals/SessionWindowedKStreamImplTest.java  | 171 +
 .../internals/TimeWindowedKStreamImplTest.java |   2 +-
 .../internals/graph/GraphGraceSearchUtilTest.java  |   8 +-
 28 files changed, 676 insertions(+), 406 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
index c561b62abf..fe897515a9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
@@ -39,7 +39,7 @@ import java.time.Duration;
  * materialized view) that can be queried using the name provided in the 
{@link Materialized} instance.
  * Furthermore, updates to the store are sent downstream into a windowed 
{@link KTable} changelog stream, where
  * "windowed" implies that the {@link KTable} key is a combined key of the 
original record key and a window ID.
- * New events are added to sessions until their grace period ends (see {@link 
SessionWindows#grace(Duration)}).
+ * New events are added to sessions until their grace period ends (see {@link 
SessionWindows#ofInactivityGapAndGrace(Duration, Duration)}).
  * 
  * A {@code SessionWindowedKStream} must be obtained from a {@link 
KGroupedStream} via
  * {@link KGroupedStream#windowedBy(SessionWindows)}.
@@ -643,4 +643,7 @@ public interface SessionWindowedKStream {
 KTable, V> reduce(final Reducer reducer,
   final Named named,
   final Materialized> materialized);
+
+// TODO: add javadoc
+SessionWindowedKStream emitStrategy(final EmitStrategy emitStrategy);
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
index 46ebd267f9..3f36838f20 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
@@ -649,5 +649,6 @@ public interface TimeWindowedKStream {
   final Named named,
   final Materialized> materialized);
 
+// TODO: add javadoc
 TimeWindowedKStream emitStrategy(final EmitStrategy emitStrategy);
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregat

[kafka-site] branch asf-site updated: Add atguigu (http://www.atguigu.com/) to the list of the "Powered By ❤" (#418)

2022-06-27 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/kafka-site.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new 375a2ec4 Add atguigu (http://www.atguigu.com/) to the list of the 
"Powered By ❤" (#418)
375a2ec4 is described below

commit 375a2ec493b1b1e0f5fbf3e275c90f0b09bc982b
Author: realdengziqi <42276568+realdengz...@users.noreply.github.com>
AuthorDate: Tue Jun 28 11:28:03 2022 +0800

Add atguigu (http://www.atguigu.com/) to the list of the "Powered By ❤" (#418)
    
Reviewers: Guozhang Wang 
---
 images/powered-by/atguigu.png | Bin 0 -> 39711 bytes
 powered-by.html   |   5 +
 2 files changed, 5 insertions(+)

diff --git a/images/powered-by/atguigu.png b/images/powered-by/atguigu.png
new file mode 100644
index ..4e72d8b5
Binary files /dev/null and b/images/powered-by/atguigu.png differ
diff --git a/powered-by.html b/powered-by.html
index ce172902..c5922bca 100644
--- a/powered-by.html
+++ b/powered-by.html
@@ -694,6 +694,11 @@
 "logo": "atruvia_logo_online_rgb.png",
 "logoBgColor": "#d4f2f5",
 "description": "At Atruvia we use Apache Kafka to share events within 
the modern banking platform."
+}, {
+"link": "http://www.atguigu.com/;,
+"logo": "atguigu.png",
+"logoBgColor": "#ff",
+"description": "In our real-time data warehouse, apache kafka is used 
as a reliable distributed message queue, which allows us to build a highly 
available analysis system."
 }];
 
 



[kafka] branch trunk updated: KAFKA-10199: Expose tasks in state updater (#12312)

2022-06-24 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 1ceaf30039 KAFKA-10199: Expose tasks in state updater (#12312)
1ceaf30039 is described below

commit 1ceaf30039e48199e40951c5a8d52894bb45e4d3
Author: Bruno Cadonna 
AuthorDate: Fri Jun 24 18:33:24 2022 +0200

KAFKA-10199: Expose tasks in state updater (#12312)

This PR exposes the tasks managed by the state updater, i.e. all tasks that 
were added to the state updater and that have not yet been removed from it by 
draining one of its output queues (see the sketch below).

Reviewers: Guozhang Wang 
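
As a rough usage sketch (method names are assumed from the state updater 
interface in this patch series and may differ in detail):

    // A task stays managed by the state updater from the moment it is added
    // until one of the drain calls removes it from an output queue.
    stateUpdater.add(task);
    final Set<StreamTask> restored =
        stateUpdater.drainRestoredActiveTasks(Duration.ofMillis(100));
    final List<StateUpdater.ExceptionAndTasks> failed =
        stateUpdater.drainExceptionsAndFailedTasks();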
---
 .../processor/internals/DefaultStateUpdater.java   | 149 ++---
 .../streams/processor/internals/StateUpdater.java  |  61 +++-
 .../internals/DefaultStateUpdaterTest.java | 332 +
 3 files changed, 436 insertions(+), 106 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index cc580a3b38..0e84574c5c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -47,7 +47,9 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 public class DefaultStateUpdater implements StateUpdater {
 
@@ -86,7 +88,7 @@ public class DefaultStateUpdater implements StateUpdater {
 }
 
 public boolean onlyStandbyTasksLeft() {
-return !updatingTasks.isEmpty() && 
updatingTasks.values().stream().noneMatch(Task::isActive);
+return !updatingTasks.isEmpty() && 
updatingTasks.values().stream().allMatch(t -> !t.isActive());
 }
 
 @Override
@@ -152,9 +154,7 @@ public class DefaultStateUpdater implements StateUpdater {
 
 private void handleRuntimeException(final RuntimeException 
runtimeException) {
 log.error("An unexpected error occurred within the state updater 
thread: " + runtimeException);
-final ExceptionAndTasks exceptionAndTasks = new 
ExceptionAndTasks(new HashSet<>(updatingTasks.values()), runtimeException);
-updatingTasks.clear();
-exceptionsAndFailedTasks.add(exceptionAndTasks);
+addToExceptionsAndFailedTasksThenClearUpdatingTasks(new 
ExceptionAndTasks(new HashSet<>(updatingTasks.values()), runtimeException));
 isRunning.set(false);
 }
 
@@ -163,41 +163,51 @@ public class DefaultStateUpdater implements StateUpdater {
 final Set corruptedTaskIds = 
taskCorruptedException.corruptedTasks();
 final Set corruptedTasks = new HashSet<>();
 for (final TaskId taskId : corruptedTaskIds) {
-final Task corruptedTask = updatingTasks.remove(taskId);
+final Task corruptedTask = updatingTasks.get(taskId);
 if (corruptedTask == null) {
 throw new IllegalStateException("Task " + taskId + " is 
corrupted but is not updating. " + BUG_ERROR_MESSAGE);
 }
 corruptedTasks.add(corruptedTask);
 }
-exceptionsAndFailedTasks.add(new ExceptionAndTasks(corruptedTasks, 
taskCorruptedException));
+addToExceptionsAndFailedTasksThenRemoveFromUpdatingTasks(new 
ExceptionAndTasks(corruptedTasks, taskCorruptedException));
 }
 
 private void handleStreamsException(final StreamsException 
streamsException) {
 log.info("Encountered streams exception: ", streamsException);
-final ExceptionAndTasks exceptionAndTasks;
 if (streamsException.taskId().isPresent()) {
-exceptionAndTasks = 
handleStreamsExceptionWithTask(streamsException);
+handleStreamsExceptionWithTask(streamsException);
 } else {
-exceptionAndTasks = 
handleStreamsExceptionWithoutTask(streamsException);
+handleStreamsExceptionWithoutTask(streamsException);
 }
-exceptionsAndFailedTasks.add(exceptionAndTasks);
 }
 
-private ExceptionAndTasks handleStreamsExceptionWithTask(final 
StreamsException streamsException) {
+private void handleStreamsExceptionWithTask(final StreamsException 
streamsException) {
 final TaskId failedTaskId = streamsException.taskId().get();
 if (!updatingTasks.contai

[kafka] branch trunk updated: KAFKA-10199: Commit the restoration progress within StateUpdater (#12279)

2022-06-23 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 925c628173 KAFKA-10199: Commit the restoration progress within 
StateUpdater (#12279)
925c628173 is described below

commit 925c6281733662cd40fffaab54a6483b00f80ee6
Author: Guozhang Wang 
AuthorDate: Thu Jun 23 10:46:14 2022 -0700

KAFKA-10199: Commit the restoration progress within StateUpdater (#12279)

During restoration we should always commit, i.e. write the checkpoint file, 
regardless of EOS or ALOS: if there is a failure, we would simply over-restore 
upon recovery, so no EOS violation can occur.

Also, when we complete a restoration or remove a task, we should enforce a 
checkpoint; in failure cases, however, we should not write a new one.

Reviewers: Bruno Cadonna 
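
In condensed form, the loop this patch establishes (names taken from the diff 
below; restoration progress is checkpointed at the commit interval, and a 
checkpoint is enforced when a task is removed):

    // sketch of the state updater thread's main loop after this patch
    private void runOnce() throws InterruptedException {
        performActionsOnTasks();   // task removal calls task.maybeCheckpoint(true)
        restoreTasks();
        maybeCheckpointUpdatingTasks(time.milliseconds());  // periodic, EOS and ALOS alike
        waitIfAllChangelogsCompletelyRead();
    }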
---
 .../streams/processor/internals/AbstractTask.java  |   4 +-
 .../processor/internals/DefaultStateUpdater.java   |  36 +++-
 .../streams/processor/internals/StandbyTask.java   |   2 +-
 .../streams/processor/internals/StreamTask.java|   8 +-
 .../kafka/streams/processor/internals/Task.java|   6 ++
 .../internals/DefaultStateUpdaterTest.java | 101 +++--
 .../processor/internals/StandbyTaskTest.java   |  41 +
 .../processor/internals/StreamTaskTest.java|  16 
 8 files changed, 197 insertions(+), 17 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 4e652a6dfc..c64fadfe5c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -88,7 +88,8 @@ public abstract class AbstractTask implements Task {
  * @throws StreamsException fatal error when flushing the state store, for 
example sending changelog records failed
  *  or flushing state store get IO errors; such 
error should cause the thread to die
  */
-protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
+@Override
+public void maybeCheckpoint(final boolean enforceCheckpoint) {
 final Map offsetSnapshot = 
stateMgr.changelogOffsets();
 if (StateManagerUtil.checkpointNeeded(enforceCheckpoint, 
offsetSnapshotSinceLastFlush, offsetSnapshot)) {
 // the state's current offset would be used to checkpoint
@@ -98,7 +99,6 @@ public abstract class AbstractTask implements Task {
 }
 }
 
-
 @Override
 public TaskId id() {
 return id;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index 54cb7bc427..cc580a3b38 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.processor.TaskId;
@@ -85,7 +86,7 @@ public class DefaultStateUpdater implements StateUpdater {
 }
 
 public boolean onlyStandbyTasksLeft() {
-return !updatingTasks.isEmpty() && 
updatingTasks.values().stream().allMatch(t -> !t.isActive());
+return !updatingTasks.isEmpty() && 
updatingTasks.values().stream().noneMatch(Task::isActive);
 }
 
 @Override
@@ -111,6 +112,7 @@ public class DefaultStateUpdater implements StateUpdater {
 private void runOnce() throws InterruptedException {
 performActionsOnTasks();
 restoreTasks();
+maybeCheckpointUpdatingTasks(time.milliseconds());
 waitIfAllChangelogsCompletelyRead();
 }
 
@@ -252,6 +254,8 @@ public class DefaultStateUpdater implements StateUpdater {
 private void removeTask(final TaskId taskId) {
 final Task task = updatingTasks.remove(taskId);
 if (task != null) {
+task.maybeCheckpoint(true);
+
 final Collection changelogPartitions = 
task.changelogPartitions();
 changelogReader.unregister(changelogPartitions);
 removedTasks.add(task);
@@ -271,9 +275,10 @@ public class DefaultStateUpdater implements StateUpdater {

[kafka] branch trunk updated: KAFKA-13880: Remove DefaultPartitioner from StreamPartitioner (#12304)

2022-06-17 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new cfdd567955 KAFKA-13880: Remove DefaultPartitioner from 
StreamPartitioner (#12304)
cfdd567955 is described below

commit cfdd567955588e134770a9145ba57800ca88313c
Author: Guozhang Wang 
AuthorDate: Fri Jun 17 20:17:02 2022 -0700

KAFKA-13880: Remove DefaultPartitioner from StreamPartitioner (#12304)

There are some considerations embedded in this seemingly straightforward PR 
that I'd like to explain here. The StreamPartitioner is used to send records 
to three types of topics:

1) repartition topics, where the key should never be null.
2) changelog topics, where the key should never be null.
3) sink topics, where only a non-windowed key may be null; a windowed key 
should still never be null.
Also, the StreamPartitioner is used as part of IQ to determine which 
host contains a certain key, as determined by case 2) above.

This PR's main goal is to remove the deprecated producer default partitioner, 
with those constraints in mind:

We want to make sure that for non-null keys, the default murmur2 hash behavior 
of the Streams partitioner stays consistent with the producer's new built-in 
partitioner (see the sketch below).
For null keys (which are only possible for the non-windowed default stream 
partitioner, and are never used for IQ), we fix the issue that we might never 
rotate to a new partition, by setting the partition to null and hence relying 
on the newly introduced built-in partitioner.

Reviewers: Artem Livshits 
<84364232+artemlivsh...@users.noreply.github.com>, Matthias J. Sax 
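
For reference, a small sketch of the shared key hashing this consolidates 
(using the partitionForKey helper added in the diff below; the key bytes are 
just an example):

    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.clients.producer.internals.BuiltInPartitioner;

    final byte[] serializedKey = "user-42".getBytes(StandardCharsets.UTF_8);
    final int numPartitions = 6;
    // equivalent to Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions
    final int partition = BuiltInPartitioner.partitionForKey(serializedKey, numPartitions);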

---
 .../kafka/clients/producer/KafkaProducer.java  |  3 ++-
 .../producer/internals/BuiltInPartitioner.java |  7 +++
 .../producer/internals/DefaultPartitioner.java |  4 +---
 .../internals/WindowedStreamPartitioner.java   | 10 +-
 .../internals/DefaultStreamPartitioner.java| 22 --
 .../processor/internals/StreamsMetadataState.java  |  4 ++--
 6 files changed, 29 insertions(+), 21 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index e85d9eb8a9..74d408d9a5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -26,6 +26,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.producer.internals.BufferPool;
+import org.apache.kafka.clients.producer.internals.BuiltInPartitioner;
 import org.apache.kafka.clients.producer.internals.KafkaProducerMetrics;
 import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
 import org.apache.kafka.clients.producer.internals.ProducerMetadata;
@@ -1385,7 +1386,7 @@ public class KafkaProducer implements Producer {
 
 if (serializedKey != null && !partitionerIgnoreKeys) {
 // hash the keyBytes to choose a partition
-return Utils.toPositive(Utils.murmur2(serializedKey)) % 
cluster.partitionsForTopic(record.topic()).size();
+return BuiltInPartitioner.partitionForKey(serializedKey, 
cluster.partitionsForTopic(record.topic()).size());
 } else {
 return RecordMetadata.UNKNOWN_PARTITION;
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java
index 1c2d10f3f6..a5805df56b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java
@@ -279,6 +279,13 @@ public class BuiltInPartitioner {
 }
 }
 
+/*
+ * Default hashing function to choose a partition from the serialized key 
bytes
+ */
+public static int partitionForKey(final byte[] serializedKey, final int 
numPartitions) {
+return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
+}
+
 /**
  * The partition load stats for each topic that are used for adaptive 
partition distribution.
  */
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
index 2c2e79fb20..716773626c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
+++ 
b/clients/src/main/java/org/apache/kafka/cl

[kafka] branch trunk updated: KAFKA-13939: Only track dirty keys if logging is enabled. (#12263)

2022-06-16 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new ee565f5f6b KAFKA-13939: Only track dirty keys if logging is enabled. 
(#12263)
ee565f5f6b is described below

commit ee565f5f6b97c84d4f7f895fcb79188822284414
Author: jnewhouse 
AuthorDate: Thu Jun 16 14:27:38 2022 -0700

KAFKA-13939: Only track dirty keys if logging is enabled. (#12263)

InMemoryTimeOrderedKeyValueBuffer keeps a Set of keys that have been seen 
in order to log them for durability. This set is never used nor cleared if 
logging is not enabled. Having it be populated creates a memory leak. This 
change stops populating the set if logging is not enabled.

Reviewers: Divij Vaidya , Kvicii 
<42023367+kvi...@users.noreply.github.com>, Guozhang Wang 
---
 .../state/internals/InMemoryTimeOrderedKeyValueBuffer.java| 8 ++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
index 5894023bbe..5403f9e703 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -423,7 +423,9 @@ public final class InMemoryTimeOrderedKeyValueBuffer 
implements TimeOrdere
 delegate.remove();
 index.remove(next.getKey().key());
 
-dirtyKeys.add(next.getKey().key());
+if (loggingEnabled) {
+dirtyKeys.add(next.getKey().key());
+}
 
 memBufferSize -= computeRecordSize(next.getKey().key(), 
bufferValue);
 
@@ -497,7 +499,9 @@ public final class InMemoryTimeOrderedKeyValueBuffer 
implements TimeOrdere
 serializedKey,
 new BufferValue(serializedPriorValue, serialChange.oldValue, 
serialChange.newValue, recordContext)
 );
-dirtyKeys.add(serializedKey);
+if (loggingEnabled) {
+dirtyKeys.add(serializedKey);
+}
 updateBufferMetrics();
 }
 



[kafka] branch trunk updated: KAFKA-13846: Use the new addMetricsIfAbsent API (#12287)

2022-06-14 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 39a555ba94 KAFKA-13846: Use the new addMetricsIfAbsent API (#12287)
39a555ba94 is described below

commit 39a555ba94a6a5d851b31e0a7f07e19c48327835
Author: Guozhang Wang 
AuthorDate: Tue Jun 14 16:04:26 2022 -0700

KAFKA-13846: Use the new addMetricsIfAbsent API (#12287)

Use the newly added function to replace the old addMetric function, which may 
throw an IllegalArgumentException if the metric already exists.

Although in some of these cases concurrency should not be possible, that does 
not necessarily remain true in the future, so it's better to use the new API 
to be less error-prone.

Reviewers: Bruno Cadonna 
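
A minimal usage sketch of the API this commit switches to (the metric group 
and name here are hypothetical; a null config falls back to the registry's 
default, as in the diff below):

    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.Gauge;
    import org.apache.kafka.common.metrics.Metrics;

    final Metrics metrics = new Metrics();
    final MetricName name = metrics.metricName("my-gauge", "my-group");
    // registers the gauge if absent, otherwise returns the existing metric;
    // no check-then-register race, no IllegalArgumentException
    metrics.addMetricIfAbsent(name, null, (Gauge<Integer>) (config, now) -> 42);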
---
 .../org/apache/kafka/clients/consumer/internals/Fetcher.java  | 11 +--
 .../java/org/apache/kafka/connect/runtime/WorkerTask.java |  5 ++---
 .../processor/internals/metrics/StreamsMetricsImpl.java   |  3 +--
 .../processor/internals/metrics/StreamsMetricsImplTest.java   |  2 +-
 4 files changed, 9 insertions(+), 12 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index fa7073cf0d..73ffd217ef 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -1866,12 +1866,11 @@ public class Fetcher implements Closeable {
 for (TopicPartition tp : newAssignedPartitions) {
 if (!this.assignedPartitions.contains(tp)) {
 MetricName metricName = 
partitionPreferredReadReplicaMetricName(tp);
-if (metrics.metric(metricName) == null) {
-metrics.addMetric(
-metricName,
-(Gauge) (config, now) -> 
subscription.preferredReadReplica(tp, 0L).orElse(-1)
-);
-}
+metrics.addMetricIfAbsent(
+metricName,
+null,
+(Gauge) (config, now) -> 
subscription.preferredReadReplica(tp, 0L).orElse(-1)
+);
 }
 }
 
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index 072e4b34a1..f7c819fb4a 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.metrics.Gauge;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Frequencies;
@@ -377,10 +378,8 @@ abstract class WorkerTask implements Runnable {
 
 private void addRatioMetric(final State matchingState, 
MetricNameTemplate template) {
 MetricName metricName = metricGroup.metricName(template);
-if (metricGroup.metrics().metric(metricName) == null) {
-metricGroup.metrics().addMetric(metricName, (config, now) ->
+metricGroup.metrics().addMetricIfAbsent(metricName, null, 
(Gauge) (config, now) ->
 taskStateTimer.durationRatio(matchingState, now));
-}
 }
 
 void close() {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index 4bfd96265b..cb77fc9bdd 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -502,9 +502,8 @@ public class StreamsMetricsImpl implements StreamsMetrics {
 storeLevelTagMap(taskId, metricsScope, storeName)
 );
 if (metrics.metric(metricName) == null) {
-final MetricConfig metricConfig = new 
MetricConfig().recordLevel(recordingLevel);
+metrics.addMetricIfAbsent(metricName, new 
MetricConfig().recordLevel(recordingLevel), valueProvider);
 final String key = 
storeSensorPrefix(Thread.currentThread().getName(), taskId, storeName);
-metrics.addMetric(metricName, metricConfig, v

[kafka] branch trunk updated: KAFKA-13846: Adding overloaded metricOrElseCreate method (#12121)

2022-06-13 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 5cab11cf52 KAFKA-13846: Adding overloaded metricOrElseCreate method 
(#12121)
5cab11cf52 is described below

commit 5cab11cf525f6c06fcf9eb43f7f95ef33fe1cdbb
Author: vamossagar12 
AuthorDate: Mon Jun 13 23:06:39 2022 +0530

KAFKA-13846: Adding overloaded metricOrElseCreate method (#12121)

Reviewers: David Jacot , Justine Olshan 
, Guozhang Wang 
---
 .../org/apache/kafka/common/metrics/Metrics.java   | 40 +--
 .../org/apache/kafka/common/metrics/Sensor.java| 10 -
 .../kafka/connect/runtime/ConnectMetrics.java  |  8 +---
 .../internals/metrics/StreamsMetricsImplTest.java  | 46 ++
 4 files changed, 92 insertions(+), 12 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java 
b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index 52b7794a4c..398819016c 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -509,7 +509,10 @@ public class Metrics implements Closeable {
 
Objects.requireNonNull(metricValueProvider),
 config == null ? this.config : config,
 time);
-registerMetric(m);
+KafkaMetric existingMetric = registerMetric(m);
+if (existingMetric != null) {
+throw new IllegalArgumentException("A metric named '" + metricName 
+ "' already exists, can't register another one.");
+}
 }
 
 /**
@@ -524,6 +527,26 @@ public class Metrics implements Closeable {
 addMetric(metricName, null, metricValueProvider);
 }
 
+/**
+ * Create or get an existing metric to monitor an object that implements 
MetricValueProvider.
+ * This metric won't be associated with any sensor. This is a way to 
expose existing values as metrics.
+ * This method takes care of synchronisation while updating/accessing 
metrics by concurrent threads.
+ *
+ * @param metricName The name of the metric
+ * @param metricValueProvider The metric value provider associated with 
this metric
+ * @return Existing KafkaMetric if already registered or else a newly 
created one
+ */
+public KafkaMetric addMetricIfAbsent(MetricName metricName, MetricConfig 
config, MetricValueProvider metricValueProvider) {
+KafkaMetric metric = new KafkaMetric(new Object(),
+Objects.requireNonNull(metricName),
+Objects.requireNonNull(metricValueProvider),
+config == null ? this.config : config,
+time);
+
+KafkaMetric existingMetric = registerMetric(metric);
+return existingMetric == null ? metric : existingMetric;
+}
+
 /**
  * Remove a metric if it exists and return it. Return null otherwise. If a 
metric is removed, `metricRemoval`
  * will be invoked for each reporter.
@@ -563,10 +586,18 @@ public class Metrics implements Closeable {
 }
 }
 
-synchronized void registerMetric(KafkaMetric metric) {
+/**
+ * Register a metric if not present or return an already existing metric 
otherwise.
+ * When a metric is newly registered, this method returns null
+ *
+ * @param metric The KafkaMetric to register
+ * @return KafkaMetric if the metric already exists, null otherwise
+ */
+synchronized KafkaMetric registerMetric(KafkaMetric metric) {
 MetricName metricName = metric.metricName();
-if (this.metrics.containsKey(metricName))
-throw new IllegalArgumentException("A metric named '" + metricName 
+ "' already exists, can't register another one.");
+if (this.metrics.containsKey(metricName)) {
+return this.metrics.get(metricName);
+}
 this.metrics.put(metricName, metric);
 for (MetricsReporter reporter : reporters) {
 try {
@@ -576,6 +607,7 @@ public class Metrics implements Closeable {
 }
 }
 log.trace("Registered metric named {}", metricName);
+return null;
 }
 
 /**
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java 
b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index 5ae3b8d997..25f3c21a31 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -297,7 +297,10 @@ public final class Sensor {
 for (NamedMeasurable m : stat.stats()) {
 final KafkaMetric metric = new KafkaMetric(lock, m.name(), 
m.stat(), statCo

[kafka] branch trunk updated: KAFKA-10199: Implement removing active and standby tasks from the state updater (#12270)

2022-06-09 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new e67408c859 KAFKA-10199: Implement removing active and standby tasks 
from the state updater (#12270)
e67408c859 is described below

commit e67408c859fb2a80f1b3c208b7fef6ddc9a711fb
Author: Bruno Cadonna 
AuthorDate: Thu Jun 9 19:28:26 2022 +0200

KAFKA-10199: Implement removing active and standby tasks from the state 
updater (#12270)

This PR adds the removal of active and standby tasks from the default 
implementation of the state updater. The PR also includes refactoring that 
cleans up the code.

Reviewers: Guozhang Wang 
---
 .../processor/internals/DefaultStateUpdater.java   | 129 ---
 .../streams/processor/internals/StateUpdater.java  |  78 ++--
 .../streams/processor/internals/TaskAndAction.java |  67 
 .../internals/DefaultStateUpdaterTest.java | 419 +
 .../processor/internals/TaskAndActionTest.java |  68 
 5 files changed, 595 insertions(+), 166 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index 55935d3e21..54cb7bc427 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -23,13 +23,13 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.Task.State;
+import org.apache.kafka.streams.processor.internals.TaskAndAction.Action;
 import org.slf4j.Logger;
 
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -37,6 +37,7 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -57,7 +58,7 @@ public class DefaultStateUpdater implements StateUpdater {
 private final ChangelogReader changelogReader;
 private final AtomicBoolean isRunning = new AtomicBoolean(true);
 private final Consumer> offsetResetter;
-private final Map updatingTasks = new HashMap<>();
+private final Map updatingTasks = new 
ConcurrentHashMap<>();
 private final Logger log;
 
 public StateUpdaterThread(final String name,
@@ -72,7 +73,7 @@ public class DefaultStateUpdater implements StateUpdater {
 log = logContext.logger(DefaultStateUpdater.class);
 }
 
-public Collection getAllUpdatingTasks() {
+public Collection getUpdatingTasks() {
 return updatingTasks.values();
 }
 
@@ -117,11 +118,13 @@ public class DefaultStateUpdater implements StateUpdater {
 tasksAndActionsLock.lock();
 try {
 for (final TaskAndAction taskAndAction : getTasksAndActions()) 
{
-final Task task = taskAndAction.task;
-final Action action = taskAndAction.action;
+final Action action = taskAndAction.getAction();
 switch (action) {
 case ADD:
-addTask(task);
+addTask(taskAndAction.getTask());
+break;
+case REMOVE:
+removeTask(taskAndAction.getTaskId());
 break;
 }
 }
@@ -149,7 +152,7 @@ public class DefaultStateUpdater implements StateUpdater {
 log.error("An unexpected error occurred within the state updater 
thread: " + runtimeException);
 final ExceptionAndTasks exceptionAndTasks = new 
ExceptionAndTasks(new HashSet<>(updatingTasks.values()), runtimeException);
 updatingTasks.clear();
-failedTasks.add(exceptionAndTasks);
+exceptionsAndFailedTasks.add(exceptionAndTasks);
 isRunning.set(false);
 }
 
@@ -164,7 +167,7 @@ public class DefaultStateUpdater implements StateUpdater {
 }
 corruptedTasks.add(corruptedTask);
 }
-failedTasks.add(new ExceptionAndTasks(corruptedTasks, 
taskCorruptedException));
+exceptionsAndFailedTasks.add(new E

[kafka] branch trunk updated: MINOR: Fix typo in Kafka config docs (#12268)

2022-06-08 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 249cd4461f MINOR: Fix typo in Kafka config docs (#12268)
249cd4461f is described below

commit 249cd4461faf003e6baa2327e9ceb130eb80c952
Author: Joel Hamill <11722533+joel-ham...@users.noreply.github.com>
AuthorDate: Wed Jun 8 13:51:41 2022 -0700

MINOR: Fix typo in Kafka config docs (#12268)

Reviewers: Guozhang Wang 
---
 core/src/main/scala/kafka/server/KafkaConfig.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index f751cc21a6..3d7df18cbb 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -980,8 +980,8 @@ object KafkaConfig {
   val ControllerQuotaWindowSizeSecondsDoc = "The time span of each sample for 
controller mutations quotas"
 
   val ClientQuotaCallbackClassDoc = "The fully qualified name of a class that 
implements the ClientQuotaCallback interface, " +
-"which is used to determine quota limits applied to client requests. By 
default, user, client-id, user or client-id " +
-"quotas stored in ZooKeeper are applied. For any given request, the most 
specific quota that matches the user principal " +
+"which is used to determine quota limits applied to client requests. By 
default, the user and client-id " +
+"quotas that are stored in ZooKeeper are applied. For any given request, 
the most specific quota that matches the user principal " +
 "of the session and the client-id of the request is applied."
 
   val DeleteTopicEnableDoc = "Enables delete topic. Delete topic through the 
admin tool will have no effect if this config is turned off"



[kafka] branch trunk updated: HOTFIX: add space to avoid checkstyle failure

2022-06-06 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 5d593287c7 HOTFIX: add space to avoid checkstyle failure
5d593287c7 is described below

commit 5d593287c7db7030c444a469db72d59b3883d904
Author: Guozhang Wang 
AuthorDate: Mon Jun 6 11:34:59 2022 -0700

HOTFIX: add space to avoid checkstyle failure
---
 .../kafka/clients/consumer/internals/ConsumerCoordinatorTest.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 180053ee22..c65d33176f 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -558,7 +558,7 @@ public abstract class ConsumerCoordinatorTest {
 
 // should try to find coordinator since we are commit async
 coordinator.commitOffsetsAsync(singletonMap(t1p, new 
OffsetAndMetadata(100L)), (offsets, exception) -> {
-fail("Commit should not get responses, but got offsets:" + offsets 
+", and exception:" + exception);
+fail("Commit should not get responses, but got offsets:" + offsets 
+ ", and exception:" + exception);
 });
 coordinator.poll(time.timer(0));
 assertTrue(coordinator.coordinatorUnknown());



[kafka] branch 3.2 updated (90db4f47d6 -> b61edf2037)

2022-06-06 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 90db4f47d6 KAFKA-13773: catch kafkaStorageException to avoid broker 
shutdown directly (#12136)
 add 173b8fd26d HOTFIX: only try to clear discover-coordinator future upon 
commit (#12244)
 add b61edf2037 HOTFIX: add space to avoid checkstyle failure

No new revisions were added by this update.

Summary of changes:
 .../consumer/internals/ConsumerCoordinator.java| 23 +++--
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  1 -
 .../internals/ConsumerCoordinatorTest.java | 57 --
 3 files changed, 73 insertions(+), 8 deletions(-)



[kafka] branch trunk updated: HOTFIX: only try to clear discover-coordinator future upon commit (#12244)

2022-06-06 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 2047fc3715 HOTFIX: only try to clear discover-coordinator future upon 
commit (#12244)
2047fc3715 is described below

commit 2047fc371500286ba3e41d16551d2067edad
Author: Guozhang Wang 
AuthorDate: Mon Jun 6 11:05:41 2022 -0700

HOTFIX: only try to clear discover-coordinator future upon commit (#12244)

This is an alternative way of fixing KAFKA-13563 to the one in #11631.

Instead of letting the consumer always try to discover the coordinator in 
poll() in either mode (subscribe / assign), we defer clearing the 
discover-coordinator future to async commits only. More specifically, under 
manual assign mode, there are only three places where we need the coordinator:

* commitAsync (either issued by the consumer itself or triggered by the 
caller); this is what we fix here (see the sketch after this list).
* commitSync, for which we already try to re-discover the coordinator.
* committed (either issued by the consumer itself based on the reset policy, 
or triggered by the caller), for which we already try to re-discover the 
coordinator.

The benefit is that in manual assign mode, if none of the above three 
operations is triggered, we never discover the coordinator at all. The 
original fix in #11631 would let the consumer discover the coordinator even 
if none of the above operations is required.

Reviewers: Luke Chen , David Jacot 
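
A usage sketch of the manual-assign path in question (topic, partition, and 
configuration are hypothetical; props is assumed to hold a valid consumer 
configuration):

    import java.time.Duration;
    import java.util.Collections;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.assign(Collections.singletonList(new TopicPartition("t1", 0)));
    consumer.poll(Duration.ofMillis(100));  // no proactive coordinator lookup anymore
    consumer.commitAsync((offsets, exception) -> {
        // coordinator discovery is now attempted lazily, on this commit path
    });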
---
 .../consumer/internals/ConsumerCoordinator.java| 23 +++--
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  1 -
 .../internals/ConsumerCoordinatorTest.java | 57 --
 3 files changed, 73 insertions(+), 8 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 51fa0b62ed..b853ff99e8 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import java.time.Duration;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import org.apache.kafka.clients.GroupRebalanceConfig;
@@ -548,14 +549,18 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
 }
 }
 } else {
-// For manually assigned partitions, if coordinator is unknown, 
make sure we lookup one and await metadata.
+// For manually assigned partitions, we do not try to pro-actively 
lookup coordinator;
+// instead we only try to refresh metadata when necessary.
 // If connections to all nodes fail, wakeups triggered while 
attempting to send fetch
 // requests result in polls returning immediately, causing a tight 
loop of polls. Without
 // the wakeup, poll() with no channels would block for the 
timeout, delaying re-connection.
 // awaitMetadataUpdate() in ensureCoordinatorReady initiates new 
connections with configured backoff and avoids the busy loop.
-if (coordinatorUnknownAndUnready(timer)) {
-return false;
+if (metadata.updateRequested() && 
!client.hasReadyNodes(timer.currentTimeMs())) {
+client.awaitMetadataUpdate(timer);
 }
+
+// if there is pending coordinator requests, ensure they have a 
chance to be transmitted.
+client.pollNoWakeup();
 }
 
 maybeAutoCommitOffsetsAsync(timer.currentTimeMs());
@@ -1004,7 +1009,17 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
 if (offsets.isEmpty()) {
 // No need to check coordinator if offsets is empty since commit 
of empty offsets is completed locally.
 future = doCommitOffsetsAsync(offsets, callback);
-} else if (!coordinatorUnknown()) {
+} else if (!coordinatorUnknownAndUnready(time.timer(Duration.ZERO))) {
+// we need to make sure coordinator is ready before committing, 
since
+// this is for async committing we do not try to block, but just 
try once to
+// clear the previous discover-coordinator future, resend, or get 
responses;
+// if the coordinator is not ready yet then we would just proceed 
and put that into the
+// pending requests, and future poll calls would still try to 
complete them.
+//
+// the key here though is that we have to try sending the 
discover-coordinator if
+// it's not known or ready, since this is the only place we can 
send such request
+// under manual as

[kafka] branch trunk updated (9dc332f5ca -> 286bae4251)

2022-05-24 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 9dc332f5ca KAFKA-13217: Reconsider skipping the LeaveGroup on close() 
or add an overload that does so (#12035)
 add 286bae4251 KAFKA-10199: Implement adding standby tasks to the state 
updater (#12200)

No new revisions were added by this update.

Summary of changes:
 .../processor/internals/ChangelogReader.java   |   9 +
 .../processor/internals/DefaultStateUpdater.java   | 125 +---
 .../streams/processor/internals/StateUpdater.java  |  12 +-
 .../internals/DefaultStateUpdaterTest.java | 226 -
 4 files changed, 290 insertions(+), 82 deletions(-)



[kafka] branch trunk updated: KAFKA-13217: Reconsider skipping the LeaveGroup on close() or add an overload that does so (#12035)

2022-05-23 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 9dc332f5ca KAFKA-13217: Reconsider skipping the LeaveGroup on close() 
or add an overload that does so (#12035)
9dc332f5ca is described below

commit 9dc332f5ca34b80af369646f767c40c6b189f831
Author: Sayantanu Dey <41713730+sayantanu-...@users.noreply.github.com>
AuthorDate: Mon May 23 22:37:19 2022 +0530

KAFKA-13217: Reconsider skipping the LeaveGroup on close() or add an 
overload that does so (#12035)

This is for KIP-812:

* added a leaveGroup option on a new close function in Kafka Streams (usage 
sketch below)
* added logic to resolve the future returned by the remove-member call in the 
close method
* added a max(0, ...) check on the remainingTime value in the close function


Reviewers: David Jacot , Guozhang Wang 
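
A usage sketch of the new overload (the timeout value is just an example; 
streams is assumed to be a running KafkaStreams instance):

    import java.time.Duration;
    import org.apache.kafka.streams.KafkaStreams;

    final KafkaStreams.CloseOptions options = new KafkaStreams.CloseOptions()
        .timeout(Duration.ofSeconds(30))
        .leaveGroup(true);  // send a leave-group call for the static member on shutdown
    final boolean stopped = streams.close(options);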

---
 .../org/apache/kafka/streams/KafkaStreams.java |  73 
 .../org/apache/kafka/streams/KafkaStreamsTest.java | 203 +
 2 files changed, 276 insertions(+)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 9d9d57f8ab..ed62f89ebc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -1349,6 +1349,24 @@ public class KafkaStreams implements AutoCloseable {
 }
 }
 
+/**
+ * Class that handles options passed in case of {@code KafkaStreams} 
instance scale down
+ */
+public static class CloseOptions {
+private Duration timeout = Duration.ofMillis(Long.MAX_VALUE);
+private boolean leaveGroup = false;
+
+public CloseOptions timeout(final Duration timeout) {
+this.timeout = timeout;
+return this;
+}
+
+public CloseOptions leaveGroup(final boolean leaveGroup) {
+this.leaveGroup = leaveGroup;
+return this;
+}
+}
+
 /**
  * Shutdown this {@code KafkaStreams} instance by signaling all the 
threads to stop, and then wait for them to join.
  * This will block until all threads have stopped.
@@ -1498,6 +1516,61 @@ public class KafkaStreams implements AutoCloseable {
 return close(timeoutMs);
 }
 
+/**
+ * Shutdown this {@code KafkaStreams} by signaling all the threads to 
stop, and then wait up to the timeout for the
+ * threads to join.
+ * @param options  contains timeout to specify how long to wait for the 
threads to shutdown, and a flag leaveGroup to
+ * trigger consumer leave call
+ * @return {@code true} if all threads were successfully 
stopped{@code false} if the timeout was reached
+ * before all threads stopped
+ * Note that this method must not be called in the {@link 
StateListener#onChange(KafkaStreams.State, KafkaStreams.State)} callback of 
{@link StateListener}.
+ * @throws IllegalArgumentException if {@code timeout} can't be 
represented as {@code long milliseconds}
+ */
+public synchronized boolean close(final CloseOptions options) throws 
IllegalArgumentException {
+final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(options.timeout, "timeout");
+final long timeoutMs = validateMillisecondDuration(options.timeout, 
msgPrefix);
+if (timeoutMs < 0) {
+throw new IllegalArgumentException("Timeout can't be negative.");
+}
+
+final long startMs = time.milliseconds();
+
+final boolean closeStatus = close(timeoutMs);
+
+final Optional groupInstanceId = clientSupplier
+
.getConsumer(applicationConfigs.getGlobalConsumerConfigs(clientId))
+.groupMetadata()
+.groupInstanceId();
+
+final long remainingTimeMs = Math.max(0, timeoutMs - 
(time.milliseconds() - startMs));
+
+if (options.leaveGroup && groupInstanceId.isPresent()) {
+log.debug("Sending leave group trigger to removing instance from 
consumer group");
+//removing instance from consumer group
+
+final MemberToRemove memberToRemove = new 
MemberToRemove(groupInstanceId.get());
+
+final Collection membersToRemove = 
Collections.singletonList(memberToRemove);
+
+final RemoveMembersFromConsumerGroupResult 
removeMembersFromConsumerGroupResult = adminClient
+.removeMembersFromConsumerGroup(
+
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG),
+new 
RemoveMembersFromConsumerGroupOptions(membersToRemove)
+);
+
+try {
+
removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(remainingTimeMs,
 Time

[kafka] branch trunk updated: MINOR: Fix flaky test TopicCommandIntegrationTest.testDescribeAtMinIsrPartitions(String).quorum=kraft (#12189)

2022-05-21 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new f6ba10ef9c MINOR: Fix flaky test 
TopicCommandIntegrationTest.testDescribeAtMinIsrPartitions(String).quorum=kraft 
(#12189)
f6ba10ef9c is described below

commit f6ba10ef9c2c2d94473efd2fd596b172fcff494a
Author: Divij Vaidya 
AuthorDate: Sat May 21 19:33:44 2022 +0200

MINOR: Fix flaky test 
TopicCommandIntegrationTest.testDescribeAtMinIsrPartitions(String).quorum=kraft 
(#12189)

This test is flaky, as seen in CI: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-12184/1/tests/

The test fails because it does not wait for metadata to be propagated 
across brokers before killing a broker, which may lead to it reading stale 
information. Note that a similar fix was made in #12104 for a different test.

Reviewers: Kvicii Y, Ziming Deng, Jason Gustafson , 
Guozhang Wang 
---
 .../kafka/admin/TopicCommandIntegrationTest.scala  | 36 +-
 .../kafka/integration/KafkaServerTestHarness.scala |  6 +++-
 .../kafka/server/DeleteTopicsRequestTest.scala |  8 ++---
 3 files changed, 29 insertions(+), 21 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
index 26c60e1c3e..3082babd06 100644
--- 
a/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
@@ -586,16 +586,10 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
 
 try {
   killBroker(0)
-  val aliveServers = brokers.filterNot(_.config.brokerId == 0)
-
   if (isKRaftTest()) {
-TestUtils.ensureConsistentKRaftMetadata(
-  aliveServers,
-  controllerServer,
-  "Timeout waiting for partition metadata propagating to brokers"
-)
+ensureConsistentKRaftMetadata()
   } else {
-TestUtils.waitForPartitionMetadata(aliveServers, testTopicName, 0)
+TestUtils.waitForPartitionMetadata(aliveBrokers, testTopicName, 0)
   }
   val output = TestUtils.grabConsoleOutput(
 topicService.describeTopic(new 
TopicCommandOptions(Array("--under-replicated-partitions"
@@ -618,8 +612,14 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
 
 try {
   killBroker(0)
-  val aliveServers = brokers.filterNot(_.config.brokerId == 0)
-  TestUtils.waitForPartitionMetadata(aliveServers, testTopicName, 0)
+  if (isKRaftTest()) {
+ensureConsistentKRaftMetadata()
+  } else {
+TestUtils.waitUntilTrue(
+  () => 
aliveBrokers.forall(_.metadataCache.getPartitionInfo(testTopicName, 
0).get.isr().size() == 5),
+  s"Timeout waiting for partition metadata propagating to brokers for 
$testTopicName topic"
+)
+  }
   val output = TestUtils.grabConsoleOutput(
 topicService.describeTopic(new 
TopicCommandOptions(Array("--under-min-isr-partitions"
   val rows = output.split("\n")
@@ -697,6 +697,16 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
 try {
   killBroker(0)
   killBroker(1)
+
+  if (isKRaftTest()) {
+ensureConsistentKRaftMetadata()
+  } else {
+TestUtils.waitUntilTrue(
+  () => 
aliveBrokers.forall(_.metadataCache.getPartitionInfo(testTopicName, 
0).get.isr().size() == 4),
+  s"Timeout waiting for partition metadata propagating to brokers for 
$testTopicName topic"
+)
+  }
+
   val output = TestUtils.grabConsoleOutput(
 topicService.describeTopic(new 
TopicCommandOptions(Array("--at-min-isr-partitions"
   val rows = output.split("\n")
@@ -741,13 +751,11 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
 
 try {
   killBroker(0)
-  val aliveServers = brokers.filterNot(_.config.brokerId == 0)
-
   if (isKRaftTest()) {
-TestUtils.ensureConsistentKRaftMetadata(aliveServers, 
controllerServer, "Timeout waiting for topic configs propagating to brokers")
+ensureConsistentKRaftMetadata()
   } else {
 TestUtils.waitUntilTrue(
-  () => aliveServers.forall(
+  () => aliveBrokers.forall(
 broker =>
   broker.metadataCache.getPartitionInfo(underMinIsrTopic, 
0).get.isr().size() < 6 &&
 broker.metadataCache.getPartitionInfo(offlineTopic, 
0).get.leader() == MetadataResponse.NO_LEADER_ID),
diff --git 
a/core/src/test/scala/unit/kafka/integ
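
The fix follows a common poll-until-true pattern. A condensed, self-contained
Java rendering of that pattern (TestUtils.waitUntilTrue in the Scala diff has
the same shape; the names and poll interval below are illustrative):

import java.util.function.BooleanSupplier;

public final class WaitUntil {
    // Poll a condition until it holds or the deadline passes, as the
    // metadata-propagation waits above do.
    public static void waitUntilTrue(final BooleanSupplier condition,
                                     final String failureMessage,
                                     final long timeoutMs) throws InterruptedException {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError(failureMessage);
            }
            Thread.sleep(100L); // poll interval; arbitrary for this sketch
        }
    }
}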

[kafka-site] branch asf-site updated: MINOR: Fix DSL typo in streams docs (#412)

2022-05-18 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/kafka-site.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new e3def51b MINOR: Fix DSL typo in streams docs (#412)
e3def51b is described below

commit e3def51b3d357663e2546d6a5b11c4d6f6144247
Author: Milind Mantri 
AuthorDate: Wed May 18 22:44:55 2022 +0530

MINOR: Fix DSL typo in streams docs (#412)

Reviewers: Guozhang Wang 
---
 32/streams/developer-guide/dsl-topology-naming.html | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/32/streams/developer-guide/dsl-topology-naming.html 
b/32/streams/developer-guide/dsl-topology-naming.html
index 9e687f9a..cd11c132 100644
--- a/32/streams/developer-guide/dsl-topology-naming.html
+++ b/32/streams/developer-guide/dsl-topology-naming.html
@@ -41,7 +41,7 @@
   you are required to explicitly name each one.


-  At the DLS layer, there are operators.  A single DSL 
operator may
+  At the DSL layer, there are operators.  A single DSL 
operator may
   compile down to multiple Processors and 
State Stores, and
   if required repartition topics. But with the 
Kafka Streams
   DSL, all these names are generated for you. There is a 
relationship between



[kafka] branch trunk updated (467bce04ae4 -> f96e3813876)

2022-05-13 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 467bce04ae4 MINOR: Update release versions for upgrade tests with 
3.1.1 release (#12156)
 add f96e3813876 KAFKA-13746: Attempt to fix flaky test by waiting on 
metadata update (#12104)

No new revisions were added by this update.

Summary of changes:
 .../kafka/admin/TopicCommandIntegrationTest.scala  | 14 +-
 1 file changed, 13 insertions(+), 1 deletion(-)



[kafka] branch trunk updated: KAFKA-13785: [8/N][emit final] time-ordered session store (#12127)

2022-05-05 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 3b08deaa76 KAFKA-13785: [8/N][emit final] time-ordered session store 
(#12127)
3b08deaa76 is described below

commit 3b08deaa761c2387a41610893dc8302ab1d97338
Author: Guozhang Wang 
AuthorDate: Thu May 5 16:09:16 2022 -0700

KAFKA-13785: [8/N][emit final] time-ordered session store (#12127)

Time ordered session store implementation. I introduced 
AbstractRocksDBTimeOrderedSegmentedBytesStore to make it generic for 
RocksDBTimeOrderedSessionSegmentedBytesStore and 
RocksDBTimeOrderedSegmentedBytesStore.

A few minor follow-up changes:

1. Avoid extra byte array allocation for fixed upper/lower range 
serialization.
2. Rename some class names to be more consistent.

Authored-by: Hao Li <1127478+lihao...@users.noreply.github.com>
Reviewers: Guozhang Wang , John Roesler 

---
 ...stractDualSchemaRocksDBSegmentedBytesStore.java |   3 -
 ...ractRocksDBTimeOrderedSegmentedBytesStore.java} | 111 ++
 .../state/internals/PrefixedSessionKeySchemas.java | 387 +++
 .../state/internals/PrefixedWindowKeySchemas.java  |   6 +-
 ...cksDBTimeOrderedSessionSegmentedBytesStore.java | 136 +++
 .../internals/RocksDBTimeOrderedSessionStore.java  | 156 
 ...ocksDBTimeOrderedWindowSegmentedBytesStore.java | 127 +++
 .../internals/RocksDBTimeOrderedWindowStore.java   |   4 +-
 ...IndexedTimeOrderedWindowBytesStoreSupplier.java |   4 +-
 ...ocksDbTimeOrderedSessionBytesStoreSupplier.java |  69 
 .../streams/state/internals/SessionKeySchema.java  |  40 +-
 .../internals/WrappedSessionStoreIterator.java |  12 +-
 ...ctDualSchemaRocksDBSegmentedBytesStoreTest.java | 411 -
 .../state/internals/RocksDBSessionStoreTest.java   |  68 +++-
 .../RocksDBTimeOrderedSegmentedBytesStoreTest.java |  74 
 ...DBTimeOrderedWindowSegmentedBytesStoreTest.java | 121 ++
 .../state/internals/RocksDBWindowStoreTest.java|  68 ++--
 ...xedTimeOrderedWindowBytesStoreSupplierTest.java |   8 +-
 .../state/internals/SessionKeySchemaTest.java  | 223 ---
 .../internals/TimeOrderedWindowStoreTest.java  |   4 +-
 .../state/internals/WindowKeySchemaTest.java   |  10 +-
 21 files changed, 1757 insertions(+), 285 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
index b1044eb49c..95c1d8d8c8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
@@ -50,7 +50,6 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStore segments;
-private final String metricScope;
 protected final KeySchema baseKeySchema;
 protected final Optional indexKeySchema;
 
@@ -65,12 +64,10 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStore 
indexKeySchema,
  final AbstractSegments 
segments) {
 this.name = name;
-this.metricScope = metricScope;
 this.baseKeySchema = baseKeySchema;
 this.indexKeySchema = indexKeySchema;
 this.segments = segments;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSegmentedBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
similarity index 65%
rename from 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSegmentedBytesStore.java
rename to 
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
index e87af877fb..f7216412f0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSegmentedBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
@@ -16,22 +16,12 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Optional;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.errors.ProcessorStateException;
-import 
org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
 

[kafka] branch trunk updated: KAFKA-10199: Implement adding active tasks to the state updater (#12128)

2022-05-05 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new ced5989ff6 KAFKA-10199: Implement adding active tasks to the state 
updater (#12128)
ced5989ff6 is described below

commit ced5989ff69f8a5e76518fdeb39f41ab20b2574f
Author: Bruno Cadonna 
AuthorDate: Fri May 6 01:00:35 2022 +0200

KAFKA-10199: Implement adding active tasks to the state updater (#12128)

This PR adds the default implementation of the state updater. The 
implementation currently supports only adding active tasks to the state updater.

Reviewers: Guozhang Wang 
---
 .../processor/internals/ChangelogReader.java   |   2 +
 .../processor/internals/DefaultStateUpdater.java   | 373 +++
 .../streams/processor/internals/StateUpdater.java  |  19 +-
 .../processor/internals/StoreChangelogReader.java  |   3 +-
 .../internals/DefaultStateUpdaterTest.java | 408 +
 .../processor/internals/MockChangelogReader.java   |   5 +
 6 files changed, 804 insertions(+), 6 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
index 9c62dd182e..38b00232c8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
@@ -46,6 +46,8 @@ public interface ChangelogReader extends ChangelogRegister {
  */
 Set completedChangelogs();
 
+boolean allChangelogsCompleted();
+
 /**
  * Clear all partitions
  */
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
new file mode 100644
index 00..0b6558d8ac
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.Task.State;
+import org.slf4j.Logger;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+public class DefaultStateUpdater implements StateUpdater {
+
+private final static String BUG_ERROR_MESSAGE = "This indicates a bug. " +
+"Please report at https://issues.apache.org/jira/projects/KAFKA/issues 
or to the dev-mailing list (https://kafka.apache.org/contact).";
+
+private class StateUpdaterThread extends Thread {
+
+private final ChangelogReader changelogReader;
+private final AtomicBoolean isRunning = new AtomicBoolean(true);
+private final java.util.function.Consumer> 
offsetResetter;
+private final Map updatingTasks = new HashMap<>();
+private final Logger log;
+
+public StateUpdaterThread(final String name,
+  final ChangelogReader changelogReader,
+   

[kafka] branch trunk updated: MINOR: Fix typos in configuration docs (#11874)

2022-05-04 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 18b84d0404 MINOR: Fix typos in configuration docs (#11874)
18b84d0404 is described below

commit 18b84d0404d891263420628e12a2ce4f2ac85806
Author: Joel Hamill <11722533+joel-ham...@users.noreply.github.com>
AuthorDate: Wed May 4 10:27:14 2022 -0700

MINOR: Fix typos in configuration docs (#11874)

Reviewers: Chris Egerton, Weikang Sun, Andrew Eugene Choi, Luke Chen, 
Guozhang Wang
---
 .../java/org/apache/kafka/clients/producer/ProducerConfig.java   | 9 +
 .../src/main/java/org/apache/kafka/streams/StreamsConfig.java| 4 ++--
 2 files changed, 7 insertions(+), 6 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 6c258933af..2d586f255c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -216,10 +216,11 @@ public class ProducerConfig extends AbstractConfig {
 /** max.in.flight.requests.per.connection */
 public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 
"max.in.flight.requests.per.connection";
 private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = 
"The maximum number of unacknowledged requests the client will send on a single 
connection before blocking."
-+ 
" Note that if this config is set to be greater than 1 and 
enable.idempotence is set to false, there is a risk of"
-+ 
" message re-ordering after a failed send due to retries (i.e., if retries are 
enabled)."
-+ 
" Additionally, enabling idempotence requires this config value to be less than 
or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "."
-+ 
" If conflicting configurations are set and idempotence is not explicitly 
enabled, idempotence is disabled.";
++ 
" Note that if this configuration is set to be greater than 1 and 
enable.idempotence is set to false, there is a risk of"
++ 
" message reordering after a failed send due to retries (i.e., if retries are 
enabled); "
++ 
" if retries are disabled or if enable.idempotence is set to true, 
ordering will be preserved."
++ 
" Additionally, enabling idempotence requires the value of this configuration 
to be less than or equal to " + 
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "."
++ 
" If conflicting configurations are set and idempotence is not explicitly 
enabled, idempotence is disabled. ";
 
 /** retries */
 public static final String RETRIES_CONFIG = 
CommonClientConfigs.RETRIES_CONFIG;
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 23c021c63f..6428f69188 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -498,8 +498,8 @@ public class StreamsConfig extends AbstractConfig {
 "org.apache.kafka.common.serialization.Serde interface.";
 
 public static final String WINDOWED_INNER_CLASS_SERDE = 
"windowed.inner.class.serde";
-private static final String WINDOWED_INNER_CLASS_SERDE_DOC = " Default 
serializer / deserializer for the inner class of a windowed record. Must 
implement the \" +\n" +
-"\"org.apache.kafka.common.serialization.Serde 
interface.. Note that setting this config in KafkaStreams application would 
result " +
+private static final String WINDOWED_INNER_CLASS_SERDE_DOC = " Default 
serializer / deserializer for the inner class of a windowed record. Must 
implement the " +
+"org.apache.kafka.common.serialization.Serde interface. 
Note that setting this config in KafkaStreams application would result " +
 "in an error as it is meant to be used only from Plain consumer 
client.";
 
 /** {@code default key.serde} */



[kafka] branch trunk updated: HOTFIX: Only measure in nano when producer metadata refresh is required (#12102)

2022-04-27 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new e026384ffb HOTFIX: Only measure in nano when producer metadata refresh 
is required (#12102)
e026384ffb is described below

commit e026384ffb3170a2e71053a4163db58b9bd8fba6
Author: Guozhang Wang 
AuthorDate: Wed Apr 27 11:27:54 2022 -0700

HOTFIX: Only measure in nano when producer metadata refresh is required 
(#12102)

We added the metadata wait time to total blocked time (#11805). But we 
added it in the critical path of send(), which is called per record, whereas 
metadata refresh happens only rarely. This makes the cost of time.nanoseconds() 
unnecessarily significant, as we call it twice per record.

This PR moves the call to inside the waitOnMetadata callee and only when we 
do need to wait for a metadata refresh round-trip (i.e. we are indeed blocking).

Reviewers: Matthias J. Sax 
---
 .../main/java/org/apache/kafka/clients/producer/KafkaProducer.java   | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index f36db02a9d..85a3e239e9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -931,7 +931,6 @@ public class KafkaProducer implements Producer {
 throwIfProducerClosed();
 // first make sure the metadata for the topic is available
 long nowMs = time.milliseconds();
-long nowNanos = time.nanoseconds();
 ClusterAndWaitTime clusterAndWaitTime;
 try {
 clusterAndWaitTime = waitOnMetadata(record.topic(), 
record.partition(), nowMs, maxBlockTimeMs);
@@ -941,7 +940,6 @@ public class KafkaProducer implements Producer {
 throw e;
 }
 nowMs += clusterAndWaitTime.waitedOnMetadataMs;
-producerMetrics.recordMetadataWait(time.nanoseconds() - nowNanos);
 long remainingWaitMs = Math.max(0, maxBlockTimeMs - 
clusterAndWaitTime.waitedOnMetadataMs);
 Cluster cluster = clusterAndWaitTime.cluster;
 byte[] serializedKey;
@@ -1080,6 +1078,7 @@ public class KafkaProducer implements Producer {
 // Issue metadata requests until we have metadata for the topic and 
the requested partition,
 // or until maxWaitTimeMs is exceeded. This is necessary in case the 
metadata
 // is stale and the number of partitions for this topic has increased 
in the meantime.
+long nowNanos = time.nanoseconds();
 do {
 if (partition != null) {
 log.trace("Requesting metadata update for partition {} of 
topic {}.", partition, topic);
@@ -1111,6 +1110,8 @@ public class KafkaProducer implements Producer {
 partitionsCount = cluster.partitionCountForTopic(topic);
 } while (partitionsCount == null || (partition != null && partition >= 
partitionsCount));
 
+producerMetrics.recordMetadataWait(time.nanoseconds() - nowNanos);
+
 return new ClusterAndWaitTime(cluster, elapsed);
 }
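
A standalone sketch of the shape of this fix, with illustrative names: the
nanosecond clock is sampled only around the rare blocking path, not on every
send.

public final class BlockedTimeSample {
    // Returns nanoseconds spent blocked; zero clock calls on the hot path.
    static long runWithBlockedTime(final boolean mustBlock, final Runnable blockingCall) {
        if (!mustBlock) {
            return 0L; // common case: no metadata refresh needed, no nanoTime() calls
        }
        final long start = System.nanoTime();
        blockingCall.run(); // e.g. wait for a metadata round-trip
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        final long blocked = runWithBlockedTime(true, () -> { });
        System.out.println("blocked nanos: " + blocked);
    }
}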
 



[kafka] branch trunk updated: MINOR: fix html generation syntax errors (#12094)

2022-04-26 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 6d7723f073 MINOR: fix html generation syntax errors (#12094)
6d7723f073 is described below

commit 6d7723f073ad0b935b02792be9aa4fd49b581b50
Author: Mike Tobola 
AuthorDate: Tue Apr 26 16:51:12 2022 -0700

MINOR: fix html generation syntax errors (#12094)

The html document generation has some errors in it, specifically related to 
protocols. The two issues identified and resolved are:

* Missing  closing tags added
* Invalid usage of a  tag as a wrapper element for  elements. 
Changed the  tag to be a .

Tested by running ./gradlew siteDocsTar and observing that the output was 
properly formed.

Reviewers: Guozhang Wang 
---
 .../main/java/org/apache/kafka/common/protocol/ApiKeys.java|  2 +-
 .../src/main/java/org/apache/kafka/common/protocol/Errors.java |  2 +-
 .../main/java/org/apache/kafka/common/protocol/Protocol.java   | 10 +-
 .../main/java/org/apache/kafka/common/protocol/types/Type.java |  2 +-
 4 files changed, 8 insertions(+), 8 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 399b631d76..628c9407cc 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -241,7 +241,7 @@ public enum ApiKeys {
 b.append("");
 b.append("\n");
 }
-b.append("\n");
+b.append("\n");
 return b.toString();
 }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index f48ae6c233..5db9717906 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -500,7 +500,7 @@ public enum Errors {
 b.append("");
 b.append("\n");
 }
-b.append("\n");
+b.append("\n");
 return b.toString();
 }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index d455b26eb2..a75eb0661d 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -112,7 +112,7 @@ public class Protocol {
 b.append("");
 b.append("\n");
 }
-b.append("\n");
+b.append("\n");
 }
 
 public static String toHtml() {
@@ -148,7 +148,7 @@ public class Protocol {
 Schema schema = requests[i];
 // Schema
 if (schema != null) {
-b.append("");
+b.append("");
 // Version header
 b.append("");
 b.append(key.name);
@@ -159,7 +159,7 @@ public class Protocol {
 b.append("");
 schemaToFieldTableHtml(requests[i], b);
 }
-b.append("\n");
+b.append("\n");
 }
 
 // Responses
@@ -169,7 +169,7 @@ public class Protocol {
 Schema schema = responses[i];
 // Schema
 if (schema != null) {
-b.append("");
+b.append("");
 // Version header
 b.append("");
 b.append(key.name);
@@ -180,7 +180,7 @@ public class Protocol {
 b.append("");
 schemaToFieldTableHtml(responses[i], b);
 }
-b.append("\n");
+b.append("\n");
 }
 }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
index 46a59bd082..4af74dbf4c 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
@@ -1120,7 +1120,7 @@ public abstract class Type {
 b.append("");
 b.append("\n");
 }
-b.append("\n");
+b.append("\n");
 return b.toString();
 }
 



[kafka] branch trunk updated: MINOR: revert back to 60s session timeout for static membership test (#11881)

2022-04-21 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new f28a2ee918 MINOR: revert back to 60s session timeout for static 
membership test (#11881)
f28a2ee918 is described below

commit f28a2ee918423e402ad9ddca9451b973dddfc591
Author: Luke Chen 
AuthorDate: Fri Apr 22 02:51:31 2022 +0800

MINOR: revert back to 60s session timeout for static membership test 
(#11881)

Reviewers: Guozhang Wang 
---
 tests/kafkatest/services/streams.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/tests/kafkatest/services/streams.py 
b/tests/kafkatest/services/streams.py
index f4f6a6a04f..5dedc57916 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -690,10 +690,9 @@ class StaticMemberTestService(StreamsTestBaseService):
   streams_property.KAFKA_SERVERS: 
self.kafka.bootstrap_servers(),
   streams_property.NUM_THREADS: self.NUM_THREADS,
   consumer_property.GROUP_INSTANCE_ID: 
self.GROUP_INSTANCE_ID,
-  consumer_property.SESSION_TIMEOUT_MS: 6,
+  consumer_property.SESSION_TIMEOUT_MS: 6, # set 
longer session timeout for static member test
   'input.topic': self.INPUT_TOPIC,
-  "acceptable.recovery.lag": "9223372036854775807", # 
enable a one-shot assignment
-  "session.timeout.ms": "1" # set back to 10s for 
tests. See KIP-735
+  "acceptable.recovery.lag": "9223372036854775807" # 
enable a one-shot assignment
   }
 
 



[kafka] branch trunk updated (d83fccd65f -> c5077c679c)

2022-04-20 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from d83fccd65f KAFKA-13785: [5/N][emit final] cache for time ordered 
window store (#12030)
 add c5077c679c KAFKA-13588: consolidate `changelogFor` methods to simplify 
the generation of internal topic names (#11703)

No new revisions were added by this update.

Summary of changes:
 .../internals/InternalTopologyBuilder.java | 24 ---
 .../processor/internals/ProcessorContextUtils.java | 26 +---
 .../state/internals/CachingWindowStore.java| 20 ++--
 .../InMemoryTimeOrderedKeyValueBuffer.java | 12 +++---
 .../state/internals/MeteredKeyValueStore.java  | 28 --
 .../state/internals/MeteredSessionStore.java   | 28 --
 .../state/internals/MeteredWindowStore.java| 24 ---
 7 files changed, 49 insertions(+), 113 deletions(-)
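
For reference, the internal changelog-topic naming these consolidated
changelogFor methods compute follows a simple convention; a hedged
illustration, omitting named-topology prefixes and other special cases:

public final class ChangelogNames {
    // Kafka Streams derives changelog topic names from the application id
    // and the store name; this sketch ignores special cases.
    static String changelogFor(final String applicationId, final String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // prints: wordcount-app-counts-store-changelog
        System.out.println(changelogFor("wordcount-app", "counts-store"));
    }
}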



[kafka] branch trunk updated (fb66b3668b -> d83fccd65f)

2022-04-20 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from fb66b3668b KAFKA-13799: Improve documentation for Kafka zero-copy 
(#12052)
 add d83fccd65f KAFKA-13785: [5/N][emit final] cache for time ordered 
window store (#12030)

No new revisions were added by this update.

Summary of changes:
 ...stractDualSchemaRocksDBSegmentedBytesStore.java |   6 +-
 .../MergedSortedCacheWindowStoreIterator.java  |  14 +-
 ...rgedSortedCacheWindowStoreKeyValueIterator.java |  33 +-
 .../state/internals/PrefixedWindowKeySchemas.java  |  20 ++
 .../internals/RocksDBTimeOrderedWindowStore.java   |   7 +-
 ...ore.java => TimeOrderedCachingWindowStore.java} | 390 +
 .../internals/TimestampedWindowStoreBuilder.java   |  19 +
 ...dSortedCacheWrappedWindowStoreIteratorTest.java | 105 --
 ...acheWrappedWindowStoreKeyValueIteratorTest.java |  83 -
 ...meOrderedCachingPersistentWindowStoreTest.java} | 238 ++---
 .../TimestampedWindowStoreBuilderTest.java |  46 ++-
 11 files changed, 726 insertions(+), 235 deletions(-)
 copy 
streams/src/main/java/org/apache/kafka/streams/state/internals/{CachingWindowStore.java
 => TimeOrderedCachingWindowStore.java} (56%)
 copy 
streams/src/test/java/org/apache/kafka/streams/state/internals/{CachingPersistentWindowStoreTest.java
 => TimeOrderedCachingPersistentWindowStoreTest.java} (85%)



[kafka] branch trunk updated: KAFKA-13799: Improve documentation for Kafka zero-copy (#12052)

2022-04-20 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new fb66b3668b KAFKA-13799: Improve documentation for Kafka zero-copy 
(#12052)
fb66b3668b is described below

commit fb66b3668bd0a257a77ca43f7b1a31e8180f21f8
Author: RivenSun <91005273+rivens...@users.noreply.github.com>
AuthorDate: Thu Apr 21 01:31:32 2022 +0800

KAFKA-13799: Improve documentation for Kafka zero-copy (#12052)

Improve documentation for Kafka zero-copy. Kafka combines pagecache and 
zero-copy to greatly improve message consumption efficiency. But zero-copy only 
works with PlaintextTransportLayer.

Reviewers: Divij Vaidya , Guozhang Wang 

---
 docs/design.html | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/docs/design.html b/docs/design.html
index db71b65245..15eefa8132 100644
--- a/docs/design.html
+++ b/docs/design.html
@@ -125,6 +125,9 @@
 
 This combination of pagecache and sendfile means that on a Kafka cluster 
where the consumers are mostly caught up you will see no read activity on the 
disks whatsoever as they will be serving data entirely from cache.
 
+TLS/SSL libraries operate in user space (in-kernel 
SSL_sendfile is currently not supported by Kafka). Due to this 
restriction, sendfile cannot be used when the transport layer 
uses the SSL protocol. To enable
+SSL, refer to security.protocol and 
security.inter.broker.protocol
+
 For more background on the sendfile and zero-copy support in Java, see 
this https://developer.ibm.com/articles/j-zerocopy/;>article.
 
 End-to-end Batch 
Compression
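
To make the sendfile discussion concrete, a minimal Java sketch of the
zero-copy transfer the doc refers to: FileChannel.transferTo delegates the
copy to the kernel (sendfile on Linux), so the file bytes never enter user
space. The file path, host, and port are placeholders.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class ZeroCopySend {
    public static void main(String[] args) throws IOException {
        try (FileChannel file = FileChannel.open(Paths.get("segment.log"), StandardOpenOption.READ);
             SocketChannel socket = SocketChannel.open(new InetSocketAddress("localhost", 9000))) {
            long position = 0L;
            final long size = file.size();
            while (position < size) {
                // transferTo may move fewer bytes than requested; loop until done
                position += file.transferTo(position, size - position, socket);
            }
        }
    }
}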



[kafka] branch trunk updated: MINOR: ignore unused configuration when ConsumerCoordinator is not constructed (#12041)

2022-04-14 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new cf5e714a8b MINOR: ignore unused configuration when ConsumerCoordinator 
is not constructed (#12041)
cf5e714a8b is described below

commit cf5e714a8bea8bb1de75201d0769bb1c246b9334
Author: RivenSun <91005273+rivens...@users.noreply.github.com>
AuthorDate: Fri Apr 15 08:30:43 2022 +0800

MINOR: ignore unused configuration when ConsumerCoordinator is not 
constructed (#12041)

Following PR #11940, ignore unused config when ConsumerCoordinator is not 
constructed.

Reviewers: Guozhang Wang 
---
 .../java/org/apache/kafka/clients/consumer/KafkaConsumer.java| 9 +++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index a49c89560f..6ffb772915 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -774,8 +774,12 @@ public class KafkaConsumer implements Consumer 
{
 );
 
 // no coordinator will be constructed for the default (null) group 
id
-this.coordinator = !groupId.isPresent() ? null :
-new ConsumerCoordinator(groupRebalanceConfig,
+if (!groupId.isPresent()) {
+config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
+
config.ignore(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
+this.coordinator = null;
+} else {
+this.coordinator = new 
ConsumerCoordinator(groupRebalanceConfig,
 logContext,
 this.client,
 assignors,
@@ -788,6 +792,7 @@ public class KafkaConsumer implements Consumer {
 
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
 this.interceptors,
 
config.getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
+}
 this.fetcher = new Fetcher<>(
 logContext,
 this.client,
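
A short sketch of the scenario this patch covers: a consumer constructed
without group.id, where coordinator-only configs can never apply. The broker
address and serde choices below are illustrative.

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class GrouplessConsumer {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // no group.id: no ConsumerCoordinator is constructed, so with this patch
        // auto.commit.interval.ms below is ignored instead of flagged as unused
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // manual partition assignment via assign() would be required here
        }
    }
}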



[kafka] branch trunk updated (a3a4323a5a -> f49cff412d)

2022-04-14 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from a3a4323a5a MINOR: Update LICENSE-binary (#12051)
 add f49cff412d MINOR: Remove redundant conditional judgments in 
Selector.clear() (#12048)

No new revisions were added by this update.

Summary of changes:
 clients/src/main/java/org/apache/kafka/common/network/Selector.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)



[kafka] branch trunk updated: MINOR: Supplement the description of `Valid Values` in the documentation of `compression.type` (#11985)

2022-04-12 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 1df232c839 MINOR: Supplement the description of `Valid Values` in the 
documentation of `compression.type` (#11985)
1df232c839 is described below

commit 1df232c839f4568718a52c04aad72b69beb52026
Author: RivenSun <91005273+rivens...@users.noreply.github.com>
AuthorDate: Wed Apr 13 12:24:57 2022 +0800

MINOR: Supplement the description of `Valid Values` in the documentation of 
`compression.type` (#11985)

Because a validator is added to ProducerConfig.COMPRESSION_TYPE_CONFIG and 
KafkaConfig.CompressionTypeProp, the corresponding testCase is improved to 
verify whether the wrong value of compression.type will throw a ConfigException.

Reviewers: Mickael Maison , Guozhang Wang 

---
 .../org/apache/kafka/clients/producer/ProducerConfig.java |  4 +++-
 .../java/org/apache/kafka/common/record/CompressionType.java  |  6 ++
 .../org/apache/kafka/clients/producer/ProducerConfigTest.java | 11 +++
 core/src/main/scala/kafka/server/KafkaConfig.scala|  6 +++---
 core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala   |  2 +-
 5 files changed, 24 insertions(+), 5 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index afc1e55cdf..8fec07a297 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -26,7 +26,9 @@ import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.SecurityConfig;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -329,7 +331,7 @@ public class ProducerConfig extends AbstractConfig {
 in("all", "-1", "0", "1"),
 Importance.LOW,
 ACKS_DOC)
-.define(COMPRESSION_TYPE_CONFIG, Type.STRING, 
"none", Importance.HIGH, COMPRESSION_TYPE_DOC)
+.define(COMPRESSION_TYPE_CONFIG, Type.STRING, 
CompressionType.NONE.name, in(Utils.enumOptions(CompressionType.class)), 
Importance.HIGH, COMPRESSION_TYPE_DOC)
 .define(BATCH_SIZE_CONFIG, Type.INT, 16384, 
atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
 .define(LINGER_MS_CONFIG, Type.LONG, 0, 
atLeast(0), Importance.MEDIUM, LINGER_MS_DOC)
 .define(DELIVERY_TIMEOUT_MS_CONFIG, Type.INT, 
120 * 1000, atLeast(0), Importance.MEDIUM, DELIVERY_TIMEOUT_MS_DOC)
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java 
b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index 1b9754ffab..c526929b72 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -190,4 +190,10 @@ public enum CompressionType {
 else
 throw new IllegalArgumentException("Unknown compression name: " + 
name);
 }
+
+@Override
+public String toString() {
+return name;
+}
+
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
index a2f318bebc..ae9de7b70a 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
@@ -19,12 +19,14 @@ package org.apache.kafka.clients.producer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.config.ConfigException;
 import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class ProducerConfigTest {
 
@@ -59,4 +61,13 @@ public class ProducerConfigTest {
 
assertEquals(newConfigs.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG), 
keySerializerClass);
 
assertEquals(newConfigs.get(Pr
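
In the same spirit as the truncated test above, a hedged sketch of what the
new validator buys: an invalid compression.type now fails fast at config
construction. The serializer settings are illustrative.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigException;

public class CompressionTypeValidation {
    public static void main(String[] args) {
        final Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "not-a-codec");
        try {
            new ProducerConfig(configs); // validator rejects the value up front
        } catch (final ConfigException expected) {
            System.out.println("Rejected as expected: " + expected.getMessage());
        }
    }
}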

[kafka-site] branch asf-site updated: MINOR: Unsubscribe mailing lists (#402)

2022-04-01 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/kafka-site.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new c9f686f  MINOR: Unsubscribe mailing lists (#402)
c9f686f is described below

commit c9f686f14097babaf9d27ad47c4ed7d8d42446d5
Author: Tom Bentley 
AuthorDate: Sat Apr 2 00:01:22 2022 +0100

MINOR: Unsubscribe mailing lists (#402)

Reviewers: Guozhang Wang 
---
 contact.html | 8 
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/contact.html b/contact.html
index 270e2d6..81daaba 100644
--- a/contact.html
+++ b/contact.html
@@ -13,23 +13,23 @@


User mailing list: A list for general 
user questions about Kafka. To subscribe, send an email to mailto:users-subscr...@kafka.apache.org;>users-subscr...@kafka.apache.org.
 Once subscribed, you can ask general user questions by mailing to mailto:us...@kafka.apache.org;>us...@kafka.apache.org. Archives are 
available https://lists.apache.org/list.html?us...@kafka.apache.org;>here.
+   To unsubscribe, send an email to mailto:users-unsubscr...@kafka.apache.org;>users-unsubscr...@kafka.apache.org.


Developer mailing list: A list for 
discussion on Kafka development. To subscribe, send an email to mailto:dev-subscr...@kafka.apache.org;>dev-subscr...@kafka.apache.org.
 Once subscribed, you can have discussion on Kafka development by mailing 
to mailto:d...@kafka.apache.org;>d...@kafka.apache.org. Archives 
are available https://lists.apache.org/list.html?d...@kafka.apache.org;>here.
+   To unsubscribe, send an email to mailto:dev-unsubscr...@kafka.apache.org;>dev-unsubscr...@kafka.apache.org.


JIRA mailing list: A list to track 
Kafka https://issues.apache.org/jira/projects/KAFKA;>JIRA 
notifications. To subscribe, send an email to mailto:jira-subscr...@kafka.apache.org;>jira-subscr...@kafka.apache.org.
 Archives are available https://lists.apache.org/list.html?j...@kafka.apache.org;>here.
+   To unsubscribe, send an email to mailto:jira-unsubscr...@kafka.apache.org;>jira-unsubscr...@kafka.apache.org.


Commit mailing list: A list to track 
Kafka commits. To subscribe, send an email to mailto:commits-subscr...@kafka.apache.org;>commits-subscr...@kafka.apache.org.
 Archives are available http://mail-archives.apache.org/mod_mbox/kafka-commits;>here.
+   To unsubscribe, send an email to mailto:commits-unsubscr...@kafka.apache.org;>commits-unsubscr...@kafka.apache.org.


 

-   To unsubscribe from any of these you just change the word 
"subscribe" to "unsubscribe" in the email adresses above.
-   
-
-   
Prior to the move to Apache we had a Google group we used for 
discussion. Archives can be found http://groups.google.com/group/kafka-dev;>here. After that we were in 
Apache Incubator which has its own archives for http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/;>user,
 http://mail-archives.apache.org/mod_mbox/incubator-kafka-dev/;>dev, 
and http://mail-archives.apache.org/mod_mbox/incubator-kafka-commits/;>commit
 lists.

 


[kafka] branch trunk updated (2a27059 -> 19a6269)

2022-03-29 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 2a27059  MINOR: Improved display names for parameterized KRaft and ZK 
tests (#11957)
 add 19a6269  MINOR: Fix log4j entry in RepartitionTopics (#11958)

No new revisions were added by this update.

Summary of changes:
 .../kafka/streams/processor/internals/RepartitionTopics.java  | 8 +---
 1 file changed, 5 insertions(+), 3 deletions(-)

