[kafka-site] branch asf-site updated: MINOR: update Kafka Streams state.dir doc (#536)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git The following commit(s) were added to refs/heads/asf-site by this push: new 1ac4a1d1 MINOR: update Kafka Streams state.dir doc (#536) 1ac4a1d1 is described below commit 1ac4a1d151491b339d0116dbd45a47bca4911103 Author: Matthias J. Sax AuthorDate: Sun Aug 6 10:20:32 2023 -0700 MINOR: update Kafka Streams state.dir doc (#536) Default state directory was changes in 2.8.0 release (cf KAFKA-10604) Reviewers: Guozhang Wang --- 28/streams/developer-guide/config-streams.html | 2 +- 30/streams/developer-guide/config-streams.html | 2 +- 31/streams/developer-guide/config-streams.html | 2 +- 32/streams/developer-guide/config-streams.html | 2 +- 33/streams/developer-guide/config-streams.html | 2 +- 34/streams/developer-guide/config-streams.html | 2 +- 35/streams/developer-guide/config-streams.html | 2 +- 7 files changed, 7 insertions(+), 7 deletions(-) diff --git a/28/streams/developer-guide/config-streams.html b/28/streams/developer-guide/config-streams.html index 5b89ef4d..706e456e 100644 --- a/28/streams/developer-guide/config-streams.html +++ b/28/streams/developer-guide/config-streams.html @@ -320,7 +320,7 @@ state.dir High Directory location for state stores. -/tmp/kafka-streams +/${java.io.tmpdir}/kafka-streams task.timeout.ms Medium diff --git a/30/streams/developer-guide/config-streams.html b/30/streams/developer-guide/config-streams.html index 2fa3088f..51706f74 100644 --- a/30/streams/developer-guide/config-streams.html +++ b/30/streams/developer-guide/config-streams.html @@ -335,7 +335,7 @@ settings.put(... , ...); state.dir High Directory location for state stores. -/tmp/kafka-streams +/${java.io.tmpdir}/kafka-streams task.timeout.ms Medium diff --git a/31/streams/developer-guide/config-streams.html b/31/streams/developer-guide/config-streams.html index 318f9cb0..1ce575fa 100644 --- a/31/streams/developer-guide/config-streams.html +++ b/31/streams/developer-guide/config-streams.html @@ -407,7 +407,7 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG), 1); state.dir High Directory location for state stores. -/tmp/kafka-streams +/${java.io.tmpdir}/kafka-streams task.timeout.ms Medium diff --git a/32/streams/developer-guide/config-streams.html b/32/streams/developer-guide/config-streams.html index a99d90da..65279e61 100644 --- a/32/streams/developer-guide/config-streams.html +++ b/32/streams/developer-guide/config-streams.html @@ -415,7 +415,7 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG), 1); state.dir High Directory location for state stores. -/tmp/kafka-streams +/${java.io.tmpdir}/kafka-streams task.timeout.ms Medium diff --git a/33/streams/developer-guide/config-streams.html b/33/streams/developer-guide/config-streams.html index a99d90da..65279e61 100644 --- a/33/streams/developer-guide/config-streams.html +++ b/33/streams/developer-guide/config-streams.html @@ -415,7 +415,7 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG), 1); state.dir High Directory location for state stores. 
-/tmp/kafka-streams +/${java.io.tmpdir}/kafka-streams task.timeout.ms Medium diff --git a/34/streams/developer-guide/config-streams.html b/34/streams/developer-guide/config-streams.html index bccf97eb..5aaf52c8 100644 --- a/34/streams/developer-guide/config-streams.html +++ b/34/streams/developer-guide/config-streams.html @@ -415,7 +415,7 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG), 1); state.dir High Directory location for state stores. -/tmp/kafka-streams +/${java.io.tmpdir}/kafka-streams task.timeout.ms Medium diff --git a/35/streams/developer-guide/config-streams.html b/35/streams/developer-guide/config-streams.html index b1512df2..a38e8b3d 100644 --- a/35/streams/developer-guide/config-streams.html +++ b/35/streams/developer-guide/config-streams.html @@ -415,7 +415,7 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); state.dir High Directory location for state stores. -/tmp/kafka-streams +/${java.io.tmpdir}/kafka-streams task.timeout.ms Medium
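For context on the doc fix above (and its mirror on trunk below): since 2.8.0 (KAFKA-10604) Kafka Streams resolves its state directory against the JVM's java.io.tmpdir system property rather than a hard-coded /tmp. Applications that need durable local state usually override the setting explicitly; a minimal sketch, where /var/lib/kafka-streams is an arbitrary example path:

import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class StateDirConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "state-dir-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Default since 2.8.0 is ${java.io.tmpdir}/kafka-streams; point it at a
        // durable path so RocksDB state survives tmpdir cleanup and restarts.
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
    }
}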
[kafka] branch trunk updated: MINOR: update Kafka Streams state.dir doc (#14155)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 7a2e11cae73 MINOR: update Kafka Streams state.dir doc (#14155) 7a2e11cae73 is described below commit 7a2e11cae739f3391f61e2e29b148d3a3ebea8b3 Author: Matthias J. Sax AuthorDate: Sun Aug 6 10:20:08 2023 -0700 MINOR: update Kafka Streams state.dir doc (#14155) Default state directory was changed in the 2.8.0 release (cf KAFKA-10604) Reviewers: Guozhang Wang --- docs/streams/developer-guide/config-streams.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html index b1512df24b5..a38e8b3d3b7 100644 --- a/docs/streams/developer-guide/config-streams.html +++ b/docs/streams/developer-guide/config-streams.html @@ -415,7 +415,7 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); state.dir High Directory location for state stores. -/tmp/kafka-streams +/${java.io.tmpdir}/kafka-streams task.timeout.ms Medium
[kafka] branch trunk updated: KAFKA-15106: Fix AbstractStickyAssignor isBalanced predict (#13920)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new e0b7499103d KAFKA-15106: Fix AbstractStickyAssignor isBalanced predict (#13920) e0b7499103d is described below commit e0b7499103df9222140cdbf7047494d92913987e Author: flashmouse AuthorDate: Fri Aug 4 02:17:08 2023 +0800 KAFKA-15106: Fix AbstractStickyAssignor isBalanced predict (#13920) in 3.5.0 AbstractStickyAssignor may run useless loop in performReassignments because isBalanced have a trivial mistake, and result in rebalance timeout in some situation. Co-authored-by: lixy Reviewers: Ritika Reddy , Philip Nee , Kirk True , Guozhang Wang --- .../consumer/internals/AbstractStickyAssignor.java | 5 +- .../internals/AbstractStickyAssignorTest.java | 86 ++ 2 files changed, 89 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java index 1bde792d598..0823752d159 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java @@ -158,10 +158,11 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor { // generation amongst for (final TopicPartition tp : memberData.partitions) { if (allTopics.contains(tp.topic())) { -String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer); +String otherConsumer = allPreviousPartitionsToOwner.get(tp); if (otherConsumer == null) { // this partition is not owned by other consumer in the same generation ownedPartitions.add(tp); +allPreviousPartitionsToOwner.put(tp, consumer); } else { final int otherMemberGeneration = subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION); @@ -1172,7 +1173,7 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor { if (!currentAssignment.get(consumer).contains(topicPartition)) { String otherConsumer = allPartitions.get(topicPartition); int otherConsumerPartitionCount = currentAssignment.get(otherConsumer).size(); -if (consumerPartitionCount < otherConsumerPartitionCount) { +if (consumerPartitionCount + 1 < otherConsumerPartitionCount) { log.debug("{} can be moved from consumer {} to consumer {} for a more balanced assignment.", topicPartition, otherConsumer, consumer); return false; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java index cdb0142c49b..71d188f3cd7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Optional; import java.util.Random; import java.util.Set; +import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription; import org.apache.kafka.clients.consumer.StickyAssignor; @@ -724,6 +725,91 @@ public abstract class AbstractStickyAssignorTest { assignor.assignPartitions(partitionsPerTopic, subscriptions); } +@Timeout(90) +@ParameterizedTest(name = 
TEST_NAME_WITH_CONSUMER_RACK) +@ValueSource(booleans = {false, true}) +public void testAssignmentAndGroupWithNonEqualSubscriptionNotTimeout(boolean hasConsumerRack) { +initializeRacks(hasConsumerRack ? RackConfig.BROKER_AND_CONSUMER_RACK : RackConfig.NO_CONSUMER_RACK); +int topicCount = hasConsumerRack ? 50 : 100; +int partitionCount = 2_00; +int consumerCount = 5_00; + +List topics = new ArrayList<>(); +Map> partitionsPerTopic = new HashMap<>(); +for (int i = 0; i < topicCount; i++) { +String topicName = getTopicName(i, topicCount); +topics.add(topicName); +partitionsPerTopic.put(topicName, partitionInfos(topicName, partitionCount)); +} +for (int i = 0; i < consumerCoun
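The quoted test is truncated above, but the gist of the fix is visible in the first hunk: ownership lookups now call get() before put(), and a partition is only considered movable when the receiving consumer owns at least two fewer partitions than the current owner. A simplified, self-contained sketch of that balance predicate follows; it is not the actual AbstractStickyAssignor code, which also checks per-consumer subscriptions, only an illustration of why the "+ 1" matters:

import java.util.List;
import java.util.Map;

public class BalanceCheckSketch {

    // Simplified stand-in for AbstractStickyAssignor#isBalanced: the assignment is
    // treated as balanced when no consumer owns two or more partitions fewer than
    // another. A gap of exactly one cannot be improved by moving a partition, so
    // counting it as "unbalanced" (the old "count < otherCount" comparison) made
    // performReassignments loop until the rebalance timed out.
    static boolean isBalanced(Map<String, List<String>> assignment) {
        int min = assignment.values().stream().mapToInt(List::size).min().orElse(0);
        int max = assignment.values().stream().mapToInt(List::size).max().orElse(0);
        return min + 1 >= max;
    }

    public static void main(String[] args) {
        Map<String, List<String>> assignment = Map.of(
                "consumer-1", List.of("topic-0", "topic-1"),
                "consumer-2", List.of("topic-2"));
        // Prints true: a one-partition gap is as balanced as three partitions over
        // two consumers can get.
        System.out.println(isBalanced(assignment));
    }
}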
[kafka] branch trunk updated (c4ad09e47d7 -> 750a3893081)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from c4ad09e47d7 MINOR: Add more KRaft reassignment tests (#13521) add 750a3893081 MINOR: Follow-up on failing streams test, and fix StoreChangelogReader (#13523) No new revisions were added by this update. Summary of changes: .../processor/internals/DefaultStateUpdater.java | 2 +- .../streams/processor/internals/ReadOnlyTask.java | 2 +- .../streams/processor/internals/StandbyTask.java | 6 +++- .../streams/processor/internals/StateUpdater.java | 6 +++- .../processor/internals/StoreChangelogReader.java | 37 ++ .../streams/processor/internals/StreamTask.java| 14 .../kafka/streams/processor/internals/Task.java| 2 +- .../integration/PauseResumeIntegrationTest.java| 32 ++- .../internals/DefaultStateUpdaterTest.java | 4 +-- .../processor/internals/StandbyTaskTest.java | 4 +-- .../internals/StoreChangelogReaderTest.java| 28 +--- .../processor/internals/StreamTaskTest.java| 6 ++-- .../processor/internals/TaskManagerTest.java | 2 +- .../internals/metrics/TaskMetricsTest.java | 15 ++--- 14 files changed, 103 insertions(+), 57 deletions(-)
[kafka] branch trunk updated (2117c4bce8f -> f02f5f8c8a8)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 2117c4bce8f Minor: fix ReadOnlyTaskTest (#13519) add f02f5f8c8a8 MINOR: fix stream failing tests (#13512) No new revisions were added by this update. Summary of changes: .../streams/processor/internals/StoreChangelogReader.java | 15 --- .../processor/internals/metrics/TaskMetricsTest.java | 2 ++ 2 files changed, 6 insertions(+), 11 deletions(-)
[kafka] branch trunk updated (4f34ce1b491 -> 2117c4bce8f)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 4f34ce1b491 KAFKA-14376: Add ConfigProvider to make use of environment variables KIP-887 (#12992) add 2117c4bce8f Minor: fix ReadOnlyTaskTest (#13519) No new revisions were added by this update. Summary of changes: .../apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java| 4 1 file changed, 4 insertions(+)
[kafka] branch 3.4 updated: KAFKA-14172: Should clear cache when active recycled from standby (#13369)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 3.4 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.4 by this push: new 2dd3713b2a1 KAFKA-14172: Should clear cache when active recycled from standby (#13369) 2dd3713b2a1 is described below commit 2dd3713b2a183bb904651c0022c5d953970b4ad3 Author: Guozhang Wang AuthorDate: Wed Apr 5 16:05:11 2023 -0700 KAFKA-14172: Should clear cache when active recycled from standby (#13369) This fix is inspired by #12540. 1. Added a clearCache function for CachedStateStore, which would be triggered upon recycling a state manager. 2. Added the integration test inherited from #12540 . 3. Improved some log4j entries. 4. Found and fixed a minor issue with log4j prefix. Reviewers: Lucas Brutschy , Matthias J. Sax --- .../processor/internals/ActiveTaskCreator.java | 4 +- .../processor/internals/ProcessorStateManager.java | 20 +- .../processor/internals/StandbyTaskCreator.java| 2 +- .../processor/internals/StoreChangelogReader.java | 2 +- .../streams/state/internals/CachedStateStore.java | 10 + .../state/internals/CachingKeyValueStore.java | 12 + .../state/internals/CachingSessionStore.java | 5 + .../state/internals/CachingWindowStore.java| 5 + .../kafka/streams/state/internals/NamedCache.java | 7 + .../kafka/streams/state/internals/ThreadCache.java | 8 + .../internals/TimeOrderedCachingWindowStore.java | 5 + .../streams/state/internals/WrappedStateStore.java | 8 + ...tandbyTaskEOSMultiRebalanceIntegrationTest.java | 300 + .../internals/ProcessorStateManagerTest.java | 29 ++ 14 files changed, 411 insertions(+), 6 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java index 581175e2c97..7f423c84069 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java @@ -226,7 +226,7 @@ class ActiveTaskCreator { } standbyTask.prepareRecycle(); -standbyTask.stateMgr.transitionTaskType(Task.TaskType.ACTIVE); +standbyTask.stateMgr.transitionTaskType(Task.TaskType.ACTIVE, getLogContext(standbyTask.id)); final RecordCollector recordCollector = createRecordCollector(standbyTask.id, getLogContext(standbyTask.id), standbyTask.topology); final StreamTask task = new StreamTask( @@ -324,7 +324,7 @@ class ActiveTaskCreator { private LogContext getLogContext(final TaskId taskId) { final String threadIdPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName()); -final String logPrefix = threadIdPrefix + String.format("%s [%s] ", "task", taskId); +final String logPrefix = threadIdPrefix + String.format("%s [%s] ", "stream-task", taskId); return new LogContext(logPrefix); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 6a66e428ea5..746bb02db29 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -588,14 +588,30 @@ public class ProcessorStateManager implements StateManager { final List allChangelogs = getAllChangelogTopicPartitions(); 
changelogReader.unregister(allChangelogs); } + +// when the state manager is recycled to be used, future writes may bypass its store's caching +// layer if they are from restoration, hence we need to clear the state store's caches just in case +// See KAFKA-14172 for details +if (!stores.isEmpty()) { +log.debug("Clearing all store caches registered in the state manager: {}", stores); +for (final StateStoreMetadata metadata : stores.values()) { +final StateStore store = metadata.stateStore; + +if (store instanceof CachedStateStore) { +((CachedStateStore) store).clearCache(); +} +log.trace("Cleared cache {}", store.name()); +} +} } -void transitionTaskType(final TaskType newType) { +void transitionTaskType(final TaskType newType, final LogContext logContext) { if (taskType.equals(newType)) { throw new IllegalStateException("Tried to recycle state for task type conver
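The diff is cut off above, but the shape of the fix is: when a standby task's state manager is recycled into an active task (or vice versa), every caching store it holds must drop its cache, because restoration writes bypass the caching layer and could otherwise leave stale entries behind. A rough sketch of that pattern, using hypothetical Store/CachedStore interfaces in place of Kafka Streams' StateStore and CachedStateStore:

import java.util.HashMap;
import java.util.Map;

public class RecycleSketch {

    // Hypothetical stand-ins for StateStore and CachedStateStore.
    interface Store {
        String name();
    }

    interface CachedStore extends Store {
        void clearCache();
    }

    private final Map<String, Store> stores = new HashMap<>();

    void register(Store store) {
        stores.put(store.name(), store);
    }

    // Called when the state manager is about to be reused for the other task type:
    // restoration writes go straight to the underlying store, so any cached entries
    // from the previous incarnation must be discarded (see KAFKA-14172).
    void prepareRecycle() {
        for (Store store : stores.values()) {
            if (store instanceof CachedStore) {
                ((CachedStore) store).clearCache();
            }
        }
    }
}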
[kafka] branch trunk updated (653baa66948 -> b2ee6df1c4c)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 653baa66948 KAFKA-10199: Add task updater metrics, part 2 (#13300) add b2ee6df1c4c KAFKA-14172: Should clear cache when active recycled from standby (#13369) No new revisions were added by this update. Summary of changes: .../processor/internals/ActiveTaskCreator.java | 4 +- .../processor/internals/ProcessorStateManager.java | 20 +- .../processor/internals/StandbyTaskCreator.java| 2 +- .../processor/internals/StoreChangelogReader.java | 2 +- .../streams/state/internals/CachedStateStore.java | 10 + .../state/internals/CachingKeyValueStore.java | 12 + .../state/internals/CachingSessionStore.java | 5 + .../state/internals/CachingWindowStore.java| 5 + .../kafka/streams/state/internals/NamedCache.java | 7 + .../kafka/streams/state/internals/ThreadCache.java | 8 + .../internals/TimeOrderedCachingWindowStore.java | 5 + .../streams/state/internals/WrappedStateStore.java | 8 + ...tandbyTaskEOSMultiRebalanceIntegrationTest.java | 300 + .../internals/ProcessorStateManagerTest.java | 29 ++ 14 files changed, 411 insertions(+), 6 deletions(-) create mode 100644 streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java
[kafka] branch trunk updated (6d36db1c78f -> beb0be5fe45)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 6d36db1c78f KAFKA-14765 and KAFKA-14776: Support for SCRAM at bootstrap with integration tests (#13374) add beb0be5fe45 KAFKA-14533: Do not interrupt state-updater thread during shutdown (#13318) No new revisions were added by this update. Summary of changes: .../org/apache/kafka/streams/KafkaStreams.java | 2 +- .../streams/processor/internals/ClientUtils.java | 36 ++ .../processor/internals/DefaultStateUpdater.java | 57 +- .../internals/StreamsPartitionAssignor.java| 17 --- .../streams/processor/internals/TaskManager.java | 2 + .../SmokeTestDriverIntegrationTest.java| 3 +- ...ghAvailabilityStreamsPartitionAssignorTest.java | 39 +++ .../RackAwarenessStreamsPartitionAssignorTest.java | 35 +++-- .../internals/StreamsPartitionAssignorTest.java| 19 +++- .../internals/assignment/AssignmentTestUtils.java | 19 +++- 10 files changed, 112 insertions(+), 117 deletions(-)
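The summary above does not show the new shutdown path, but the commit title describes a common pattern: stop a long-running worker by flipping a flag and joining it rather than interrupting it, since an interrupt delivered while the thread is inside a blocking client call can surface as an InterruptException mid-operation. A generic sketch of that idea, not the DefaultStateUpdater code itself:

import java.util.concurrent.atomic.AtomicBoolean;

public class CooperativeShutdownSketch {

    private final AtomicBoolean isRunning = new AtomicBoolean(true);
    private final Thread worker = new Thread(this::runLoop, "state-updater-sketch");

    private void runLoop() {
        while (isRunning.get()) {
            // ... restore one batch of records, re-checking the flag between batches ...
            Thread.yield();
        }
    }

    void start() {
        worker.start();
    }

    // Flip the flag and wait for the worker to observe it instead of calling
    // worker.interrupt(), so in-flight work finishes at a safe boundary.
    void shutdown() throws InterruptedException {
        isRunning.set(false);
        worker.join();
    }

    public static void main(String[] args) throws InterruptedException {
        CooperativeShutdownSketch sketch = new CooperativeShutdownSketch();
        sketch.start();
        sketch.shutdown();
    }
}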
[kafka] branch trunk updated (f8d0fc835bf -> 5c0e4aa6764)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from f8d0fc835bf MINOR: Remove addOne to fix build (#13469) add 5c0e4aa6764 KAFKA-14468: Committed API (#13380) No new revisions were added by this update. Summary of changes: .../consumer/internals/CommitRequestManager.java | 364 ++--- .../internals/DefaultBackgroundThread.java | 3 +- .../consumer/internals/PrototypeAsyncConsumer.java | 79 +++-- .../internals/events/ApplicationEvent.java | 2 +- .../events/ApplicationEventProcessor.java | 22 +- .../events/OffsetFetchApplicationEvent.java| 43 +++ .../internals/CommitRequestManagerTest.java| 304 +++-- .../internals/PrototypeAsyncConsumerTest.java | 86 - .../kafka/api/BaseAsyncConsumerTest.scala | 2 +- 9 files changed, 781 insertions(+), 124 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/OffsetFetchApplicationEvent.java
[kafka] branch trunk updated (7438f100cf4 -> 31440b00f3e)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 7438f100cf4 KAFKA-14774 the removed listeners should not be reconfigurable (#13326) add 31440b00f3e KAFKA-14848: KafkaConsumer incorrectly passes locally-scoped serializers to FetchConfig (#13452) No new revisions were added by this update. Summary of changes: .../kafka/clients/consumer/KafkaConsumer.java | 5 +- .../clients/consumer/internals/FetchConfig.java| 10 ++- .../consumer/internals/FetchConfigTest.java| 92 ++ 3 files changed, 102 insertions(+), 5 deletions(-) create mode 100644 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchConfigTest.java
[kafka] branch trunk updated: KAFKA-14365: Extract common logic from Fetcher (#13425)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new a3252629a37 KAFKA-14365: Extract common logic from Fetcher (#13425) a3252629a37 is described below commit a3252629a37344be1f1f86a77cf685f9c147be4d Author: Kirk True AuthorDate: Fri Mar 24 14:33:13 2023 -0700 KAFKA-14365: Extract common logic from Fetcher (#13425) * KAFKA-14365: Extract common logic from Fetcher Extract logic from Fetcher into AbstractFetcher. Also introduce FetchConfig as a more concise way to delineate state from incoming configuration. Formalized the defaults in CommonClientConfigs and ConsumerConfig to be accessible elsewhere. * Removed overridden methods in favor of synchronizing where needed Reviewers: Guozhang Wang --- checkstyle/suppressions.xml| 14 +- .../apache/kafka/clients/CommonClientConfigs.java | 1 + .../kafka/clients/consumer/ConsumerConfig.java | 14 +- .../kafka/clients/consumer/KafkaConsumer.java | 25 +- .../clients/consumer/internals/AbstractFetch.java | 791 + .../clients/consumer/internals/CompletedFetch.java | 25 +- .../clients/consumer/internals/FetchConfig.java| 124 .../consumer/internals/FetchMetricsManager.java| 15 +- .../kafka/clients/consumer/internals/Fetcher.java | 727 +-- .../kafka/clients/consumer/KafkaConsumerTest.java | 21 +- .../consumer/internals/CompletedFetchTest.java | 34 +- .../clients/consumer/internals/FetcherTest.java| 54 +- .../consumer/internals/OffsetFetcherTest.java | 18 +- 13 files changed, 1042 insertions(+), 821 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 93ae7c30d0f..5c95c8d4284 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -46,7 +46,7 @@ + files="(AbstractFetch|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManager|TransactionManagerTest|KafkaAdminClient|NetworkClient|Admin|KafkaRaftClient|KafkaRaftClientTest|RaftClientTestContext).java"/> - + files="(KafkaConsumer|ConsumerCoordinator|AbstractFetch|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaRaftClient|KafkaRaftClientTest).java"/> @@ -87,13 +85,13 @@ files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/> + files="(AbstractFetch|ConsumerCoordinator|OffsetFetcher|KafkaProducer|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords|FetchSessionHandler).java"/> + files="(ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|KafkaRaftClient|Authorizer|FetchSessionHandler).java"/> @@ -108,7 +106,7 @@ + files="(Sender|Fetcher|OffsetFetcher|KafkaConsumer|Metrics|RequestResponse|TransactionManager|KafkaAdminClient|Message|KafkaProducer)Test.java"/> @@ -117,7 +115,7 @@ files="MockAdminClient.java"/> + files="(OffsetFetcher|RequestResponse)Test.java"/> diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java index d88aa0a6a1c..ee190df50e1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java +++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java @@ -74,6 +74,7 @@ public class CommonClientConfigs { public static final String CLIENT_RACK_CONFIG = "client.rack"; public static final String CLIENT_RACK_DOC = "A rack identifier for this client. This can be any string value which indicates where this client is physically located. It corresponds with the broker config 'broker.rack'"; +public static final String DEFAULT_CLIENT_RACK = ""; public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms"; public static final String RECONNECT_BACKOFF_MS_DOC = "The base amount of time to wait before attempting to reconnect to a given host. T
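The second hunk above adds a DEFAULT_CLIENT_RACK constant alongside the existing client.rack documentation. For reference, client.rack is an ordinary client setting that should mirror the broker.rack of nearby brokers so rack-aware features (for example fetch-from-follower) can prefer a local replica. A minimal sketch with placeholder bootstrap address and rack id:

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;

public class ClientRackExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "rack-aware-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Should match the broker.rack value of brokers in the same physical location.
        props.put(CommonClientConfigs.CLIENT_RACK_CONFIG, "rack-1");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe() / poll() as usual; the rack id is propagated to the brokers.
        }
    }
}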
[kafka] branch trunk updated (e07cc127e12 -> f79c2a6e041)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from e07cc127e12 MINOR: Fix remote_kraft -> isolated_kraft in kafkatest (#13439) add f79c2a6e041 MINOR:Incorrect/canonical use of constants in AdminClientConfig and StreamsConfigTest (#13427) No new revisions were added by this update. Summary of changes: .../main/java/org/apache/kafka/clients/admin/AdminClientConfig.java | 4 ++-- streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-)
[kafka] branch trunk updated (df5850274d3 -> 6fae237638e)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from df5850274d3 MINOR: Expand use of PartitionAssignment (#13402) add 6fae237638e MINOR: Use JUnit-5 extension to enforce strict stubbing (#13347) No new revisions were added by this update. Summary of changes: build.gradle| 1 + .../streams/state/internals/metrics/RocksDBMetricsRecorderTest.java | 6 ++ 2 files changed, 7 insertions(+)
[kafka] branch trunk updated: KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring (#13301)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new c74c0f2facd KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring (#13301) c74c0f2facd is described below commit c74c0f2facde2b392ab745144d6ad520575ab9ef Author: Kirk True AuthorDate: Fri Mar 10 10:17:14 2023 -0800 KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring (#13301) The Fetcher class is used internally by the KafkaConsumer to fetch records from the brokers. There is ongoing work to create a new consumer implementation with a significantly refactored threading model. The threading refactor work requires a similarly refactored Fetcher. This task includes refactoring Fetcher by extracting out the inner classes into top-level (though still in internal) so that those classes can be referenced by forthcoming refactored fetch logic. Reviewers: Philip Nee , Guozhang Wang --- .../kafka/clients/consumer/KafkaConsumer.java | 4 +- .../clients/consumer/internals/CompletedFetch.java | 365 ++ .../consumer/internals/ConsumerMetrics.java| 4 +- .../consumer/internals/FetchMetricsAggregator.java | 95 .../consumer/internals/FetchMetricsManager.java| 203 ...ricsRegistry.java => FetchMetricsRegistry.java} | 8 +- .../kafka/clients/consumer/internals/Fetcher.java | 557 ++--- .../clients/consumer/internals/SensorBuilder.java | 115 + .../consumer/internals/CompletedFetchTest.java | 304 +++ .../internals/FetchMetricsManagerTest.java | 171 +++ .../clients/consumer/internals/FetcherTest.java| 4 +- .../consumer/internals/OffsetFetcherTest.java | 2 +- 12 files changed, 1300 insertions(+), 532 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 3f966121d69..11ac675cb4b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -28,7 +28,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient; import org.apache.kafka.clients.consumer.internals.Fetch; import org.apache.kafka.clients.consumer.internals.Fetcher; -import org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry; +import org.apache.kafka.clients.consumer.internals.FetchMetricsRegistry; import org.apache.kafka.clients.consumer.internals.KafkaConsumerMetrics; import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener; import org.apache.kafka.clients.consumer.internals.OffsetFetcher; @@ -737,7 +737,7 @@ public class KafkaConsumer implements Consumer { this.metadata.bootstrap(addresses); String metricGrpPrefix = "consumer"; -FetcherMetricsRegistry metricsRegistry = new FetcherMetricsRegistry(Collections.singleton(CLIENT_ID_METRIC_TAG), metricGrpPrefix); +FetchMetricsRegistry metricsRegistry = new FetchMetricsRegistry(Collections.singleton(CLIENT_ID_METRIC_TAG), metricGrpPrefix); ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext); this.isolationLevel = IsolationLevel.valueOf( config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT)); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java new file mode 100644 index 000..6a11b846810 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clie
[kafka] branch trunk updated: KAFKA-14753: Improve kafka producer example (#13354)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 4527e54647a KAFKA-14753: Improve kafka producer example (#13354) 4527e54647a is described below commit 4527e54647a0b8457f7f2b5d804eb65dc4d9d817 Author: Philip Nee AuthorDate: Tue Mar 7 16:25:49 2023 -0800 KAFKA-14753: Improve kafka producer example (#13354) Reviewers: Guozhang Wang --- .../src/main/java/kafka/examples/Producer.java | 65 +++--- 1 file changed, 44 insertions(+), 21 deletions(-) diff --git a/examples/src/main/java/kafka/examples/Producer.java b/examples/src/main/java/kafka/examples/Producer.java index e649a7862c9..e85fa16060e 100644 --- a/examples/src/main/java/kafka/examples/Producer.java +++ b/examples/src/main/java/kafka/examples/Producer.java @@ -27,7 +27,13 @@ import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +/** + * Demo producer that demonstrate two modes of KafkaProducer. + * If the user uses the Async mode: The messages will be printed to stdout upon successful completion + * If the user uses the sync mode (isAsync = false): Each send loop will block until completion. + */ public class Producer extends Thread { private final KafkaProducer producer; private final String topic; @@ -54,8 +60,8 @@ public class Producer extends Thread { props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); } props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency); - producer = new KafkaProducer<>(props); + this.topic = topic; this.isAsync = isAsync; this.numRecords = numRecords; @@ -70,28 +76,45 @@ public class Producer extends Thread { public void run() { int messageKey = 0; int recordsSent = 0; -while (recordsSent < numRecords) { -String messageStr = "Message_" + messageKey; -long startTime = System.currentTimeMillis(); -if (isAsync) { // Send asynchronously -producer.send(new ProducerRecord<>(topic, -messageKey, -messageStr), new DemoCallBack(startTime, messageKey, messageStr)); -} else { // Send synchronously -try { -producer.send(new ProducerRecord<>(topic, -messageKey, -messageStr)).get(); -System.out.println("Sent message: (" + messageKey + ", " + messageStr + ")"); -} catch (InterruptedException | ExecutionException e) { -e.printStackTrace(); -} +try { +while (recordsSent < numRecords) { +final long currentTimeMs = System.currentTimeMillis(); +produceOnce(messageKey, recordsSent, currentTimeMs); +messageKey += 2; +recordsSent += 1; } -messageKey += 2; -recordsSent += 1; +} catch (Exception e) { +System.out.println("Producer encountered exception:" + e); +} finally { +System.out.println("Producer sent " + numRecords + " records successfully"); +this.producer.close(); +latch.countDown(); } -System.out.println("Producer sent " + numRecords + " records successfully"); -latch.countDown(); +} + +private void produceOnce(final int messageKey, final int recordsSent, final long currentTimeMs) throws ExecutionException, InterruptedException { +String messageStr = "Message_" + messageKey; + +if (isAsync) { // Send asynchronously +sendAsync(messageKey, messageStr, currentTimeMs); +return; +} +Future future = send(messageKey, messageStr); +future.get(); +System.out.println("Sent message: (" + messageKey + ", " + messageStr + ")"); +} + +private void 
sendAsync(final int messageKey, final String messageStr, final long currentTimeMs) { +this.producer.send(new ProducerRecord<>(topic, +messageKey, +messageStr), +new DemoCallBack(currentTimeMs, messageKey, messageStr)); +} + +private Future send(final int messageKey, final String messageStr) { +return producer.send(new ProducerRecord<>(topic, +messageKey, +messageStr)); } }
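For readers outside the examples module, the two send modes the reworked Producer demonstrates look roughly like this against the public KafkaProducer API; topic name and bootstrap address are placeholders:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.Future;

public class SendModes {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<Integer, String> producer = new KafkaProducer<>(props)) {
            // Async mode: send() returns immediately and the callback runs once the
            // broker acknowledges (or rejects) the record.
            producer.send(new ProducerRecord<>("demo-topic", 1, "Message_1"),
                    (metadata, exception) -> {
                        if (exception != null) {
                            exception.printStackTrace();
                        } else {
                            System.out.println("Acked at offset " + metadata.offset());
                        }
                    });

            // Sync mode: block on the returned Future so each iteration waits for
            // the previous record to complete.
            Future<RecordMetadata> future =
                    producer.send(new ProducerRecord<>("demo-topic", 2, "Message_2"));
            RecordMetadata metadata = future.get();
            System.out.println("Sent synchronously to partition " + metadata.partition());
        }
    }
}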
[kafka] branch trunk updated (6fbe4d85a22 -> b19ae7857b0)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 6fbe4d85a22 KAFKA-14761 Adding integration test for the prototype consumer (#13303) add b19ae7857b0 KAFKA-14752: Improving the existing consumer examples (#13353) No new revisions were added by this update. Summary of changes: .../src/main/java/kafka/examples/Consumer.java | 29 +++--- .../src/main/java/kafka/examples/Producer.java | 2 +- 2 files changed, 26 insertions(+), 5 deletions(-)
[kafka] branch trunk updated (77215eded7b -> 6fbe4d85a22)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 77215eded7b KAFKA-14792: Race condition in LazyIndex.get() (#13359) add 6fbe4d85a22 KAFKA-14761 Adding integration test for the prototype consumer (#13303) No new revisions were added by this update. Summary of changes: .../consumer/internals/DefaultEventHandler.java| 1 + .../consumer/internals/NetworkClientDelegate.java | 18 ++-- .../consumer/internals/PrototypeAsyncConsumer.java | 74 +++-- .../events/ApplicationEventProcessor.java | 5 +- .../consumer/internals/events/EventHandler.java| 3 +- .../internals/PrototypeAsyncConsumerTest.java | 116 + .../kafka/api/AbstractConsumerTest.scala | 10 +- .../kafka/api/BaseAsyncConsumerTest.scala | 47 + .../integration/kafka/api/ConsumerBounceTest.scala | 14 +-- .../kafka/api/IntegrationTestHarness.scala | 21 +++- .../kafka/api/PlaintextConsumerTest.scala | 10 +- 11 files changed, 234 insertions(+), 85 deletions(-) create mode 100644 core/src/test/scala/integration/kafka/api/BaseAsyncConsumerTest.scala
[kafka] branch trunk updated (38c409cf33c -> a6d8988179d)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 38c409cf33c KAFKA-14084: SCRAM support in KRaft. (#13114) add a6d8988179d MINOR: Clarify docs for Streams config max.warmup.replicas. (#13082) No new revisions were added by this update. Summary of changes: docs/streams/developer-guide/config-streams.html | 18 ++ .../java/org/apache/kafka/streams/StreamsConfig.java | 6 +- 2 files changed, 19 insertions(+), 5 deletions(-)
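The clarified setting works together with standby replicas and probing rebalances during high-availability task migration. A minimal sketch of raising it from its default of 2; the values here are illustrative, not recommendations:

import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class WarmupReplicasExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ha-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // More warmup replicas let state be pre-restored on more instances at once
        // before tasks move, at the cost of extra restoration traffic.
        props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 4);
        // Warmed-up tasks are only promoted on probing rebalances, so this interval
        // bounds how quickly the extra warmup capacity is actually used.
        props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 600_000L);
    }
}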
[kafka] branch trunk updated (71fa008b456 -> 47450ee064b)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 71fa008b456 KAFKA-14745: Cache the ReplicationPolicy instance in MirrorConnectorConfig (#13328) add 47450ee064b MINOR: update RocksDBMetricsRecorder test to JUnit5 and fix memory leak (#13336) No new revisions were added by this update. Summary of changes: .../metrics/RocksDBMetricsRecorderTest.java | 20 1 file changed, 12 insertions(+), 8 deletions(-)
[kafka] branch trunk updated (3d9a03cfe82 -> 58429532497)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 3d9a03cfe82 MINOR: fix rerun-tests for unit test (#13288) add 58429532497 MINOR: Fix flaky tests in DefaultStateUpdaterTest (#13319) No new revisions were added by this update. Summary of changes: .../processor/internals/DefaultStateUpdater.java | 6 +++--- .../internals/DefaultStateUpdaterTest.java | 24 -- .../org/apache/kafka/test/StreamsTestUtils.java| 3 +-- 3 files changed, 17 insertions(+), 16 deletions(-)
[kafka] branch trunk updated (d479d129e0b -> 3d9a03cfe82)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from d479d129e0b KAFKA-13999: Add ProducerCount metrics (KIP-847) (#13078) add 3d9a03cfe82 MINOR: fix rerun-tests for unit test (#13288) No new revisions were added by this update. Summary of changes: build.gradle | 5 + 1 file changed, 5 insertions(+)
[kafka] branch trunk updated: KAFKA-12639: Exit upon expired timer to prevent tight looping (#13190)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new f7f376f6c16 KAFKA-12639: Exit upon expired timer to prevent tight looping (#13190) f7f376f6c16 is described below commit f7f376f6c162717e60e143b05fbd12ea2f347e3c Author: Philip Nee AuthorDate: Tue Feb 28 17:36:37 2023 -0800 KAFKA-12639: Exit upon expired timer to prevent tight looping (#13190) In AbstractCoordinator#joinGroupIfNeeded - joinGroup request will be retried without proper backoff, due to the expired timer. This is an uncommon scenario and possibly only appears during the testing, but I think it makes sense to enforce the client to drive the join group via poll. Reviewers: Guozhang Wang --- .../consumer/internals/AbstractCoordinator.java| 12 +-- .../internals/AbstractCoordinatorTest.java | 39 ++ .../internals/ConsumerCoordinatorTest.java | 3 +- 3 files changed, 50 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 558ce2b1169..fc01b14ab08 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -501,13 +501,19 @@ public abstract class AbstractCoordinator implements Closeable { } if (exception instanceof UnknownMemberIdException || -exception instanceof IllegalGenerationException || -exception instanceof RebalanceInProgressException || -exception instanceof MemberIdRequiredException) +exception instanceof IllegalGenerationException || +exception instanceof RebalanceInProgressException || +exception instanceof MemberIdRequiredException) continue; else if (!future.isRetriable()) throw exception; +// We need to return upon expired timer, in case if the client.poll returns immediately and the time +// has elapsed. 
+if (timer.isExpired()) { +return false; +} + timer.sleep(rebalanceConfig.retryBackoffMs); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index 908dd7e4485..0a2b1e7ef8b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -1561,6 +1561,45 @@ public class AbstractCoordinatorTest { } } +@Test +public void testBackoffAndRetryUponRetriableError() { +this.mockTime = new MockTime(); +long currentTimeMs = System.currentTimeMillis(); +this.mockTime.setCurrentTimeMs(System.currentTimeMillis()); + +setupCoordinator(); // note: uses 100ms backoff +mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); +coordinator.ensureCoordinatorReady(mockTime.timer(0)); + +// Retriable Exception + mockClient.prepareResponse(joinGroupResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS)); +mockClient.prepareResponse(joinGroupResponse(Errors.NONE)); // Retry w/o error +mockClient.prepareResponse(syncGroupResponse(Errors.NONE)); +coordinator.joinGroupIfNeeded(mockTime.timer(REQUEST_TIMEOUT_MS)); + +assertEquals(100, mockTime.milliseconds() - currentTimeMs, 1); +} + +@Test +public void testReturnUponRetriableErrorAndExpiredTimer() throws InterruptedException { +setupCoordinator(); +mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); +coordinator.ensureCoordinatorReady(mockTime.timer(0)); +ExecutorService executor = Executors.newFixedThreadPool(1); +Timer t = mockTime.timer(500); +try { +Future attempt = executor.submit(() -> coordinator.joinGroupIfNeeded(t)); +mockTime.sleep(500); + mockClient.prepareResponse(joinGroupResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS)); +assertFalse(attempt.get()); +} catch (Exception e) { +fail(); +} finally { +executor.shutdownNow(); +executor.awaitTermination(1000, TimeUnit.MILLISECONDS); +} +} + private AtomicBoolean prepareFirstHeartb
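The second test above is cut off, but the behavioural contract the patch establishes is simple: on a retriable join-group error the coordinator backs off and retries, and once the caller's timer has expired it returns false instead of spinning, leaving the next attempt to a later poll. A simplified, generic sketch of that loop, not the AbstractCoordinator code itself:

import java.time.Duration;

public class BoundedRetrySketch {

    // Simplified model: retry a failing attempt with a fixed backoff, but stop as
    // soon as the deadline passes so the caller can drive further retries via poll.
    static boolean joinWithDeadline(Duration timeout, long backoffMs) throws InterruptedException {
        long deadlineMs = System.currentTimeMillis() + timeout.toMillis();
        while (true) {
            if (tryJoin()) {
                return true;
            }
            if (System.currentTimeMillis() >= deadlineMs) {
                return false; // timer expired: return rather than tight-looping
            }
            Thread.sleep(backoffMs); // retry backoff
        }
    }

    // Hypothetical stand-in for a join-group attempt that keeps hitting a retriable error.
    private static boolean tryJoin() {
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        // Gives up after roughly 300 ms instead of looping forever.
        System.out.println(joinWithDeadline(Duration.ofMillis(300), 100L));
    }
}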
[kafka] branch trunk updated: MINOR: update docs of 'replica.socket.receive.buffer.bytes' (#13308)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 30795674615 MINOR: update docs of 'replica.socket.receive.buffer.bytes' (#13308) 30795674615 is described below commit 30795674615180af43377c79d106c559102e2522 Author: Chia-Ping Tsai AuthorDate: Tue Feb 28 03:29:44 2023 +0800 MINOR: update docs of 'replica.socket.receive.buffer.bytes' (#13308) Reviewers: Guozhang Wang --- core/src/main/scala/kafka/server/KafkaConfig.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 1130cb5a12a..6f1aa52e8fa 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -888,7 +888,7 @@ object KafkaConfig { val ReplicaLagTimeMaxMsDoc = "If a follower hasn't sent any fetch requests or hasn't consumed up to the leaders log end offset for at least this time," + " the leader will remove the follower from isr" val ReplicaSocketTimeoutMsDoc = "The socket timeout for network requests. Its value should be at least replica.fetch.wait.max.ms" - val ReplicaSocketReceiveBufferBytesDoc = "The socket receive buffer for network requests" + val ReplicaSocketReceiveBufferBytesDoc = "The socket receive buffer for network requests to the leader for replicating data" val ReplicaFetchMaxBytesDoc = "The number of bytes of messages to attempt to fetch for each partition. This is not an absolute maximum, " + "if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned " + "to ensure that progress can be made. The maximum record batch size accepted by the broker is defined via " +
[kafka] branch trunk updated (8d32a0f2463 -> 62431dca700)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 8d32a0f2463 [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error (#13206) add 62431dca700 KAFKA-14468: Implement CommitRequestManager to manage the commit and autocommit requests (#13021) No new revisions were added by this update. Summary of changes: .../consumer/internals/CommitRequestManager.java | 238 + .../internals/DefaultBackgroundThread.java | 86 +--- .../clients/consumer/internals/GroupState.java | 89 .../consumer/internals/NetworkClientDelegate.java | 16 +- .../consumer/internals/PrototypeAsyncConsumer.java | 37 ++-- .../clients/consumer/internals/RequestManager.java | 4 + .../internals/events/ApplicationEvent.java | 2 +- .../events/ApplicationEventProcessor.java | 47 +++- .../internals/events/CommitApplicationEvent.java | 64 ++ ...kgroundEvent.java => PollApplicationEvent.java} | 10 +- .../internals/CommitRequestManagerTest.java| 139 .../internals/DefaultBackgroundThreadTest.java | 41 +++- 12 files changed, 700 insertions(+), 73 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/internals/GroupState.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java copy clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/{ErrorBackgroundEvent.java => PollApplicationEvent.java} (79%) create mode 100644 clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
[kafka] branch trunk updated: KAFKA-10199: Add task updater metrics, part 1 (#13228)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 2fad1652942 KAFKA-10199: Add task updater metrics, part 1 (#13228) 2fad1652942 is described below commit 2fad1652942226454a44038f2350642817f9f74b Author: Guozhang Wang AuthorDate: Fri Feb 24 10:25:11 2023 -0800 KAFKA-10199: Add task updater metrics, part 1 (#13228) * Moved pausing-tasks logic out of the commit-interval loop to be on the top-level loop, similar to resuming tasks. * Added thread-level restoration metrics. * Related unit tests. Reviewers: Lucas Brutschy , Matthias J. Sax --- .../processor/internals/ChangelogReader.java | 10 +- .../processor/internals/DefaultStateUpdater.java | 230 +++-- .../processor/internals/StoreChangelogReader.java | 94 + .../streams/processor/internals/StreamThread.java | 5 +- .../streams/processor/internals/TaskManager.java | 4 +- .../internals/metrics/StreamsMetricsImpl.java | 5 + .../internals/DefaultStateUpdaterTest.java | 151 -- .../processor/internals/MockChangelogReader.java | 8 +- .../processor/internals/StreamThreadTest.java | 2 +- 9 files changed, 427 insertions(+), 82 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java index 03199d294ca..1cf8ef628da 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java @@ -28,8 +28,10 @@ import java.util.Set; public interface ChangelogReader extends ChangelogRegister { /** * Restore all registered state stores by reading from their changelogs + * + * @return the total number of records restored in this call */ -void restore(final Map tasks); +long restore(final Map tasks); /** * Transit to restore active changelogs mode @@ -41,6 +43,12 @@ public interface ChangelogReader extends ChangelogRegister { */ void transitToUpdateStandby(); +/** + * @return true if the reader is in restoring active changelog mode; + * false if the reader is in updating standby changelog mode + */ +boolean isRestoringActive(); + /** * @return the changelog partitions that have been completed restoring */ diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index ae6618c304f..5e912c99a5b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -16,8 +16,15 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.Sensor.RecordingLevel; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.metrics.stats.WindowedCount; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; @@ -32,7 
+39,9 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Deque; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -51,6 +60,10 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_DESCRIPTION; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATIO_DESCRIPTION; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_ID_TAG; + public class DefaultStateUpdater implements StateUpdater { private final static String BUG_ERROR_MESSAGE = "This indicates a bug. " + @@ -59,14 +72,20 @@ public class DefaultStateUpdater implements StateUpdater { private class StateUpdaterThread extends Thread { private final ChangelogReader changelogReader; +private final StateUpdaterMetrics updaterMetrics; private final AtomicBoolean isRun
[kafka] branch trunk updated: KAFKA-14299: Fix pause and resume with state updater (#13025)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 0fc029c6a47 KAFKA-14299: Fix pause and resume with state updater (#13025) 0fc029c6a47 is described below commit 0fc029c6a47a7a930a2b078569de1173cdb547a4 Author: Lucas Brutschy AuthorDate: Tue Feb 21 19:17:09 2023 +0100 KAFKA-14299: Fix pause and resume with state updater (#13025) * Fixes required to make the PauseResumeIntegrationTest pass. It was not enabled and it does not pass for the state updater code path. * Make sure no progress is made on paused topologies. The state updater restored one round of polls from the restore consumer before realizing that a newly added task was already in paused state when being added. * Wake up state updater when tasks are being resumed. If a task is resumed, it may be necessary to wake up the state updater from waiting on the tasksAndActions condition. * Make sure that allTasks methods also return the tasks that are currently being restored. * Enable PauseResumeIntegrationTest and upgrade it to JUnit5. Reviewers: Bruno Cadonna , Guozhang Wang --- .../org/apache/kafka/streams/KafkaStreams.java | 1 + .../processor/internals/DefaultStateUpdater.java | 65 ++ .../streams/processor/internals/ReadOnlyTask.java | 2 +- .../streams/processor/internals/StateUpdater.java | 5 ++ .../streams/processor/internals/StreamThread.java | 7 +- .../streams/processor/internals/TaskManager.java | 35 +++- .../KafkaStreamsNamedTopologyWrapper.java | 2 + .../integration/PauseResumeIntegrationTest.java| 100 - .../internals/DefaultStateUpdaterTest.java | 1 + .../processor/internals/ReadOnlyTaskTest.java | 1 + .../processor/internals/StreamThreadTest.java | 4 +- .../processor/internals/TaskManagerTest.java | 32 +++ 12 files changed, 189 insertions(+), 66 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 03b778ab6ec..c05e4c6c1ac 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -1751,6 +1751,7 @@ public class KafkaStreams implements AutoCloseable { } else { topologyMetadata.resumeTopology(UNNAMED_TOPOLOGY); } +threads.forEach(StreamThread::signalResume); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index d96593c5011..ae6618c304f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -116,6 +116,7 @@ public class DefaultStateUpdater implements StateUpdater { private void runOnce() throws InterruptedException { performActionsOnTasks(); +resumeTasks(); restoreTasks(); checkAllUpdatingTaskStates(time.milliseconds()); waitIfAllChangelogsCompletelyRead(); @@ -140,6 +141,16 @@ public class DefaultStateUpdater implements StateUpdater { } } +private void resumeTasks() { +if (isTopologyResumed.compareAndSet(true, false)) { +for (final Task task : pausedTasks.values()) { +if (!topologyMetadata.isPaused(task.id().topologyName())) { +resumeTask(task); +} +} +} +} + private void restoreTasks() { try { changelogReader.restore(updatingTasks); @@ 
-229,7 +240,7 @@ public class DefaultStateUpdater implements StateUpdater { if (isRunning.get() && changelogReader.allChangelogsCompleted()) { tasksAndActionsLock.lock(); try { -while (tasksAndActions.isEmpty()) { +while (tasksAndActions.isEmpty() && !isTopologyResumed.get()) { tasksAndActionsCondition.await(); } } finally { @@ -258,21 +269,39 @@ public class DefaultStateUpdater implements StateUpdater { } private void addTask(final Task task) { +final TaskId taskId = task.id(); + +Task existingTask = pausedTasks.get(taskId); +if (existingTask != null) { +throw new IllegalStateException( +(existingTask.isActive() ? "Active" : "
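The fix above is about the state updater honouring paused topologies and waking up on resume; from an application's point of view the entry points are simply KafkaStreams#pause and KafkaStreams#resume (KIP-834). A minimal usage sketch with placeholder topic names:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class PauseResumeExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "pause-resume-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());

        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("input-topic").to("output-topic");

        try (KafkaStreams streams = new KafkaStreams(builder.build(), props)) {
            streams.start();
            // Stop processing (and, with this fix, restoration of newly added tasks)
            // without shutting the application down ...
            streams.pause();
            // ... then wake the processing and state-updater threads back up.
            streams.resume();
        }
    }
}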
[kafka] branch trunk updated (38e43116222 -> 7e149990bdd)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 38e43116222 MINOR: Fix PluginInfoTest for Connect (#13266) add 7e149990bdd KAFKA-14717 KafkaStreams can' get running if the rebalance happens be… (#13248) No new revisions were added by this update. Summary of changes: .../org/apache/kafka/streams/KafkaStreams.java | 6 +++- .../streams/processor/internals/StreamThread.java | 4 +++ .../apache/kafka/streams/KafkaStreamsWrapper.java | 6 .../integration/AdjustStreamThreadCountTest.java | 34 ++ 4 files changed, 49 insertions(+), 1 deletion(-)
[kafka] branch trunk updated: KAFKA-14253 - More informative logging (#13253)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 82d5720aae7 KAFKA-14253 - More informative logging (#13253) 82d5720aae7 is described below commit 82d5720aae78c9e17606c8345dfc208557f9a8f2 Author: Philip Nee AuthorDate: Thu Feb 16 16:54:50 2023 -0800 KAFKA-14253 - More informative logging (#13253) Includes 2 requirements from the ticket: * Include the number of members in the group (I.e., "15 members participating" and "to 15 clients as") * Sort the member ids (to help compare the membership and assignment across rebalances) Reviewers: Guozhang Wang --- .../internals/StreamsPartitionAssignor.java| 22 ++ 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 1875f57b649..46c1e41e6c1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -76,9 +76,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import java.util.stream.Collectors; - +import static java.util.Map.Entry.comparingByKey; import static java.util.UUID.randomUUID; - import static org.apache.kafka.common.utils.Utils.filterMap; import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchCommittedOffsets; import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsetsFuture; @@ -619,10 +618,12 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf final boolean lagComputationSuccessful = populateClientStatesMap(clientStates, clientMetadataMap, taskForPartition, changelogTopics); -log.info("All members participating in this rebalance: \n{}.", - clientStates.entrySet().stream() - .map(entry -> entry.getKey() + ": " + entry.getValue().consumers()) - .collect(Collectors.joining(Utils.NL))); +log.info("{} members participating in this rebalance: \n{}.", +clientStates.size(), +clientStates.entrySet().stream() +.sorted(comparingByKey()) +.map(entry -> entry.getKey() + ": " + entry.getValue().consumers()) +.collect(Collectors.joining(Utils.NL))); final Set allTasks = partitionsForTask.keySet(); statefulTasks.addAll(changelogTopics.statefulTaskIds()); @@ -637,8 +638,13 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf statefulTasks, assignmentConfigs); -log.info("Assigned tasks {} including stateful {} to clients as: \n{}.", -allTasks, statefulTasks, clientStates.entrySet().stream() +log.info("{} assigned tasks {} including stateful {} to {} clients as: \n{}.", +allTasks.size(), +allTasks, +statefulTasks, +clientStates.size(), +clientStates.entrySet().stream() +.sorted(comparingByKey()) .map(entry -> entry.getKey() + "=" + entry.getValue().currentAssignment()) .collect(Collectors.joining(Utils.NL)));
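As a standalone illustration of the logging pattern introduced here (member count plus entries sorted by key for stable comparison across rebalances), the following sketch uses a hypothetical client-to-consumers map standing in for clientStates; it is not the Streams assignor code itself.

    import static java.util.Map.Entry.comparingByKey;

    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class SortedMembershipLog {
        public static void main(final String[] args) {
            // Hypothetical client -> consumer-ids mapping.
            final Map<String, List<String>> clientStates = Map.of(
                "client-2", List.of("consumer-2a"),
                "client-1", List.of("consumer-1a", "consumer-1b"));

            final String summary = clientStates.entrySet().stream()
                .sorted(comparingByKey())                              // deterministic order across rebalances
                .map(entry -> entry.getKey() + ": " + entry.getValue())
                .collect(Collectors.joining(System.lineSeparator()));

            System.out.printf("%d members participating in this rebalance:%n%s%n",
                clientStates.size(), summary);
        }
    }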
[kafka] branch trunk updated: KAFKA-14650: Synchronize access to tasks inside task manager (#13167)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 083e11a22ca KAFKA-14650: Synchronize access to tasks inside task manager (#13167) 083e11a22ca is described below commit 083e11a22ca9966ed010acdd5705351ab4300b52 Author: Guozhang Wang AuthorDate: Thu Feb 9 10:33:19 2023 -0800 KAFKA-14650: Synchronize access to tasks inside task manager (#13167) 1. The major fix: synchronize access to tasks inside task manager, this is a fix of a regression introduced in #12397 2. Clarify on func names of StreamThread that maybe triggered outside the StreamThread. 3. Minor cleanups. Reviewers: Lucas Brutschy --- .../org/apache/kafka/streams/KafkaStreams.java | 16 ++--- .../streams/processor/internals/StreamThread.java | 24 .../streams/processor/internals/TaskManager.java | 6 + .../kafka/streams/processor/internals/Tasks.java | 22 +- .../KafkaStreamsNamedTopologyWrapper.java | 2 +- .../internals/StreamThreadStateStoreProvider.java | 2 +- .../org/apache/kafka/streams/KafkaStreamsTest.java | 5 ++--- .../processor/internals/StreamThreadTest.java | 26 +++--- .../StreamThreadStateStoreProviderTest.java| 4 ++-- 9 files changed, 59 insertions(+), 48 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index bc3a1243ce2..ee9f57b0680 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -1826,7 +1826,7 @@ public class KafkaStreams implements AutoCloseable { */ public Map> allLocalStorePartitionLags() { final List allTasks = new ArrayList<>(); -processStreamThread(thread -> allTasks.addAll(thread.allTasks().values())); +processStreamThread(thread -> allTasks.addAll(thread.readyOnlyAllTasks())); return allLocalStorePartitionLags(allTasks); } @@ -1917,21 +1917,19 @@ public class KafkaStreams implements AutoCloseable { ); } else { for (final StreamThread thread : threads) { -final Map tasks = thread.allTasks(); -for (final Entry entry : tasks.entrySet()) { +final Set tasks = thread.readyOnlyAllTasks(); +for (final Task task : tasks) { -final TaskId taskId = entry.getKey(); +final TaskId taskId = task.id(); final int partition = taskId.partition(); -if (request.isAllPartitions() -|| request.getPartitions().contains(partition)) { -final Task task = entry.getValue(); +if (request.isAllPartitions() || request.getPartitions().contains(partition)) { final StateStore store = task.getStore(storeName); if (store != null) { final StreamThread.State state = thread.state(); final boolean active = task.isActive(); if (request.isRequireActive() -&& (state != StreamThread.State.RUNNING -|| !active)) { +&& (state != StreamThread.State.RUNNING || !active)) { + result.addResult( partition, QueryResult.forFailure( diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 2d433eb3c82..d4be6a83af9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -60,7 +60,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.HashSet; -import 
java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -1269,16 +1268,23 @@ public class StreamThread extends Thread { ); } -public Map activeTaskMap() { -return taskManager.activeTaskMap(); -} - -public List activeTasks() { -return taskManager.activeTaskIterable(); +/** + * Getting the list of current active tasks of the thread; + * Note that the returned list may be used by other thread than the StreamThread itself, + * and hence need to be read-only + */ +public Set readOnlyActiveTasks() { +
[kafka] branch trunk updated: KAFKA-10575: Add onRestoreSuspended to StateRestoreListener (#13179)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 788793dee6f KAFKA-10575: Add onRestoreSuspsnded to StateRestoreListener (#13179) 788793dee6f is described below commit 788793dee6fa5d7ba5cb7d756b72c7d043dc8789 Author: Guozhang Wang AuthorDate: Tue Feb 7 11:33:09 2023 -0800 KAFKA-10575: Add onRestoreSuspsnded to StateRestoreListener (#13179) 1. Add the new API (default impl is empty) to StateRestoreListener. 2. Update related unit tests Reviewers: Lucas Brutschy , Matthias J. Sax --- .../streams/processor/StateRestoreListener.java| 22 .../processor/internals/StoreChangelogReader.java | 15 +++ .../internals/StoreChangelogReaderTest.java| 133 +++-- .../kafka/test/MockStateRestoreListener.java | 10 ++ 4 files changed, 173 insertions(+), 7 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java index 6ba794f187b..006cc58cd43 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java @@ -22,13 +22,16 @@ import org.apache.kafka.common.TopicPartition; /** * Class for listening to various states of the restoration process of a StateStore. * + * * When calling {@link org.apache.kafka.streams.KafkaStreams#setGlobalStateRestoreListener(StateRestoreListener)} * the passed instance is expected to be stateless since the {@code StateRestoreListener} is shared * across all {@link org.apache.kafka.streams.processor.internals.StreamThread} instances. * + * * Users desiring stateful operations will need to provide synchronization internally in * the {@code StateRestorerListener} implementation. * + * * Note that this listener is only registered at the per-client level and users can base on the {@code storeName} * parameter to define specific monitoring for different {@link StateStore}s. There is another * {@link StateRestoreCallback} interface which is registered via the @@ -37,6 +40,12 @@ import org.apache.kafka.common.TopicPartition; * These two interfaces serve different restoration purposes and users should not try to implement both of them in a single * class during state store registration. * + * + * Also note that the update process of standby tasks is not monitored via this interface, since a standby task does + * note actually restore state, but keeps updating its state from the changelogs written by the active task + * which does not ever finish. + * + * * Incremental updates are exposed so users can estimate how much progress has been made. */ public interface StateRestoreListener { @@ -85,4 +94,17 @@ public interface StateRestoreListener { final String storeName, final long totalRestored); +/** + * Method called when restoring the {@link StateStore} is suspended due to the task being migrated out of the host. + * If the migrated task is recycled or re-assigned back to the current host, another + * {@link #onRestoreStart(TopicPartition, String, long, long)} would be called. 
+ * + * @param topicPartition the {@link TopicPartition} containing the values to restore + * @param storeName the name of the store just restored + * @param totalRestored the total number of records restored for this TopicPartition before being paused + */ +default void onRestoreSuspended(final TopicPartition topicPartition, +final String storeName, +final long totalRestored) { +} } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index 874f1993c19..be580f3575c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -986,8 +986,23 @@ public class StoreChangelogReader implements ChangelogReader { for (final TopicPartition partition : revokedChangelogs) { final ChangelogMetadata changelogMetadata = changelogs.remove(partition); if (changelogMetadata != null) { +// if the changelog is still in REGISTERED, it means it has not initialized and started +// restoring yet, and hence we should not try to remove the changelog partition if (!changelogMetadata.state().equals(ChangelogState.REGISTERED
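A minimal, stateless listener sketch that implements the interface, including the new onRestoreSuspended default method; the log messages are illustrative only. Such a listener would typically be registered once per client via KafkaStreams#setGlobalStateRestoreListener.

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.processor.StateRestoreListener;

    public class LoggingRestoreListener implements StateRestoreListener {

        @Override
        public void onRestoreStart(final TopicPartition topicPartition, final String storeName,
                                   final long startingOffset, final long endingOffset) {
            System.out.printf("Restoration of %s (%s) started at offset %d%n",
                storeName, topicPartition, startingOffset);
        }

        @Override
        public void onBatchRestored(final TopicPartition topicPartition, final String storeName,
                                    final long batchEndOffset, final long numRestored) {
            System.out.printf("Restored a batch of %d records for %s%n", numRestored, storeName);
        }

        @Override
        public void onRestoreEnd(final TopicPartition topicPartition, final String storeName,
                                 final long totalRestored) {
            System.out.printf("Restoration of %s completed, %d records restored%n",
                storeName, totalRestored);
        }

        @Override
        public void onRestoreSuspended(final TopicPartition topicPartition, final String storeName,
                                       final long totalRestored) {
            // New callback: the task owning this store was migrated away before restoration finished.
            System.out.printf("Restoration of %s suspended after %d records%n", storeName, totalRestored);
        }
    }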
[kafka] branch trunk updated (a3cf8b54e03 -> 3cf13064cc9)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from a3cf8b54e03 KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic (#12984) add 3cf13064cc9 Replace EasyMock and PowerMock with Mockito - TimeOrderedWindowStoreTest (#12777) No new revisions were added by this update. Summary of changes: .../internals/TimeOrderedWindowStoreTest.java | 102 ++--- 1 file changed, 46 insertions(+), 56 deletions(-)
[kafka] branch trunk updated (6c98544a964 -> eb7f490159c)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 6c98544a964 KAFKA-14491: [2/N] Refactor RocksDB store open iterator management (#13142) add eb7f490159c chore: Fix scaladoc warnings (#13164) No new revisions were added by this update. Summary of changes: build.gradle | 4 +++ .../kafka/streams/scala/kstream/Branched.scala | 4 +-- .../streams/scala/kstream/BranchedKStream.scala| 12 + .../kafka/streams/scala/kstream/Consumed.scala | 6 ++--- .../kafka/streams/scala/kstream/Grouped.scala | 8 +++--- .../kafka/streams/scala/kstream/Joined.scala | 8 +++--- .../kafka/streams/scala/kstream/KStream.scala | 14 +- .../kafka/streams/scala/kstream/KTable.scala | 10 +--- .../kafka/streams/scala/kstream/Materialized.scala | 30 -- .../kafka/streams/scala/kstream/Produced.scala | 6 ++--- .../streams/scala/kstream/Repartitioned.scala | 6 ++--- .../kafka/streams/scala/kstream/StreamJoined.scala | 12 - 12 files changed, 66 insertions(+), 54 deletions(-)
[kafka] branch trunk updated (bc1ce9f0f1b -> 1d0585563b4)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from bc1ce9f0f1b KAFKA-14623: OAuth's HttpAccessTokenRetriever potentially leaks secrets in logging (#13119) add 1d0585563b4 MINOR: fix flaky DefaultStateUpdaterTest (#13160) No new revisions were added by this update. Summary of changes: streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-)
[kafka] branch trunk updated (123e0e9ca9e -> ca80502ebe9)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 123e0e9ca9e MINOR: fix warnings in Streams javadocs (#13132) add ca80502ebe9 Update outdated documentation. (#13139) No new revisions were added by this update. Summary of changes: .../clients/consumer/internals/SubscriptionState.java | 17 - 1 file changed, 8 insertions(+), 9 deletions(-)
[kafka] branch trunk updated: Kafka Streams Threading P3: TaskManager Impl (#12754)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 55a3a95b7a1 Kafka Streams Threading P3: TaskManager Impl (#12754) 55a3a95b7a1 is described below commit 55a3a95b7a1f61c2a93a6341d3e091f68b048234 Author: Guozhang Wang AuthorDate: Fri Oct 14 16:10:57 2022 -0700 Kafka Streams Threading P3: TaskManager Impl (#12754) 0. Add name to task executors. 1. DefaultTaskManager implementation, for interacting with the TaskExecutors and support add/remove/lock APIs. 2. Related unit tests. --- .../internals/tasks/DefaultTaskExecutor.java | 13 +- .../internals/tasks/DefaultTaskManager.java| 246 + .../processor/internals/tasks/TaskExecutor.java| 7 +- .../internals/tasks/TaskExecutorCreator.java | 24 ++ .../internals/tasks/DefaultTaskExecutorTest.java | 2 +- .../internals/tasks/DefaultTaskManagerTest.java| 189 6 files changed, 476 insertions(+), 5 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java index 5ad0950fc51..c03384f4daa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.StreamsException; -import org.apache.kafka.streams.processor.internals.DefaultStateUpdater; import org.apache.kafka.streams.processor.internals.ReadOnlyTask; import org.apache.kafka.streams.processor.internals.StreamTask; import org.slf4j.Logger; @@ -45,7 +44,7 @@ public class DefaultTaskExecutor implements TaskExecutor { super(name); final String logPrefix = String.format("%s ", name); final LogContext logContext = new LogContext(logPrefix); -log = logContext.logger(DefaultStateUpdater.class); +log = logContext.logger(DefaultTaskExecutor.class); } @Override @@ -102,6 +101,7 @@ public class DefaultTaskExecutor implements TaskExecutor { } private final Time time; +private final String name; private final TaskManager taskManager; private StreamTask currentTask = null; @@ -109,15 +109,22 @@ public class DefaultTaskExecutor implements TaskExecutor { private CountDownLatch shutdownGate; public DefaultTaskExecutor(final TaskManager taskManager, + final String name, final Time time) { this.time = time; +this.name = name; this.taskManager = taskManager; } +@Override +public String name() { +return name; +} + @Override public void start() { if (taskExecutorThread == null) { -taskExecutorThread = new TaskExecutorThread("task-executor"); +taskExecutorThread = new TaskExecutorThread(name); taskExecutorThread.start(); shutdownGate = new CountDownLatch(1); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java new file mode 100644 index 000..3f97de85ceb --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.tasks; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.utils.LogContext; +i
[kafka] branch trunk updated (78b4ba7d85a -> dfb59296654)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 78b4ba7d85a MINOR; Add kraft controller log level in log4j prop (#12707) add dfb59296654 Kafka Streams Threading P2: Skeleton TaskExecutor Impl (#12744) No new revisions were added by this update. Summary of changes: .../internals/tasks/DefaultTaskExecutor.java | 158 + .../processor/internals/tasks/TaskExecutor.java| 15 +- .../processor/internals/tasks/TaskManager.java | 5 +- .../internals/tasks/DefaultTaskExecutorTest.java | 107 ++ 4 files changed, 281 insertions(+), 4 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java
[kafka] branch trunk updated: Kafka Streams Threading P1: Add Interface for new TaskManager and TaskExecutor (#12737)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 69059b5f288 Kafka Streams Threading P1: Add Interface for new TaskManager and TaskExecutor (#12737) 69059b5f288 is described below commit 69059b5f288ef5b6496953ba1d77b2570d8b3142 Author: Guozhang Wang AuthorDate: Wed Oct 12 16:33:13 2022 -0700 Kafka Streams Threading P1: Add Interface for new TaskManager and TaskExecutor (#12737) The interfaces (and their future impls) are added under the processor/internals/tasks package, to distinguish with the existing old classes: 1. TaskExecutor is the interface for a processor thread. It takes at most one task to process at a given time from the task manager. When being asked from the task manager to un-assign the current processing task, it will stop processing and give the task back to task manager. 2. TaskManager schedules all the active tasks to assign to TaskExecutors. Specifically: 1) when a task executor ask it for an unassigned task to process (assignNextTask), it will return the available task based on its scheduling algorithm. 2) when the task manager decides to commit (all) tasks, or when a rebalance event requires it to modify the maintained active tasks (via onAssignment), it will lock all the tasks that are going to be closed / committed, asking the TaskExecutor to gi [...] Reviewers: John Roesler , Anna Sophie Blee-Goldman --- .../processor/internals/tasks/TaskExecutor.java| 57 .../processor/internals/tasks/TaskManager.java | 100 + 2 files changed, 157 insertions(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskExecutor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskExecutor.java new file mode 100644 index 000..ead1fb8179e --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskExecutor.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.tasks; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.streams.processor.internals.ReadOnlyTask; +import org.apache.kafka.streams.processor.internals.StreamTask; + +import java.time.Duration; + +public interface TaskExecutor { + +/** + * Starts the task processor. + */ +void start(); + +/** + * Shuts down the task processor updater. 
+ * + * @param timeout duration how long to wait until the state updater is shut down + * + * @throws + * org.apache.kafka.streams.errors.StreamsException if the state updater thread cannot shutdown within the timeout + */ +void shutdown(final Duration timeout); + +/** + * Get the current assigned processing task. The task returned is read-only and cannot be modified. + * + * @return the current processing task + */ +ReadOnlyTask currentTask(); + +/** + * Unassign the current processing task from the task processor and give it back to the state manager. + * + * The paused task must be flushed since it may be committed or closed by the task manager next. + * + * This method does not block, instead a future is returned. + */ +KafkaFuture unassign(); +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java new file mode 100644 index 000..e9929714aca --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "L
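The interaction the commit describes (each executor owns at most one task at a time and hands it back to the manager so it can be committed or closed) can be illustrated with a small, self-contained sketch. SimpleTaskManager, PlainTask and the method names below are hypothetical stand-ins, not the actual Kafka internals.

    import java.util.ArrayDeque;
    import java.util.Queue;

    public class ExecutorLoopSketch {

        interface PlainTask {
            void processOneRecord();
        }

        // Hypothetical stand-in for the task manager: it hands out unassigned
        // tasks and takes them back before they are committed or closed.
        static class SimpleTaskManager {
            private final Queue<PlainTask> unassigned = new ArrayDeque<>();

            synchronized void add(final PlainTask task) {
                unassigned.add(task);
            }

            synchronized PlainTask assignNextTask() {
                return unassigned.poll();
            }

            synchronized void returnTask(final PlainTask task) {
                unassigned.add(task);
            }
        }

        // Simplified executor loop: hold at most one task, process it, then give
        // it back to the manager after a bounded amount of work.
        static void executorLoop(final SimpleTaskManager manager, final int rounds) {
            PlainTask current = null;
            for (int i = 0; i < rounds; i++) {
                if (current == null) {
                    current = manager.assignNextTask();
                }
                if (current != null) {
                    current.processOneRecord();
                    manager.returnTask(current);
                    current = null;
                }
            }
        }

        public static void main(final String[] args) {
            final SimpleTaskManager manager = new SimpleTaskManager();
            manager.add(() -> System.out.println("processed one record"));
            executorLoop(manager, 3);
        }
    }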
[kafka] branch trunk updated: Fix ByteBufferSerializer#serialize(String, ByteBuffer) not roundtrip input with ByteBufferDeserializer#deserialize(String, byte[]) (#12704)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 496ae054c2d Fix ByteBufferSerializer#serialize(String, ByteBuffer) not roundtrip input with ByteBufferDeserializer#deserialize(String, byte[]) (#12704) 496ae054c2d is described below commit 496ae054c2d43c0905167745bfb2f4a0725e9fc2 Author: LinShunKang AuthorDate: Fri Sep 30 21:45:18 2022 +0800 Fix ByteBufferSerializer#serialize(String, ByteBuffer) not roundtrip input with ByteBufferDeserializer#deserialize(String, byte[]) (#12704) Reviewers: Guozhang Wang --- .../apache/kafka/common/serialization/ByteBufferSerializer.java | 8 +++- .../org/apache/kafka/common/serialization/SerializationTest.java | 4 +++- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java index 5987688759e..06b66a62cb0 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java @@ -21,8 +21,7 @@ import org.apache.kafka.common.utils.Utils; import java.nio.ByteBuffer; /** - * ByteBufferSerializer will not change ByteBuffer's mark, position and limit. - * And do not need to flip before call serialize(String, ByteBuffer). For example: + * Do not need to flip before call serialize(String, ByteBuffer). For example: * * * @@ -48,8 +47,7 @@ public class ByteBufferSerializer implements Serializer { } } -final ByteBuffer copyData = data.asReadOnlyBuffer(); -copyData.flip(); -return Utils.toArray(copyData); +data.flip(); +return Utils.toArray(data); } } diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java index eb1fee3943f..a0b67a03d42 100644 --- a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java +++ b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java @@ -50,7 +50,9 @@ public class SerializationTest { put(Float.class, Arrays.asList(5678567.12312f, -5678567.12341f)); put(Double.class, Arrays.asList(5678567.12312d, -5678567.12341d)); put(byte[].class, Arrays.asList("my string".getBytes())); -put(ByteBuffer.class, Arrays.asList(ByteBuffer.allocate(10).put("my string".getBytes(; +put(ByteBuffer.class, Arrays.asList(ByteBuffer.wrap("my string".getBytes()), +ByteBuffer.allocate(10).put("my string".getBytes()), +ByteBuffer.allocateDirect(10).put("my string".getBytes(; put(Bytes.class, Arrays.asList(new Bytes("my string".getBytes(; put(UUID.class, Arrays.asList(UUID.randomUUID())); }
[kafka] branch trunk updated: KAFKA-4852: Fix ByteBufferSerializer#serialize(String, ByteBuffer) not compatible with offsets (#12683)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 51dbd175b08 KAFKA-4852: Fix ByteBufferSerializer#serialize(String, ByteBuffer) not compatible with offsets (#12683) 51dbd175b08 is described below commit 51dbd175b08e78aeca03d6752847aa5f23c98659 Author: LinShunKang AuthorDate: Fri Sep 30 01:59:47 2022 +0800 KAFKA-4852: Fix ByteBufferSerializer#serialize(String, ByteBuffer) not compatible with offsets (#12683) Reviewers: Guozhang Wang --- .../common/serialization/ByteBufferSerializer.java | 31 -- .../common/serialization/SerializationTest.java| 19 + 2 files changed, 42 insertions(+), 8 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java index 9fb12544e0f..5987688759e 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java @@ -16,25 +16,40 @@ */ package org.apache.kafka.common.serialization; +import org.apache.kafka.common.utils.Utils; + import java.nio.ByteBuffer; +/** + * ByteBufferSerializer will not change ByteBuffer's mark, position and limit. + * And do not need to flip before call serialize(String, ByteBuffer). For example: + * + * + * + * ByteBufferSerializer serializer = ...; // Create Serializer + * ByteBuffer buffer = ...; // Allocate ByteBuffer + * buffer.put(data); // Put data into buffer, do not need to flip + * serializer.serialize(topic, buffer); // Serialize buffer + * + * + */ public class ByteBufferSerializer implements Serializer { + +@Override public byte[] serialize(String topic, ByteBuffer data) { -if (data == null) +if (data == null) { return null; - -data.rewind(); +} if (data.hasArray()) { -byte[] arr = data.array(); +final byte[] arr = data.array(); if (data.arrayOffset() == 0 && arr.length == data.remaining()) { return arr; } } -byte[] ret = new byte[data.remaining()]; -data.get(ret, 0, ret.length); -data.rewind(); -return ret; +final ByteBuffer copyData = data.asReadOnlyBuffer(); +copyData.flip(); +return Utils.toArray(copyData); } } diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java index 85c09dd17ae..eb1fee3943f 100644 --- a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java +++ b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java @@ -31,6 +31,8 @@ import java.util.ArrayList; import java.util.LinkedList; import java.util.Stack; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; @@ -368,4 +370,21 @@ public class SerializationTest { return Serdes.serdeFrom(serializer, deserializer); } + +@Test +public void testByteBufferSerializer() { +final byte[] bytes = "Hello".getBytes(UTF_8); +final ByteBuffer heapBuffer0 = ByteBuffer.allocate(bytes.length + 1).put(bytes); +final ByteBuffer heapBuffer1 = ByteBuffer.allocate(bytes.length).put(bytes); +final ByteBuffer 
heapBuffer2 = ByteBuffer.wrap(bytes); +final ByteBuffer directBuffer0 = ByteBuffer.allocateDirect(bytes.length + 1).put(bytes); +final ByteBuffer directBuffer1 = ByteBuffer.allocateDirect(bytes.length).put(bytes); +try (final ByteBufferSerializer serializer = new ByteBufferSerializer()) { +assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer0)); +assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer1)); +assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer2)); +assertArrayEquals(bytes, serializer.serialize(topic, directBuffer0)); +assertArrayEquals(bytes, serializer.serialize(topic, directBuffer1)); +} +} }
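A small round-trip check of the resulting behavior: the buffer does not need to be flipped before serialize, and the bytes survive ByteBufferDeserializer. The topic name and buffer sizes below are arbitrary.

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    import org.apache.kafka.common.serialization.ByteBufferDeserializer;
    import org.apache.kafka.common.serialization.ByteBufferSerializer;

    public class ByteBufferRoundTrip {
        public static void main(final String[] args) {
            final byte[] payload = "my string".getBytes(StandardCharsets.UTF_8);

            // Put data into the buffer and pass it to the serializer without flipping first.
            final ByteBuffer buffer = ByteBuffer.allocate(payload.length + 4).put(payload);

            try (final ByteBufferSerializer serializer = new ByteBufferSerializer();
                 final ByteBufferDeserializer deserializer = new ByteBufferDeserializer()) {
                final byte[] serialized = serializer.serialize("some-topic", buffer);
                final ByteBuffer roundTripped = deserializer.deserialize("some-topic", serialized);

                System.out.println(Arrays.equals(payload, serialized));            // true
                System.out.println(ByteBuffer.wrap(payload).equals(roundTripped)); // true
            }
        }
    }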
[kafka] branch trunk updated (d2f900b055d -> d62a42df2e2)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from d2f900b055d MINOR: Small update docs/design.html grammar and typo (#12691) add d62a42df2e2 KAFKA-10199: Integrate Topology Pause/Resume with StateUpdater (#12659) No new revisions were added by this update. Summary of changes: .../processor/internals/DefaultStateUpdater.java | 109 - .../streams/processor/internals/StateUpdater.java | 42 ++-- .../streams/processor/internals/StreamThread.java | 5 +- .../streams/processor/internals/TaskAndAction.java | 16 +-- .../internals/DefaultStateUpdaterTest.java | 75 +++--- .../processor/internals/TaskAndActionTest.java | 40 .../org/apache/kafka/test/StreamsTestUtils.java| 19 7 files changed, 110 insertions(+), 196 deletions(-)
[kafka] branch trunk updated (7496e624341 -> b0ace180359)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 7496e624341 KAFKA-14259: BrokerRegistration#toString throws an exception, terminating metadata replay (#12681) add b0ace180359 KAFKA-14239: Merge StateRestorationIntegrationTest into RestoreIntegrationTest (#12670) No new revisions were added by this update. Summary of changes: .../integration/RestoreIntegrationTest.java| 106 +--- .../StateRestorationIntegrationTest.java | 138 - 2 files changed, 88 insertions(+), 156 deletions(-) delete mode 100644 streams/src/test/java/org/apache/kafka/streams/integration/StateRestorationIntegrationTest.java
[kafka] branch trunk updated (7ec10ce19a -> 8380d2edf4)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 7ec10ce19a HOTFIX: fix PriorityQueue iteration to assign warmups in priority order (#12585) add 8380d2edf4 KAFKA-10199: Handle exceptions from state updater (#12519) No new revisions were added by this update. Summary of changes: .../processor/internals/DefaultStateUpdater.java | 10 ++ .../streams/processor/internals/TaskManager.java | 191 + .../kafka/streams/processor/internals/Tasks.java | 53 ++ .../streams/processor/internals/TasksRegistry.java | 6 +- .../processor/internals/TaskManagerTest.java | 95 +- 5 files changed, 232 insertions(+), 123 deletions(-)
[kafka] branch trunk updated (8d665c42e23 -> 63e6fdc9c42)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 8d665c42e23 MINOR: Small cleanups in integration.kafka tests (#12480) add 63e6fdc9c42 MINOR: Improve javadocs for offset retention (#12552) No new revisions were added by this update. Summary of changes: core/src/main/scala/kafka/coordinator/group/OffsetConfig.scala | 8 ++-- core/src/main/scala/kafka/server/KafkaConfig.scala | 7 +-- 2 files changed, 11 insertions(+), 4 deletions(-)
[kafka] branch trunk updated: KAFKA-10199: Remove tasks from state updater on revoked and lost partitions (#12547)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new add4ca6c7f KAFKA-10199: Remove tasks from state updater on revoked and lost partitions (#12547) add4ca6c7f is described below commit add4ca6c7f1b289477aa7d6e918f5d22b78088fe Author: Bruno Cadonna AuthorDate: Mon Aug 22 20:50:50 2022 +0200 KAFKA-10199: Remove tasks from state updater on revoked and lost partitions (#12547) Removes tasks from the state updater when the input partitions of the tasks are revoked or partitions are lost during a rebalance. Reviewers: Guozhang Wang --- .../streams/processor/internals/TaskManager.java | 42 +++- .../kafka/streams/processor/internals/Tasks.java | 19 +++- .../processor/internals/TaskManagerTest.java | 116 - 3 files changed, 165 insertions(+), 12 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 03c36b0daf..478b783d68 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -465,7 +465,7 @@ public class TaskManager { standbyTasksToCreate.remove(taskId); } else { stateUpdater.remove(taskId); -tasks.addPendingTaskToClose(taskId); +tasks.addPendingTaskToCloseClean(taskId); } } } @@ -692,7 +692,7 @@ public class TaskManager { taskExceptions.putIfAbsent(taskId, e); } -} else if (tasks.removePendingTaskToClose(task.id())) { +} else if (tasks.removePendingTaskToCloseClean(task.id())) { try { task.suspend(); task.closeClean(); @@ -710,6 +710,8 @@ public class TaskManager { taskExceptions.putIfAbsent(task.id(), e); } +} else if (tasks.removePendingTaskToCloseDirty(task.id())) { +tasksToCloseDirty.add(task); } else if ((inputPartitions = tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) { task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id())); stateUpdater.add(task); @@ -755,6 +757,8 @@ public class TaskManager { } } +removeRevokedTasksFromStateUpdater(remainingRevokedPartitions); + if (!remainingRevokedPartitions.isEmpty()) { log.debug("The following revoked partitions {} are missing from the current task partitions. 
It could " + "potentially be due to race condition of consumer detecting the heartbeat failure, or the tasks " + @@ -840,6 +844,20 @@ public class TaskManager { } } +private void removeRevokedTasksFromStateUpdater(final Set remainingRevokedPartitions) { +if (stateUpdater != null) { +for (final Task restoringTask : stateUpdater.getTasks()) { +if (restoringTask.isActive()) { +if (remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) { +tasks.addPendingTaskToCloseClean(restoringTask.id()); +stateUpdater.remove(restoringTask.id()); + remainingRevokedPartitions.removeAll(restoringTask.inputPartitions()); +} +} +} +} +} + private void prepareCommitAndAddOffsetsToMap(final Set tasksToPrepare, final Map> consumedOffsetsPerTask) { for (final Task task : tasksToPrepare) { @@ -867,6 +885,15 @@ public class TaskManager { void handleLostAll() { log.debug("Closing lost active tasks as zombies."); +closeRunningTasksDirty(); +removeLostTasksFromStateUpdater(); + +if (processingMode == EXACTLY_ONCE_V2) { +activeTaskCreator.reInitializeThreadProducer(); +} +} + +private void closeRunningTasksDirty() { final Set allTask = tasks.allTasks(); for (final Task task : allTask) { // Even though we've apparently dropped out of the group, we can continue safely to maintain our @@ -875,9 +902,16 @@ public class TaskManager { closeTaskDirty(task); } } +} -if (processingMode == EXACTLY_ONCE_V2) { -activeTaskCreator.reInitializeThreadProducer(); +private void removeLostTasksFromStateUpdater() { +if (stateUpdater != nul
[kafka] branch trunk updated: MINOR: Improve KafkaProducer Javadocs (#12537)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 5d32f24cc3 MINOR: Improve KafkaProducer Javadocs (#12537) 5d32f24cc3 is described below commit 5d32f24cc3597760d3b846647e6a19dddc6e3d71 Author: Guozhang Wang AuthorDate: Fri Aug 19 10:09:48 2022 -0700 MINOR: Improve KafkaProducer Javadocs (#12537) While reviewing KIP-588 and KIP-691 I went through the exception throwing behavior and wanted to improve the related javadocs a little bit. Reviewers: John Roesler --- .../kafka/clients/producer/KafkaProducer.java | 22 +++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 2d5c8994b4..ec8b8725c8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -675,6 +675,11 @@ public class KafkaProducer implements Producer { * and should also not commit offsets manually (via {@link KafkaConsumer#commitSync(Map) sync} or * {@link KafkaConsumer#commitAsync(Map, OffsetCommitCallback) async} commits). * + * + * This method is a blocking call that waits until the request has been received and acknowledged by the consumer group + * coordinator; but the offsets are not considered as committed until the transaction itself is successfully committed later (via + * the {@link #commitTransaction()} call). + * * @throws IllegalStateException if no transactional.id has been configured, no transaction has been started * @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker @@ -685,6 +690,7 @@ public class KafkaProducer implements Producer { * transactional.id is not authorized, or the consumer group id is not authorized. * @throws org.apache.kafka.common.errors.InvalidProducerEpochException if the producer has attempted to produce with an old epoch * to the partition leader. See the exception for more details + * @throws TimeoutException if the time taken for sending the offsets has surpassed max.block.ms. * @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any * other unexpected error * @@ -711,6 +717,11 @@ public class KafkaProducer implements Producer { * requires the brokers to be on version 2.5 or newer to understand. * * + * This method is a blocking call that waits until the request has been received and acknowledged by the consumer group + * coordinator; but the offsets are not considered as committed until the transaction itself is successfully committed later (via + * the {@link #commitTransaction()} call). + * + * * Note, that the consumer should have {@code enable.auto.commit=false} and should * also not commit offsets manually (via {@link KafkaConsumer#commitSync(Map) sync} or * {@link KafkaConsumer#commitAsync(Map, OffsetCommitCallback) async} commits). @@ -735,7 +746,7 @@ public class KafkaProducer implements Producer { * to the partition leader. 
See the exception for more details * @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any * other unexpected error - * @throws TimeoutException if the time taken for sending offsets has surpassed max.block.ms. + * @throws TimeoutException if the time taken for sending the offsets has surpassed max.block.ms. * @throws InterruptException if the thread is interrupted while blocked */ public void sendOffsetsToTransaction(Map offsets, @@ -765,7 +776,10 @@ public class KafkaProducer implements Producer { * Note that exceptions thrown by callbacks are ignored; the producer proceeds to commit the transaction in any case. * * Note that this method will raise {@link TimeoutException} if the transaction cannot be committed before expiration - * of {@code max.block.ms}. Additionally, it will raise {@link InterruptException} if interrupted. + * of {@code max.block.ms}, but this does not mean the request did not actually reach the broker. In fact, it only indicates + * that we cannot get the acknowledgement response in time, so it's up to the application's logic + * to decide how to handle time outs. + * Additionally, it will raise {@link InterruptException} if interrupted. * It is safe to retry
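For context, the consume-transform-produce pattern these javadocs describe looks roughly like the following sketch; the broker address, group id, topics and transactional.id are placeholders, and error handling (e.g. abortTransaction on abortable failures) is omitted for brevity.

    import java.time.Duration;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.TopicPartition;

    public class ConsumeTransformProduce {
        public static void main(final String[] args) {
            final Properties consumerProps = new Properties();
            consumerProps.put("bootstrap.servers", "localhost:9092");   // placeholder
            consumerProps.put("group.id", "etl-group");                 // placeholder
            consumerProps.put("enable.auto.commit", "false");           // required for this pattern
            consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            final Properties producerProps = new Properties();
            producerProps.put("bootstrap.servers", "localhost:9092");   // placeholder
            producerProps.put("transactional.id", "etl-producer-1");    // placeholder
            producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
                 final KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
                consumer.subscribe(List.of("input"));
                producer.initTransactions();

                final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                if (!records.isEmpty()) {
                    producer.beginTransaction();
                    final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                    for (final ConsumerRecord<String, String> record : records) {
                        producer.send(new ProducerRecord<>("output", record.key(), record.value()));
                        offsets.put(new TopicPartition(record.topic(), record.partition()),
                                    new OffsetAndMetadata(record.offset() + 1));
                    }
                    // Blocks until the group coordinator acknowledges the offsets; they only
                    // become committed once commitTransaction() below succeeds.
                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                    producer.commitTransaction();
                }
            }
        }
    }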
[kafka] branch trunk updated: KAFKA-10199: Remove tasks from state updater on revocation (#12520)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new b47c4d8598 KAFKA-10199: Remove tasks from state updater on revocation (#12520) b47c4d8598 is described below commit b47c4d859805068de6a8fe8de3bda5e7a21132e2 Author: Bruno Cadonna AuthorDate: Wed Aug 17 20:13:34 2022 +0200 KAFKA-10199: Remove tasks from state updater on revocation (#12520) Removes tasks from the state updater when the input partitions of the tasks are revoked during a rebalance. Reviewers: Guozhang Wang --- .../streams/processor/internals/TaskManager.java | 16 + .../processor/internals/TaskManagerTest.java | 81 ++ 2 files changed, 97 insertions(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 4bba28a3f3..bab05a5184 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -757,6 +757,8 @@ public class TaskManager { } } +removeRevokedTasksFromStateUpdater(remainingRevokedPartitions); + if (!remainingRevokedPartitions.isEmpty()) { log.debug("The following revoked partitions {} are missing from the current task partitions. It could " + "potentially be due to race condition of consumer detecting the heartbeat failure, or the tasks " + @@ -842,6 +844,20 @@ public class TaskManager { } } +private void removeRevokedTasksFromStateUpdater(final Set remainingRevokedPartitions) { +if (stateUpdater != null) { +for (final Task restoringTask : stateUpdater.getTasks()) { +if (restoringTask.isActive()) { +if (remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) { +tasks.addPendingTaskToClose(restoringTask.id()); +stateUpdater.remove(restoringTask.id()); + remainingRevokedPartitions.removeAll(restoringTask.inputPartitions()); +} +} +} +} +} + private void prepareCommitAndAddOffsetsToMap(final Set tasksToPrepare, final Map> consumedOffsetsPerTask) { for (final Task task : tasksToPrepare) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 133541bfac..ff52ad5ae9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -751,6 +751,87 @@ public class TaskManagerTest { assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums)); } +@Test +public void shouldRemoveStatefulTaskWithRevokedInputPartitionsFromStateUpdaterOnRevocation() { +final StreamTask task = statefulTask(taskId00, taskId00ChangelogPartitions) +.inState(State.RESTORING) +.withInputPartitions(taskId00Partitions).build(); +final TaskManager taskManager = setupForRevocation(mkSet(task), mkSet(task)); + +taskManager.handleRevocation(taskId00Partitions); + +Mockito.verify(stateUpdater).remove(task.id()); + +taskManager.tryToCompleteRestoration(time.milliseconds(), null); + +Mockito.verify(task).closeClean(); +} + +public void shouldRemoveMultipleStatefulTaskWithRevokedInputPartitionsFromStateUpdaterOnRevocation() { +final StreamTask task1 = statefulTask(taskId00, taskId00ChangelogPartitions) +.inState(State.RESTORING) 
+.withInputPartitions(taskId00Partitions).build(); +final StreamTask task2 = statefulTask(taskId01, taskId01ChangelogPartitions) +.inState(State.RESTORING) +.withInputPartitions(taskId01Partitions).build(); +final TaskManager taskManager = setupForRevocation(mkSet(task1, task2), mkSet(task1, task2)); + +taskManager.handleRevocation(union(HashSet::new, taskId00Partitions, taskId01Partitions)); + +Mockito.verify(stateUpdater).remove(task1.id()); +Mockito.verify(stateUpdater).remove(task2.id()); + +taskManager.tryToCompleteRestoration(time.milliseconds(), null); + +Mockito.verify(task1).closeClean(); +Mockito.verify(task2).closeClean(); +} + +@Test +public void shouldNotRemoveStatefulTaskWithoutRevokedInputPartitionsFromS
[kafka] branch trunk updated: KAFKA-10199: Remove tasks from state updater on partition lost (#12521)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 9f20f89953 KAFKA-10199: Remove tasks from state updater on partition lost (#12521) 9f20f89953 is described below commit 9f20f8995399d9e03f518f7b9c8be2bffb2fdcfc Author: Bruno Cadonna AuthorDate: Wed Aug 17 20:12:30 2022 +0200 KAFKA-10199: Remove tasks from state updater on partition lost (#12521) Removes tasks from the state updater when the input partitions of the tasks are lost during a rebalance. Reviewers: Guozhang Wang --- .../streams/processor/internals/TaskManager.java | 26 ++-- .../kafka/streams/processor/internals/Tasks.java | 19 ++--- .../processor/internals/TaskManagerTest.java | 48 -- 3 files changed, 81 insertions(+), 12 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 03c36b0daf..4bba28a3f3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -465,7 +465,7 @@ public class TaskManager { standbyTasksToCreate.remove(taskId); } else { stateUpdater.remove(taskId); -tasks.addPendingTaskToClose(taskId); +tasks.addPendingTaskToCloseClean(taskId); } } } @@ -692,7 +692,7 @@ public class TaskManager { taskExceptions.putIfAbsent(taskId, e); } -} else if (tasks.removePendingTaskToClose(task.id())) { +} else if (tasks.removePendingTaskToCloseClean(task.id())) { try { task.suspend(); task.closeClean(); @@ -710,6 +710,8 @@ public class TaskManager { taskExceptions.putIfAbsent(task.id(), e); } +} else if (tasks.removePendingTaskToCloseDirty(task.id())) { +tasksToCloseDirty.add(task); } else if ((inputPartitions = tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) { task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id())); stateUpdater.add(task); @@ -867,6 +869,15 @@ public class TaskManager { void handleLostAll() { log.debug("Closing lost active tasks as zombies."); +closeRunningTasksDirty(); +removeLostTasksFromStateUpdater(); + +if (processingMode == EXACTLY_ONCE_V2) { +activeTaskCreator.reInitializeThreadProducer(); +} +} + +private void closeRunningTasksDirty() { final Set allTask = tasks.allTasks(); for (final Task task : allTask) { // Even though we've apparently dropped out of the group, we can continue safely to maintain our @@ -875,9 +886,16 @@ public class TaskManager { closeTaskDirty(task); } } +} -if (processingMode == EXACTLY_ONCE_V2) { -activeTaskCreator.reInitializeThreadProducer(); +private void removeLostTasksFromStateUpdater() { +if (stateUpdater != null) { +for (final Task restoringTask : stateUpdater.getTasks()) { +if (restoringTask.isActive()) { +tasks.addPendingTaskToCloseDirty(restoringTask.id()); +stateUpdater.remove(restoringTask.id()); +} +} } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java index 9628b42d92..8178fe3691 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java @@ -55,7 +55,9 @@ class Tasks { private final Map> pendingTasksToRecycle = new 
HashMap<>(); private final Map> pendingTasksToUpdateInputPartitions = new HashMap<>(); private final Set pendingTasksToInit = new HashSet<>(); -private final Set pendingTasksToClose = new HashSet<>(); +private final Set pendingTasksToCloseClean = new HashSet<>(); + +private final Set pendingTasksToCloseDirty = new HashSet<>(); // TODO: convert to Stream/StandbyTask when we remove TaskManager#StateMachineTask with mocks private final Map activeTasksPerPartition = new HashMap<>(); @@ -111,12 +113,19 @@ class Tasks { pendingTasksToUpdateInputPartitions.put(taskId, inputPartitions); }
[kafka] branch trunk updated: KAFKA-10199: Handle task closure and recycling from state updater (#12466)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new dc72f6ec02 KAFKA-10199: Handle task closure and recycling from state updater (#12466) dc72f6ec02 is described below commit dc72f6ec02c7a7fbda083cd8a5a9f0081c7e58fd Author: Guozhang Wang AuthorDate: Mon Aug 15 19:33:46 2022 -0700 KAFKA-10199: Handle task closure and recycling from state updater (#12466) 1. Within the tryCompleteRestore function of the thread, try to drain the removed tasks from state updater and handle accordingly: 1) for recycle, 2) for closure, 3) for update input partitions. 2. Catch up on some unit test coverage from previous PRs. 3. Some minor cleanups around exception handling. Reviewers: Bruno Cadonna --- .../processor/internals/ActiveTaskCreator.java | 1 + .../streams/processor/internals/StandbyTask.java | 2 +- .../processor/internals/StandbyTaskCreator.java| 1 + .../streams/processor/internals/StreamTask.java| 2 +- .../streams/processor/internals/TaskManager.java | 311 + .../kafka/streams/processor/internals/Tasks.java | 90 +++--- .../internals/ProcessorTopologyFactories.java | 1 - .../processor/internals/StandbyTaskTest.java | 5 +- .../processor/internals/StreamTaskTest.java| 12 +- .../processor/internals/TaskManagerTest.java | 178 +++- .../streams/processor/internals/TasksTest.java | 64 - 11 files changed, 471 insertions(+), 196 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java index 46455111db..d28d0d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java @@ -135,6 +135,7 @@ class ActiveTaskCreator { return threadProducer; } +// TODO: convert to StreamTask when we remove TaskManager#StateMachineTask with mocks public Collection createTasks(final Consumer consumer, final Map> tasksToBeCreated) { final List createdTasks = new ArrayList<>(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index bb7aef1dcd..87f19c4b1f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -234,7 +234,7 @@ public class StandbyTask extends AbstractTask implements Task { closeTaskSensor.record(); transitionTo(State.CLOSED); -log.info("Closed and recycled state, and converted type to active"); +log.info("Closed and recycled state"); } private void close(final boolean clean) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java index 2f48cdb67f..26a3a49af3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java @@ -67,6 +67,7 @@ class StandbyTaskCreator { ); } +// TODO: convert to StandbyTask when we remove TaskManager#StateMachineTask with mocks Collection createTasks(final Map> tasksToBeCreated) { final 
List createdTasks = new ArrayList<>(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index f7bf8a5e74..8a30bcf6ca 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -568,7 +568,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, closeTaskSensor.record(); transitionTo(State.CLOSED); -log.info("Closed and recycled state, and converted type to standby"); +log.info("Closed and recycled state"); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index cfd20d2299..03c36b0daf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/i
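The drain-and-handle flow described in points 1-3 of the commit message can be pictured with a small, self-contained sketch. The names below are illustrative stand-ins, not the real TaskManager/StateUpdater/Task internals: removed tasks are drained from the updater and then dispatched according to the intent recorded when their removal was requested.

    import java.util.List;

    // Illustrative stand-ins; the real types live in
    // org.apache.kafka.streams.processor.internals (TaskManager, StateUpdater, Task).
    enum PendingAction { RECYCLE, CLOSE, UPDATE_INPUT_PARTITIONS }

    interface RemovedTask {
        PendingAction pendingAction();
        void recycle();                 // convert between active and standby
        void closeClean();              // close the task and release its state
        void updateInputPartitions();   // swap partitions and hand the task back
    }

    final class RemovedTaskDispatcher {
        // Drain whatever the state updater has finished removing, then branch on
        // the action that was recorded for each task when its removal was requested.
        void handleRemovedTasks(final List<RemovedTask> drained) {
            for (final RemovedTask task : drained) {
                switch (task.pendingAction()) {
                    case RECYCLE:
                        task.recycle();
                        break;
                    case CLOSE:
                        task.closeClean();
                        break;
                    case UPDATE_INPUT_PARTITIONS:
                        task.updateInputPartitions();
                        break;
                }
            }
        }
    }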
[kafka-site] branch asf-site updated: Update my entry to include PMC membership (#434)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git The following commit(s) were added to refs/heads/asf-site by this push: new ac411462 Update my entry to include PMC membership (#434) ac411462 is described below commit ac411462d13c9f5cc150aa47888a1af7aabb59d3 Author: A. Sophie Blee-Goldman AuthorDate: Tue Aug 9 09:36:59 2022 -0700 Update my entry to include PMC membership (#434) Reviewers: David Jacot , Guozhang Wang --- committers.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/committers.html b/committers.html index ce95457a..1855ed80 100644 --- a/committers.html +++ b/committers.html @@ -341,7 +341,7 @@ Anna Sophie Blee-Goldman - Committer + Committer, and PMC member https://www.linkedin.com/in/ableegoldman/;>/in/ableegoldman
[kafka] branch trunk updated: HOTFIX / KAFKA-14130: Reduce RackAwarenesssTest to unit Test (#12476)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 5ceaa588ee HOTFIX / KAFKA-14130: Reduce RackAwarenesssTest to unit Test (#12476) 5ceaa588ee is described below commit 5ceaa588ee224fe1a859e4212636aba22b22f540 Author: Guozhang Wang AuthorDate: Wed Aug 3 15:36:59 2022 -0700 HOTFIX / KAFKA-14130: Reduce RackAwarenesssTest to unit Test (#12476) While working on KAFKA-13877, I felt it was overkill to introduce the whole test class as an integration test, since all we need is to test the assignor itself, which can be done as a unit test. Running this suite with 9+ instances takes a long time and is still vulnerable to all kinds of timing-based flakiness. A better choice is to reduce it to a unit test, similar to HighAvailabilityStreamsPartitionAssignorTest, which just tests the behavior of the assignor itself rather than creating m [...] Since we mock everything, there's no flakiness anymore. Plus we greatly reduced the test runtime (on my local machine, the old integration suite takes about 35 secs to run, while the new one takes 20ms on average). Reviewers: Divij Vaidya , Dalibor Plavcic --- .../integration/RackAwarenessIntegrationTest.java | 433  .../RackAwarenessStreamsPartitionAssignorTest.java | 576 + .../processor/internals/StreamTaskTest.java| 4 +- .../internals/assignment/AssignmentTestUtils.java | 3 + 4 files changed, 581 insertions(+), 435 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java deleted file mode 100644 index 7c93b769f5..00 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java +++ /dev/null @@ -1,433 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.kafka.streams.integration; - -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.ThreadMetadata; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; -import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; -import org.apache.kafka.streams.kstream.Consumed; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.Repartitioned; -import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.StoreBuilder; -import org.apache.kafka.streams.state.Stores; -import org.apache.kafka.test.TestUtils; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInfo; -import org.junit.jupiter.api.Timeout; - -import java.io.IOException; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.function.Predicate; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static java.util.Arrays.asList; -import static java.util.Collections.singletonList; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; -import static org.apache.kafka.test.TestUtils.waitForCondition; -import static org.junit.jupiter.api.Assertions.assertEquals; - -@Timeout(600) -@Tag("integration") -public class RackAwarenessIntegrationTest { -private static final int NUM_BROKERS = 1; - -private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); - -private static final String
[kafka] branch trunk updated: KAFKA-13877: Fix flakiness in RackAwarenessIntegrationTest (#12468)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 3202459394 KAFKA-13877: Fix flakiness in RackAwarenessIntegrationTest (#12468) 3202459394 is described below commit 32024593947f8bb497f2f5d392e0d1d892a16ff3 Author: Guozhang Wang AuthorDate: Wed Aug 3 09:17:38 2022 -0700 KAFKA-13877: Fix flakiness in RackAwarenessIntegrationTest (#12468) In the current test, we check for tag distribution immediately after everyone is on the running state, however due to the fact of the follow-up rebalances, "everyone is now in running state" does not mean that the cluster is now stable. In fact, a follow-up rebalance may occur, upon which the local thread metadata would return empty which would cause the distribution verifier to fail. Reviewers: Divij Vaidya , Luke Chen --- .../kafka/streams/processor/internals/StreamTask.java | 2 +- .../streams/integration/RackAwarenessIntegrationTest.java | 14 +++--- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 07c4494225..f7bf8a5e74 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -1267,7 +1267,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, final SourceNode source = topology.source(partition.topic()); if (source == null) { throw new TopologyException( -"Topic is unknown to the topology. " + +"Topic " + partition.topic() + " is unknown to the topology. " + "This may happen if different KafkaStreams instances of the same application execute different Topologies. " + "Note that Topologies are only identical if all operators are added in the same order." 
); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java index 677633e9b0..7c93b769f5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java @@ -55,8 +55,8 @@ import java.util.stream.Stream; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; @Timeout(600) @Tag("integration") @@ -143,13 +143,13 @@ public class RackAwarenessIntegrationTest { createAndStart(clientTags2, clientTagKeys, numberOfStandbyReplicas); waitUntilAllKafkaStreamsClientsAreRunning(); -assertTrue(isIdealTaskDistributionReachedForTags(clientTagKeys)); +waitForCondition(() -> isIdealTaskDistributionReachedForTags(clientTagKeys), "not all tags are evenly distributed"); stopKafkaStreamsInstanceWithIndex(0); waitUntilAllKafkaStreamsClientsAreRunning(); -assertTrue(isIdealTaskDistributionReachedForTags(clientTagKeys)); +waitForCondition(() -> isIdealTaskDistributionReachedForTags(clientTagKeys), "not all tags are evenly distributed"); } @Test @@ -165,7 +165,7 @@ public class RackAwarenessIntegrationTest { createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1B, TAG_VALUE_K8_CLUSTER_1), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas); waitUntilAllKafkaStreamsClientsAreRunning(); - assertTrue(isIdealTaskDistributionReachedForTags(singletonList(TAG_ZONE))); +waitForCondition(() -> isIdealTaskDistributionReachedForTags(singletonList(TAG_ZONE)), "not all tags are evenly distributed"); } @Test @@ -186,7 +186,7 @@ public class RackAwarenessIntegrationTest { createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1C, TAG_VALUE_K8_CLUSTER_3), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas); waitUntilAllKafkaStreamsClientsAreRunning(); -assertTrue(isIdealTaskDistributionReachedForTags(asList(TAG_ZONE, TAG_CLUSTER))); +waitForCondition(() -> isIdealTaskDistributionReachedForTa
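The gist of the fix is to replace one-shot assertions with a poll-until-true check, because a follow-up rebalance can make the local thread metadata transiently empty even after every client has reported RUNNING. The real helper is org.apache.kafka.test.TestUtils#waitForCondition; the minimal sketch below only shows the retry semantics and is not the actual implementation.

    import java.util.function.BooleanSupplier;

    final class WaitUtil {
        // Re-check the condition until it holds or the timeout expires, instead of
        // asserting once at a moment when the cluster may still be rebalancing.
        static void waitForCondition(final BooleanSupplier condition,
                                     final long timeoutMs,
                                     final String message) throws InterruptedException {
            final long deadline = System.currentTimeMillis() + timeoutMs;
            while (!condition.getAsBoolean()) {
                if (System.currentTimeMillis() > deadline) {
                    throw new AssertionError(message);
                }
                Thread.sleep(100L);
            }
        }
    }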
[kafka] branch trunk updated: Minor: enable index for emit final sliding window (#12461)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new f7ac5d3d00 Minor: enable index for emit final sliding window (#12461) f7ac5d3d00 is described below commit f7ac5d3d00f3cd3caa25c3003900bdb245d5252e Author: Hao Li <1127478+lihao...@users.noreply.github.com> AuthorDate: Fri Jul 29 14:47:25 2022 -0700 Minor: enable index for emit final sliding window (#12461) Enable index for sliding window emit final case as it's faster to fetch windows for particular key Reviewers: Guozhang Wang --- .../kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java index 5ca6b911b7..587d2d5a87 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java @@ -233,7 +233,7 @@ public class SlidingWindowedKStreamImpl extends AbstractStream imple Duration.ofMillis(retentionPeriod), Duration.ofMillis(windows.timeDifferenceMs()), false, -false +true ) : Stores.persistentTimestampedWindowStore( materialized.storeName(),
[kafka] branch trunk updated: MINOR: Fix static mock usage in ThreadMetricsTest (#12454)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new a5d71e1550 MINOR: Fix static mock usage in ThreadMetricsTest (#12454) a5d71e1550 is described below commit a5d71e1550c5a2b670347c0c3bafb0b195bf916c Author: Bruno Cadonna AuthorDate: Thu Jul 28 22:32:46 2022 +0200 MINOR: Fix static mock usage in ThreadMetricsTest (#12454) Before this PR the calls to the static methods on StreamsMetricsImpl were just calls and not a verification on the mock. This miss happened during the switch from EasyMock to Mockito. Reviewers: Guozhang Wang --- .../internals/metrics/ThreadMetricsTest.java | 422 +++-- 1 file changed, 224 insertions(+), 198 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java index 6ed97ebf7c..3d2aaa20c8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java @@ -24,12 +24,14 @@ import org.junit.Test; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.Collections; import java.util.Map; import org.mockito.ArgumentCaptor; +import org.mockito.MockedStatic; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_SUFFIX; @@ -54,17 +56,20 @@ public class ThreadMetricsTest { final String ratioDescription = "The fraction of time the thread spent on processing active tasks"; when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor); when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap); -StreamsMetricsImpl.addValueMetricToSensor( -expectedSensor, -THREAD_LEVEL_GROUP, -tagMap, -operation, -ratioDescription -); - -final Sensor sensor = ThreadMetrics.processRatioSensor(THREAD_ID, streamsMetrics); -assertThat(sensor, is(expectedSensor)); +try (final MockedStatic streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) { +final Sensor sensor = ThreadMetrics.processRatioSensor(THREAD_ID, streamsMetrics); +streamsMetricsStaticMock.verify( +() -> StreamsMetricsImpl.addValueMetricToSensor( +expectedSensor, +THREAD_LEVEL_GROUP, +tagMap, +operation, +ratioDescription +) +); +assertThat(sensor, is(expectedSensor)); +} } @Test @@ -74,18 +79,21 @@ public class ThreadMetricsTest { final String maxDescription = "The maximum number of records processed within an iteration"; when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor); when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap); -StreamsMetricsImpl.addAvgAndMaxToSensor( -expectedSensor, -THREAD_LEVEL_GROUP, -tagMap, -operation, -avgDescription, -maxDescription -); -final Sensor sensor = ThreadMetrics.processRecordsSensor(THREAD_ID, streamsMetrics); - -assertThat(sensor, is(expectedSensor)); +try (final MockedStatic streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) { +final Sensor 
sensor = ThreadMetrics.processRecordsSensor(THREAD_ID, streamsMetrics); +streamsMetricsStaticMock.verify( +() -> StreamsMetricsImpl.addAvgAndMaxToSensor( +expectedSensor, +THREAD_LEVEL_GROUP, +tagMap, +operation, +avgDescription, +maxDescription +) +); +assertThat(sensor, is(expectedSensor)); +} } @Test @@ -95,18 +103,21 @@ public class ThreadMetricsTest { final String maxLatencyDescription = "The maximum process latency"; when(streamsMetrics.threadLevelSensor(THREAD_ID, operationLatency, RecordingLevel.INFO)).thenReturn(expectedSensor); when(streamsMetrics.threadLevel
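The pattern behind the fix is Mockito's try-with-resources static mocking: a call to a static method only counts as verified when it is checked through the MockedStatic handle, whereas invoking the static method directly in the test body verifies nothing. A toy example of the shape of the pattern (not the actual ThreadMetricsTest; it assumes a Mockito version with inline static mocking, i.e. mockito-inline or Mockito 5+):

    import static org.mockito.Mockito.mockStatic;

    import org.junit.jupiter.api.Test;
    import org.mockito.MockedStatic;

    class StaticVerificationExampleTest {

        // Toy class standing in for StreamsMetricsImpl and its static helpers.
        static class Metrics {
            static void addValueMetric(final String name) {
                // no-op for the example
            }
        }

        @Test
        void verifiesTheStaticCall() {
            try (MockedStatic<Metrics> metrics = mockStatic(Metrics.class)) {
                Metrics.addValueMetric("process-ratio");                        // what the code under test would do
                metrics.verify(() -> Metrics.addValueMetric("process-ratio")); // the actual verification
            }
        }
    }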
[kafka] branch trunk updated (9e74f91e56 -> 2724cc9920)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 9e74f91e56 KAFKA-14089: Only check for committed seqnos after disabling exactly-once support in Connect integration test (#12429) add 2724cc9920 KAFKA-10199: Bookkeep tasks during assignment for use with state updater (#12442) No new revisions were added by this update. Summary of changes: .../streams/processor/internals/StreamThread.java | 17 ++- .../streams/processor/internals/TaskManager.java | 155 ++--- .../kafka/streams/processor/internals/Tasks.java | 21 +++ .../processor/internals/StreamThreadTest.java | 4 +- .../processor/internals/TaskManagerTest.java | 92 .../org/apache/kafka/test/StreamsTestUtils.java| 23 --- 6 files changed, 212 insertions(+), 100 deletions(-)
[kafka] branch trunk updated: KAFKA-10199: Further refactor task lifecycle management (#12439)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 06f47c3b51 KAFKA-10199: Further refactor task lifecycle management (#12439) 06f47c3b51 is described below commit 06f47c3b517f5c96c1bd981369843f3483947197 Author: Guozhang Wang AuthorDate: Wed Jul 27 17:29:05 2022 -0700 KAFKA-10199: Further refactor task lifecycle management (#12439) 1. Consolidate the task recycle procedure into a single function within the task. The current procedure now becomes: a) task.recycleStateAndConvert, at end of it the task is in closed while its stateManager is retained, and the manager type has been converted; 2) create the new task with old task's fields and the stateManager inside the creators. 2. Move the task execution related metadata into the corresponding TaskExecutionMetadata class, including the task idle related metadata (e.g. successfully processed tasks); reduce the number of params needed for TaskExecutor as well as Tasks. 3. Move the task execution related fields (embedded producer and consumer) and task creators out of Tasks and migrated into TaskManager. Now the Tasks is only a bookkeeping place without any task mutation logic. 4. When adding tests, I realized that we should not add task to state updater right after creation, since it was not initialized yet, while state updater would validate that the task's state is already restoring / running. So I updated that logic while adding unit tests. Reviewers: Bruno Cadonna --- .../streams/processor/internals/AbstractTask.java | 5 +- .../processor/internals/ActiveTaskCreator.java | 36 ++- .../streams/processor/internals/StandbyTask.java | 51 +--- .../processor/internals/StandbyTaskCreator.java| 32 ++- .../streams/processor/internals/StreamTask.java| 52 +--- .../kafka/streams/processor/internals/Task.java| 5 +- .../processor/internals/TaskExecutionMetadata.java | 36 ++- .../streams/processor/internals/TaskExecutor.java | 49 ++-- .../streams/processor/internals/TaskManager.java | 188 - .../kafka/streams/processor/internals/Tasks.java | 203 -- .../processor/internals/TopologyMetadata.java | 4 +- .../processor/internals/StandbyTaskTest.java | 6 +- .../processor/internals/StreamTaskTest.java| 12 +- .../processor/internals/StreamThreadTest.java | 1 + .../internals/TaskExecutionMetadataTest.java | 11 +- .../processor/internals/TaskExecutorTest.java | 5 +- .../processor/internals/TaskManagerTest.java | 293 + .../streams/processor/internals/TasksTest.java | 76 +- 18 files changed, 512 insertions(+), 553 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 8ffbf9cd2e..a88d89fc33 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -43,10 +43,11 @@ public abstract class AbstractTask implements Task { private Task.State state = CREATED; private long deadlineMs = NO_DEADLINE; -protected Set inputPartitions; protected final Logger log; -protected final LogContext logContext; protected final String logPrefix; +protected final LogContext logContext; + +protected Set inputPartitions; /** * If the checkpoint has not been loaded from the file yet (null), then we should not overwrite the 
checkpoint; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java index ef21a51c99..46455111db 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java @@ -135,10 +135,8 @@ class ActiveTaskCreator { return threadProducer; } -// TODO: change return type to `StreamTask` public Collection createTasks(final Consumer consumer, - final Map> tasksToBeCreated) { -// TODO: change type to `StreamTask` +final Map> tasksToBeCreated) { final List createdTasks = new ArrayList<>(); for (final Map.Entry> newTaskAndPartitions : tasksToBeCreated.entrySet()) { @@ -211,13 +209,39 @@ class ActiveTaskCreator { ); } +/* + * TODO: we pass in the new input partitions to validate if they still match, + * in the future we whe
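The recycle procedure in point 1 boils down to a two-step hand-off: close the old task while retaining its (converted) state manager, then let the creator of the opposite task type rebuild the task from the old task's fields plus that retained manager. A hypothetical sketch of that flow (names are illustrative, not the real StreamTask/StandbyTask/ProcessorStateManager signatures):

    // Hypothetical shapes only; the real flow involves StreamTask, StandbyTask,
    // ProcessorStateManager and the Active-/StandbyTaskCreator classes.
    interface RetainedStateManager { }

    interface RecyclableTask {
        // Step 1: close the task but keep (and convert) its state manager for reuse.
        RetainedStateManager recycleStateAndConvert();
    }

    interface StandbyFromActiveCreator {
        Object createStandby(RecyclableTask closedActive, RetainedStateManager retainedState);
    }

    final class RecycleFlow {
        Object recycleActiveToStandby(final RecyclableTask active,
                                      final StandbyFromActiveCreator creator) {
            final RetainedStateManager retained = active.recycleStateAndConvert();
            // Step 2: the creator rebuilds the task from the old task's fields and
            // the retained state manager, now typed as a standby task.
            return creator.createStandby(active, retained);
        }
    }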
[kafka] branch trunk updated (5e4ae06d12 -> 5a52601691)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 5e4ae06d12 MINOR: fix flaky test test_standby_tasks_rebalance (#12428) add 5a52601691 KAFKA-10199: Add tasks to state updater when they are created (#12427) No new revisions were added by this update. Summary of changes: .../org/apache/kafka/streams/StreamsConfig.java| 3 + .../streams/processor/internals/AbstractTask.java | 2 +- .../processor/internals/ProcessorStateManager.java | 3 +- .../streams/processor/internals/StreamThread.java | 4 +- .../kafka/streams/processor/internals/Task.java| 2 +- .../streams/processor/internals/TaskManager.java | 18 +- .../kafka/streams/processor/internals/Tasks.java | 25 ++- .../internals/DefaultStateUpdaterTest.java | 223 ++--- .../processor/internals/StreamThreadTest.java | 6 +- .../processor/internals/TaskManagerTest.java | 27 +-- .../streams/processor/internals/TasksTest.java | 133 .../org/apache/kafka/test/StreamsTestUtils.java| 74 +++ 12 files changed, 381 insertions(+), 139 deletions(-) create mode 100644 streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
[kafka] branch trunk updated: MINOR: fix flaky test test_standby_tasks_rebalance (#12428)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 5e4ae06d12 MINOR: fix flaky test test_standby_tasks_rebalance (#12428) 5e4ae06d12 is described below commit 5e4ae06d129f01d94ed27f73ea311a2adb0477e7 Author: Hao Li <1127478+lihao...@users.noreply.github.com> AuthorDate: Thu Jul 21 12:12:29 2022 -0700 MINOR: fix flaky test test_standby_tasks_rebalance (#12428) * Description In this test, when the third proc joins, other rebalance scenarios can sometimes occur, such as a follow-up joingroup request arriving before the syncgroup response has been received by one of the procs, in which case the tasks previously assigned to that proc are lost during the new joingroup request. This can result in standby tasks assigned as 3, 1, 2. This PR relaxes the expected assignment of 2, 2, 2 to a range of [1-3]. * Some background from Guozhang: I talked to @hao Li offline and also inspected the code a bit; the tl;dr is that I think the code logic is correct (i.e. we do not really have a bug), but we need to relax the test verification a little bit. The general idea behind the subscription info is this: When a client joins the group, its subscription will try to encode all its currently assigned active and standby tasks, which are used as the prev active and standby tasks by the assignor in order to achieve some stickiness. When a client drops all its active/standby tasks due to errors, it does not actually report an empty set in its subscription; instead it checks its local state directory (you can see that in TaskManager#getTaskOffsetSums, which populates the taskOffsetSum). For an active task, its offset would be "-2" a.k.a. LATEST_OFFSET; for a standby task, its offset is an actual numerical value. So in this case, proc2, which drops all its active and standby tasks, would still report all tasks that still have some local state, and since it previously owned all six tasks (three as active and three as standby), it would report all six as standbys; when that happens, the resulting assignment, as @hao Li verified, is indeed the uneven one. So I think the actual "issue" happens when proc2 is a bit late sending the sync-group request, after the previous rebalance has already completed and a follow-up rebalance has already been triggered; in that case, the resulting uneven assignment is indeed expected. Such a scenario, though not common, is still legitimate since in practice all kinds of timing skewness across instances can happen. So I think we should just relax our verification here, i.e. just making sure that each [...]
Reviewers: Suhas Satish , Guozhang Wang --- .../tests/streams/streams_standby_replica_test.py | 19 +-- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/tests/kafkatest/tests/streams/streams_standby_replica_test.py b/tests/kafkatest/tests/streams/streams_standby_replica_test.py index a8c07513c1..c0e5953f73 100644 --- a/tests/kafkatest/tests/streams/streams_standby_replica_test.py +++ b/tests/kafkatest/tests/streams/streams_standby_replica_test.py @@ -73,9 +73,9 @@ class StreamsStandbyTask(BaseStreamsTest): processor_3.start() -self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_1.STDOUT_FILE) -self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_2.STDOUT_FILE) -self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_3.STDOUT_FILE) +self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE) +self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", processor_2.STDOUT_FILE) +self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", processor_3.STDOUT_FILE) processor_1.stop() @@ -93,9 +93,9 @@ class StreamsStandbyTask(BaseStreamsTest): processor_2.start() -self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_1.STDOUT_FILE) -self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_2.STDOUT_FILE) -self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_3.STDOUT_FILE, num_lines=2) +self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE) +self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", processor_2.STDOUT_FILE) +self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", proce
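The key detail in the background explanation above is how a member advertises its previous tasks: any task it still has local state for is reported with an offset sum, and tasks it previously ran as active are flagged with the LATEST_OFFSET sentinel (-2). A rough sketch of that bookkeeping (illustrative only; the real logic is TaskManager#getTaskOffsetSums):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;

    final class OffsetSumSketch {
        // Sentinel meaning "this task was last running as an active task on this client".
        static final long LATEST_OFFSET = -2L;

        // Tasks with only local standby state report their checkpointed changelog
        // offset sums; previously active tasks report the sentinel instead.
        static Map<String, Long> encodeTaskOffsetSums(final Set<String> previouslyActiveTasks,
                                                      final Map<String, Long> standbyCheckpointSums) {
            final Map<String, Long> offsetSums = new HashMap<>(standbyCheckpointSums);
            for (final String taskId : previouslyActiveTasks) {
                offsetSums.put(taskId, LATEST_OFFSET);
            }
            return offsetSums;
        }
    }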
[kafka] branch trunk updated: KAFKA-10199: Add RESUME in state updater (#12387)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 693e283802 KAFKA-10199: Add RESUME in state updater (#12387) 693e283802 is described below commit 693e283802590b724ef441d5bf7acb6eeced91c5 Author: Guozhang Wang AuthorDate: Tue Jul 19 09:44:10 2022 -0700 KAFKA-10199: Add RESUME in state updater (#12387) * Need to check enforceRestoreActive / transitToUpdateStandby when resuming a paused task. * Do not expose another getResumedTasks since I think its caller only need the getPausedTasks. Reviewers: Bruno Cadonna --- .../processor/internals/DefaultStateUpdater.java | 36 - .../streams/processor/internals/StateUpdater.java | 13 ++ .../streams/processor/internals/TaskAndAction.java | 10 +- .../internals/DefaultStateUpdaterTest.java | 158 - .../processor/internals/TaskAndActionTest.java | 20 +++ 5 files changed, 229 insertions(+), 8 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index 08959bee00..7e7ec2a6f7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -128,6 +128,9 @@ public class DefaultStateUpdater implements StateUpdater { case PAUSE: pauseTask(taskAndAction.getTaskId()); break; +case RESUME: +resumeTask(taskAndAction.getTaskId()); +break; } } } finally { @@ -249,7 +252,7 @@ public class DefaultStateUpdater implements StateUpdater { final Task existingTask = updatingTasks.putIfAbsent(task.id(), task); if (existingTask != null) { throw new IllegalStateException((existingTask.isActive() ? "Active" : "Standby") + " task " + task.id() + " already exist, " + -"should not try to add another " + (task.isActive() ? "Active" : "Standby") + " task with the same id. " + BUG_ERROR_MESSAGE); +"should not try to add another " + (task.isActive() ? "active" : "standby") + " task with the same id. 
" + BUG_ERROR_MESSAGE); } if (task.isActive()) { @@ -304,6 +307,26 @@ public class DefaultStateUpdater implements StateUpdater { } } +private void resumeTask(final TaskId taskId) { +final Task task = pausedTasks.get(taskId); +if (task != null) { +updatingTasks.put(taskId, task); +pausedTasks.remove(taskId); + +if (task.isActive()) { +log.debug("Stateful active task " + task.id() + " was resumed to the updating tasks of the state updater"); +changelogReader.enforceRestoreActive(); +} else { +log.debug("Standby task " + task.id() + " was resumed to the updating tasks of the state updater"); +if (updatingTasks.size() == 1) { +changelogReader.transitToUpdateStandby(); +} +} +} else { +log.debug("Task " + taskId + " was not resumed since it is not paused."); +} +} + private boolean isStateless(final Task task) { return task.changelogPartitions().isEmpty() && task.isActive(); } @@ -451,6 +474,17 @@ public class DefaultStateUpdater implements StateUpdater { } } +@Override +public void resume(final TaskId taskId) { +tasksAndActionsLock.lock(); +try { +tasksAndActions.add(TaskAndAction.createResumeTask(taskId)); +tasksAndActionsCondition.signalAll(); +} finally { +tasksAndActionsLock.unlock(); +} +} + @Override public Set drainRestoredActiveTasks(final Duration timeout) { final long timeoutMs = timeout.toMillis(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java index 516e47436b..69d521b600 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java +++ b/streams/src/main/java/
[kafka] branch trunk updated: KAFKA-10199: Add PAUSE in state updater (#12386)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 309e0f986e KAFKA-10199: Add PAUSE in state updater (#12386) 309e0f986e is described below commit 309e0f986e97be966c797f7729eb1e94ef5400a9 Author: Guozhang Wang AuthorDate: Mon Jul 18 16:42:48 2022 -0700 KAFKA-10199: Add PAUSE in state updater (#12386) * Add pause action to task-updater. * When removing a task, also check in the paused tasks in addition to removed tasks. * Also I realized we do not check if tasks with the same id are added, so I add that check in this PR as well. Reviewers: Bruno Cadonna --- .../processor/internals/DefaultStateUpdater.java | 62 - .../streams/processor/internals/StateUpdater.java | 13 + .../processor/internals/StoreChangelogReader.java | 2 +- .../streams/processor/internals/TaskAndAction.java | 10 +- .../internals/DefaultStateUpdaterTest.java | 282 - .../processor/internals/TaskAndActionTest.java | 20 ++ 6 files changed, 379 insertions(+), 10 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index 22fd48a4ab..08959bee00 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -83,7 +83,7 @@ public class DefaultStateUpdater implements StateUpdater { } public boolean onlyStandbyTasksLeft() { -return !updatingTasks.isEmpty() && updatingTasks.values().stream().allMatch(t -> !t.isActive()); +return !updatingTasks.isEmpty() && updatingTasks.values().stream().noneMatch(Task::isActive); } @Override @@ -125,6 +125,9 @@ public class DefaultStateUpdater implements StateUpdater { case REMOVE: removeTask(taskAndAction.getTaskId()); break; +case PAUSE: +pauseTask(taskAndAction.getTaskId()); +break; } } } finally { @@ -243,7 +246,12 @@ public class DefaultStateUpdater implements StateUpdater { addToRestoredTasks((StreamTask) task); log.debug("Stateless active task " + task.id() + " was added to the restored tasks of the state updater"); } else { -updatingTasks.put(task.id(), task); +final Task existingTask = updatingTasks.putIfAbsent(task.id(), task); +if (existingTask != null) { +throw new IllegalStateException((existingTask.isActive() ? "Active" : "Standby") + " task " + task.id() + " already exist, " + +"should not try to add another " + (task.isActive() ? "Active" : "Standby") + " task with the same id. " + BUG_ERROR_MESSAGE); +} + if (task.isActive()) { log.debug("Stateful active task " + task.id() + " was added to the updating tasks of the state updater"); changelogReader.enforceRestoreActive(); @@ -257,8 +265,9 @@ public class DefaultStateUpdater implements StateUpdater { } private void removeTask(final TaskId taskId) { -final Task task = updatingTasks.get(taskId); -if (task != null) { +final Task task; +if (updatingTasks.containsKey(taskId)) { +task = updatingTasks.get(taskId); task.maybeCheckpoint(true); final Collection changelogPartitions = task.changelogPartitions(); changelogReader.unregister(changelogPartitions); @@ -267,8 +276,31 @@ public class DefaultStateUpdater implements StateUpdater { transitToUpdateStandbysIfOnlyStandbysLeft(); log.debug((task.isActive() ? 
"Active" : "Standby") + " task " + task.id() + " was removed from the updating tasks and added to the removed tasks."); +} else if (pausedTasks.containsKey(taskId)) { +task = pausedTasks.get(taskId); +final Collection changelogPartitions = task.changelogPartitions(); +changelogReader.unregister(changelogPartitions); +removedTasks.add(task); +pausedTasks.remove(taskId); +log.debug((task.isActive() ? "Act
[kafka] branch trunk updated: KAFKA-13785: [10/N][emit final] more unit test for session store and disable cache for emit final sliding window (#12370)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new b5d4fa7645e KAFKA-13785: [10/N][emit final] more unit test for session store and disable cache for emit final sliding window (#12370) b5d4fa7645e is described below commit b5d4fa7645eb75d2030eb8cac78545a681686a39 Author: Hao Li <1127478+lihao...@users.noreply.github.com> AuthorDate: Tue Jul 12 10:57:11 2022 -0700 KAFKA-13785: [10/N][emit final] more unit test for session store and disable cache for emit final sliding window (#12370) 1. Added more unit test for RocksDBTimeOrderedSessionStore and RocksDBTimeOrderedSessionSegmentedBytesStore 2. Disable cache for sliding window if emit strategy is ON_WINDOW_CLOSE Reviewers: Matthias J. Sax , Guozhang Wang --- .../internals/SessionWindowedKStreamImpl.java | 3 + .../internals/SlidingWindowedKStreamImpl.java | 4 +- .../state/internals/PrefixedSessionKeySchemas.java | 14 +-- ...cksDBTimeOrderedSessionSegmentedBytesStore.java | 8 +- .../internals/RocksDBTimeOrderedSessionStore.java | 8 +- ...ctDualSchemaRocksDBSegmentedBytesStoreTest.java | 138 + .../internals/AbstractSessionBytesStoreTest.java | 124 ++ .../state/internals/InMemorySessionStoreTest.java | 41 +- .../state/internals/RocksDBSessionStoreTest.java | 57 + 9 files changed, 291 insertions(+), 106 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java index c3b05cb1182..8c60019fccb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java @@ -289,7 +289,10 @@ public class SessionWindowedKStreamImpl extends AbstractStream imple // do not enable cache if the emit final strategy is used if (materialized.cachingEnabled() && emitStrategy.type() != EmitStrategy.StrategyType.ON_WINDOW_CLOSE) { builder.withCachingEnabled(); +} else { +builder.withCachingDisabled(); } + return builder; } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java index 70c75b4c82e..5ca6b911b7c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java @@ -258,7 +258,9 @@ public class SlidingWindowedKStreamImpl extends AbstractStream imple } else { builder.withLoggingDisabled(); } -if (materialized.cachingEnabled()) { + +// do not enable cache if the emit final strategy is used +if (materialized.cachingEnabled() && emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) { builder.withCachingEnabled(); } else { builder.withCachingDisabled(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java index 2ac25277ba8..3ce00bcb8a9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java @@ -102,8 +102,8 @@ public class PrefixedSessionKeySchemas { @Override public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes binaryKeyTo, - final long from, - final long to, + final long earliestWindowEndTime, + final long latestWindowStartTime, final boolean forward) { return iterator -> { while (iterator.hasNext()) { @@ -120,13 +120,13 @@ public class PrefixedSessionKeySchemas { // We can return false directly here since keys are sorted by end time and if // we get time smaller than `from`, there won't be time within range. -if (!forward && endTime < from) { +if (!forward && endTime < earliestWindowE
[kafka] branch trunk updated (38b08dfd338 -> 5a1bac2608c)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 38b08dfd338 MINOR: revert KIP-770 (#12383) add 5a1bac2608c KAFKA-13846: Follow up PR to address review comments (#12297) No new revisions were added by this update. Summary of changes: .../java/org/apache/kafka/common/metrics/Metrics.java | 12 +++- docs/upgrade.html | 15 --- 2 files changed, 19 insertions(+), 8 deletions(-)
[kafka] branch trunk updated: HOTFIX: KIP-851, rename requireStable in ListConsumerGroupOffsetsOptions
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new ca8135b242a HOTFIX: KIP-851, rename requireStable in ListConsumerGroupOffsetsOptions ca8135b242a is described below commit ca8135b242a5d5fd71e5edffbcb85ada2f5d9bd2 Author: Guozhang Wang AuthorDate: Wed Jul 6 22:00:31 2022 -0700 HOTFIX: KIP-851, rename requireStable in ListConsumerGroupOffsetsOptions --- .../src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java | 2 +- .../org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 99fd98b443f..aad2610c94a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -3404,7 +3404,7 @@ public class KafkaAdminClient extends AdminClient { final ListConsumerGroupOffsetsOptions options) { SimpleAdminApiFuture> future = ListConsumerGroupOffsetsHandler.newFuture(groupId); -ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(groupId, options.topicPartitions(), options.shouldRequireStable(), logContext); +ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(groupId, options.topicPartitions(), options.requireStable(), logContext); invokeDriver(handler, future, options.timeoutMs); return new ListConsumerGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId))); } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java index a06feaf3b38..292a47ef393 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java @@ -60,7 +60,7 @@ public class ListConsumerGroupOffsetsOptions extends AbstractOptions
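With the rename, callers both set and read the flag through requireStable on ListConsumerGroupOffsetsOptions. A minimal usage sketch follows (broker address, group id and output handling are illustrative); requireStable(true) asks the broker to return only stable offsets, i.e. not to hand back offsets whose transactional commits are still pending.

    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    final class ListOffsetsExample {
        public static void main(final String[] args) throws Exception {
            final Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            try (Admin admin = Admin.create(props)) {
                final ListConsumerGroupOffsetsOptions options =
                    new ListConsumerGroupOffsetsOptions().requireStable(true);
                final Map<TopicPartition, OffsetAndMetadata> offsets =
                    admin.listConsumerGroupOffsets("my-group", options)
                         .partitionsToOffsetAndMetadata()
                         .get();
                offsets.forEach((tp, om) -> System.out.println(tp + " -> " + om.offset()));
            }
        }
    }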
[kafka] branch trunk updated (6495a0768ca -> 915c7812439)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 6495a0768ca KAFKA-14032; Dequeue time for forwarded requests is unset (#12360) add 915c7812439 KAFKA-10199: Remove main consumer from store changelog reader (#12337) No new revisions were added by this update. Summary of changes: .../kafka/clients/admin/KafkaAdminClient.java | 2 +- .../admin/ListConsumerGroupOffsetsOptions.java | 13 .../internals/ListConsumerGroupOffsetsHandler.java | 15 - .../kafka/clients/admin/AdminClientTestUtils.java | 7 +++ .../kafka/clients/admin/KafkaAdminClientTest.java | 33 ++ .../kafka/clients/admin/MockAdminClient.java | 38 ++-- .../processor/internals/DefaultStateUpdater.java | 3 +- .../processor/internals/StoreChangelogReader.java | 71 -- .../streams/processor/internals/StreamThread.java | 1 - .../internals/DefaultStateUpdaterTest.java | 1 + .../processor/internals/StandbyTaskTest.java | 8 ++- .../internals/StoreChangelogReaderTest.java| 50 ++- .../processor/internals/StreamTaskTest.java| 27 +++- 13 files changed, 188 insertions(+), 81 deletions(-)
[kafka] branch trunk updated: HOTFIX: Correct ordering of input buffer and enforced processing sensors (#12363)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new ae570f5953 HOTFIX: Correct ordering of input buffer and enforced processing sensors (#12363) ae570f5953 is described below commit ae570f59533ae941bbf5ab9ff5e739a5bd855fd6 Author: Guozhang Wang AuthorDate: Sun Jul 3 10:02:59 2022 -0700 HOTFIX: Correct ordering of input buffer and enforced processing sensors (#12363) 1. As titled, fix the right constructor param ordering. 2. Also added a few more loglines. Reviewers: Matthias J. Sax , Sagar Rao , Hao Li <1127478+lihao...@users.noreply.github.com> --- .../src/main/java/org/apache/kafka/streams/KafkaStreams.java | 7 ++- .../kafka/streams/processor/internals/PartitionGroup.java | 11 ++- .../apache/kafka/streams/processor/internals/StreamTask.java | 4 +--- .../kafka/streams/processor/internals/StreamThread.java | 11 --- .../kafka/streams/processor/internals/PartitionGroupTest.java | 2 +- 5 files changed, 22 insertions(+), 13 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 86ab83f67d..2c95aa85a4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -978,6 +978,10 @@ public class KafkaStreams implements AutoCloseable { } // Initially, all Stream Threads are created with 0 cache size and max buffer size and then resized here. resizeThreadCacheAndBufferMemory(numStreamThreads); +if (numStreamThreads > 0) { +log.info("Initializing {} StreamThread with cache size/max buffer size values as {} per thread.", +numStreamThreads, getThreadCacheAndBufferMemoryString()); +} stateDirCleaner = setupStateDirCleaner(); rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, applicationConfigs); @@ -1143,7 +1147,8 @@ public class KafkaStreams implements AutoCloseable { + "for it to complete shutdown as this will result in deadlock.", streamThread.getName()); } resizeThreadCacheAndBufferMemory(getNumLiveStreamThreads()); -log.info("Resizing thread cache/max buffer size due to removal of thread {}, new cache size/max buffer size per thread is {}", streamThread.getName(), getThreadCacheAndBufferMemoryString()); +log.info("Resizing thread cache/max buffer size due to removal of thread {}, " + +"new cache size/max buffer size per thread is {}", streamThread.getName(), getThreadCacheAndBufferMemoryString()); if (groupInstanceID.isPresent() && callingThreadIsNotCurrentStreamThread) { final MemberToRemove memberToRemove = new MemberToRemove(groupInstanceID.get()); final Collection membersToRemove = Collections.singletonList(memberToRemove); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java index 21d3cbfa3f..750699a1ec 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java @@ -64,7 +64,7 @@ public class PartitionGroup { private final Sensor enforcedProcessingSensor; private final long maxTaskIdleMs; private final Sensor recordLatenessSensor; -private final Sensor totalBytesSensor; +private final Sensor 
totalInputBufferBytesSensor; private final PriorityQueue nonEmptyQueuesByTime; private long streamTime; @@ -93,8 +93,8 @@ public class PartitionGroup { final Map partitionQueues, final Function lagProvider, final Sensor recordLatenessSensor, + final Sensor totalInputBufferBytesSensor, final Sensor enforcedProcessingSensor, - final Sensor totalBytesSensor, final long maxTaskIdleMs) { this.logger = logContext.logger(PartitionGroup.class); nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::headRecordTimestamp)); @@ -103,7 +103,7 @@ public class PartitionGroup { this.enforcedProcessingSensor = enforcedProcessingSensor; this.maxTaskIdleMs = maxTaskIdleMs; this.recordLatenessSensor
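The underlying bug class is easy to reproduce: when a constructor takes several parameters of the same type (here, multiple Sensor instances), swapping two arguments still compiles and simply wires metrics to the wrong sensor. The toy illustration below mirrors the sensor names from the diff but is not the real PartitionGroup code.

    // Toy illustration of the bug class fixed above: adjacent constructor
    // parameters share a type, so a swapped call site compiles silently.
    final class SensorWiringExample {
        static final class Sensor {
            final String name;
            Sensor(final String name) { this.name = name; }
            void record(final double value) { System.out.println(name + " <- " + value); }
        }

        static final class BufferMetrics {
            private final Sensor recordLateness;
            private final Sensor totalInputBufferBytes;
            private final Sensor enforcedProcessing;

            BufferMetrics(final Sensor recordLateness,
                          final Sensor totalInputBufferBytes,
                          final Sensor enforcedProcessing) {
                this.recordLateness = recordLateness;
                this.totalInputBufferBytes = totalInputBufferBytes;
                this.enforcedProcessing = enforcedProcessing;
            }

            void onBytesBuffered(final long bytes) {
                totalInputBufferBytes.record(bytes);
            }
        }

        public static void main(final String[] args) {
            final Sensor lateness = new Sensor("record-lateness");
            final Sensor bufferBytes = new Sensor("input-buffer-bytes");
            final Sensor enforced = new Sensor("enforced-processing");
            // Passing (lateness, enforced, bufferBytes) would also compile, which is
            // exactly how the original ordering slipped through unnoticed.
            new BufferMetrics(lateness, bufferBytes, enforced).onBytesBuffered(1024L);
        }
    }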
[kafka-site] branch asf-site updated: Fix docs in quickstart.html (#419)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git The following commit(s) were added to refs/heads/asf-site by this push: new 60d9b3aa Fix docs in quickstart.html (#419) 60d9b3aa is described below commit 60d9b3aaa444aedf6646e484f587f8a170781d33 Author: JK-Wang <32212764+jk-w...@users.noreply.github.com> AuthorDate: Mon Jul 4 00:12:01 2022 +0800 Fix docs in quickstart.html (#419) Reviewers: Guozhang Wang --- 32/quickstart.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/32/quickstart.html b/32/quickstart.html index 79a33f8f..0d9fca9b 100644 --- a/32/quickstart.html +++ b/32/quickstart.html @@ -183,7 +183,7 @@ This is my second event - echo "plugin.path=lib/connect-file-{{fullDotVersion}}.jar" + echo "plugin.path=libs/connect-file-{{fullDotVersion}}.jar" Then, start by creating some seed data to test with:
[kafka] branch trunk updated (bad475166f -> a82a8e02ce)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from bad475166f MINOR: Add indent space after hyperlink in `docs/upgrade.html` (#12353) add a82a8e02ce MINOR: Fix static mock usage in TaskMetricsTest (#12373) No new revisions were added by this update. Summary of changes: .../internals/metrics/TaskMetricsTest.java | 266 - 1 file changed, 149 insertions(+), 117 deletions(-)
[kafka] branch trunk updated: MINOR: Use mock time in DefaultStateUpdaterTest (#12344)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 3faa6cf6d0 MINOR: Use mock time in DefaultStateUpdaterTest (#12344) 3faa6cf6d0 is described below commit 3faa6cf6d060887288fcf68adb8c3f1e2090b8ed Author: Guozhang Wang AuthorDate: Wed Jun 29 12:33:00 2022 -0700 MINOR: Use mock time in DefaultStateUpdaterTest (#12344) For most tests we would need an auto-ticking mock timer to work with draining-with-timeout functions. For tests that check for never checkpoint we need no auto-ticking timer to control exactly how much time elapsed. Reviewers: Bruno Cadonna --- .../processor/internals/DefaultStateUpdater.java | 5 ++-- .../internals/DefaultStateUpdaterTest.java | 34 -- 2 files changed, 22 insertions(+), 17 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index 0e84574c5c..886a37b314 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -356,8 +356,6 @@ public class DefaultStateUpdater implements StateUpdater { this.offsetResetter = offsetResetter; this.time = time; this.commitIntervalMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG); -// initialize the last commit as of now to prevent first commit happens immediately -this.lastCommitMs = time.milliseconds(); } public void start() { @@ -365,6 +363,9 @@ public class DefaultStateUpdater implements StateUpdater { stateUpdaterThread = new StateUpdaterThread("state-updater", changelogReader, offsetResetter); stateUpdaterThread.start(); shutdownGate = new CountDownLatch(1); + +// initialize the last commit as of now to prevent first commit happens immediately +this.lastCommitMs = time.milliseconds(); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java index 8bd81828f6..5e2d90de71 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; @@ -44,7 +45,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkObjectProperties; import static org.apache.kafka.common.utils.Utils.mkSet; -import static org.apache.kafka.common.utils.Utils.sleep; import static org.apache.kafka.streams.StreamsConfig.producerPrefix; import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.easymock.EasyMock.anyBoolean; @@ -79,24 +79,25 @@ class DefaultStateUpdaterTest { private final static TaskId TASK_1_0 = new TaskId(1, 0); private final static TaskId TASK_1_1 = 
new TaskId(1, 1); -private final StreamsConfig config = new StreamsConfig(configProps(COMMIT_INTERVAL)); +// need an auto-tick timer to work for draining with timeout +private final Time time = new MockTime(1L); +private final StreamsConfig config = new StreamsConfig(configProps()); private final ChangelogReader changelogReader = mock(ChangelogReader.class); private final java.util.function.Consumer> offsetResetter = topicPartitions -> { }; - -private DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM); +private final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, offsetResetter, time); @AfterEach public void tearDown() { stateUpdater.shutdown(Duration.ofMinutes(1)); } -private Properties configProps(final int commitInterval) { +private Properties configProps() { return mkObjectProperties(mkMap( mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"), mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2
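The auto-ticking timer referred to above is the MockTime test utility constructed with an auto-tick interval: every read of the clock advances it, so drain-with-timeout loops make progress without real sleeps, while a non-ticking instance keeps time frozen for the "never commits within the interval" style of assertion. A small illustration, assuming the org.apache.kafka.common.utils.MockTime utility used in the diff:

    import org.apache.kafka.common.utils.MockTime;

    final class MockTimeIllustration {
        public static void main(final String[] args) {
            // Auto-tick of 1 ms: each call to milliseconds() moves the clock forward,
            // so a drain-with-timeout loop eventually reaches its deadline in a test.
            final MockTime autoTicking = new MockTime(1L);
            final long t0 = autoTicking.milliseconds();
            final long t1 = autoTicking.milliseconds();
            System.out.println(t1 - t0); // 1

            // No auto-tick: time only moves when the test says so, which is what the
            // "should never commit/checkpoint yet" assertions rely on.
            final MockTime frozen = new MockTime();
            final long f0 = frozen.milliseconds();
            frozen.sleep(5_000L); // explicit advance
            System.out.println(frozen.milliseconds() - f0); // 5000
        }
    }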
[kafka] branch trunk updated: [9/N][Emit final] Emit final for session window aggregations (#12204)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new ababc4261b [9/N][Emit final] Emit final for session window aggregations (#12204) ababc4261b is described below commit ababc4261bfa03ee9d29ae7254ddd0ba988f826d Author: Guozhang Wang AuthorDate: Wed Jun 29 09:22:37 2022 -0700 [9/N][Emit final] Emit final for session window aggregations (#12204) * Add a new API for session windows to range query session window by end time (KIP related). * Augment session window aggregator with emit strategy. * Minor: consolidated some dup classes. * Test: unit test on session window aggregator. Reviewers: Guozhang Wang --- .../streams/kstream/SessionWindowedKStream.java| 5 +- .../kafka/streams/kstream/TimeWindowedKStream.java | 1 + ...bstractKStreamTimeWindowAggregateProcessor.java | 11 +- .../internals/CogroupedStreamAggregateBuilder.java | 2 + .../internals/KStreamSessionWindowAggregate.java | 272 - .../kstream/internals/KStreamWindowAggregate.java | 7 - .../kstream/internals/SessionTupleForwarder.java | 56 - .../internals/SessionWindowedKStreamImpl.java | 29 ++- .../kstream/internals/TimeWindowedKStreamImpl.java | 55 +++-- .../internals/TimestampedTupleForwarder.java | 3 +- .../internals/AbstractReadWriteDecorator.java | 6 + .../apache/kafka/streams/state/SessionStore.java | 13 + ...tractRocksDBTimeOrderedSegmentedBytesStore.java | 6 +- .../internals/ChangeLoggingSessionBytesStore.java | 12 +- .../state/internals/InMemorySessionStore.java | 21 +- .../state/internals/MeteredSessionStore.java | 12 + .../state/internals/PrefixedSessionKeySchemas.java | 13 +- ...cksDBTimeOrderedSessionSegmentedBytesStore.java | 33 ++- .../internals/RocksDBTimeOrderedSessionStore.java | 7 + .../streams/state/internals/SegmentIterator.java | 2 +- .../state/internals/SegmentedBytesStore.java | 4 +- .../streams/state/internals/SessionKeySchema.java | 2 +- ...KStreamSessionWindowAggregateProcessorTest.java | 219 - .../internals/KStreamWindowAggregateTest.java | 2 +- .../internals/SessionTupleForwarderTest.java | 108 .../internals/SessionWindowedKStreamImplTest.java | 171 + .../internals/TimeWindowedKStreamImplTest.java | 2 +- .../internals/graph/GraphGraceSearchUtilTest.java | 8 +- 28 files changed, 676 insertions(+), 406 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java index c561b62abf..fe897515a9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java @@ -39,7 +39,7 @@ import java.time.Duration; * materialized view) that can be queried using the name provided in the {@link Materialized} instance. * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID. - * New events are added to sessions until their grace period ends (see {@link SessionWindows#grace(Duration)}). + * New events are added to sessions until their grace period ends (see {@link SessionWindows#ofInactivityGapAndGrace(Duration, Duration)}). 
* * A {@code SessionWindowedKStream} must be obtained from a {@link KGroupedStream} via * {@link KGroupedStream#windowedBy(SessionWindows)}. @@ -643,4 +643,7 @@ public interface SessionWindowedKStream { KTable, V> reduce(final Reducer reducer, final Named named, final Materialized> materialized); + +// TODO: add javadoc +SessionWindowedKStream emitStrategy(final EmitStrategy emitStrategy); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java index 46ebd267f9..3f36838f20 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java @@ -649,5 +649,6 @@ public interface TimeWindowedKStream { final Named named, final Materialized> materialized); +// TODO: add javadoc TimeWindowedKStream emitStrategy(final EmitStrategy emitStrategy); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregat
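For readers following the emit-final series, a minimal sketch of how the new SessionWindowedKStream#emitStrategy API is meant to be used (topic and store names are placeholders; EmitStrategy.onWindowClose() comes from the earlier PRs in this series):

    import java.time.Duration;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.EmitStrategy;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.SessionWindows;
    import org.apache.kafka.streams.kstream.Windowed;

    public class SessionEmitFinalExample {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            // Count events per 5-minute session; with ON_WINDOW_CLOSE, downstream sees a
            // single final result per session once its grace period has passed, instead of
            // an update for every incoming record.
            final KTable<Windowed<String>, Long> sessionCounts = builder
                .stream("clicks", Consumed.with(Serdes.String(), Serdes.String()))
                .groupByKey()
                .windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1)))
                .emitStrategy(EmitStrategy.onWindowClose())
                .count(Materialized.as("session-counts"));
            System.out.println(builder.build().describe());
        }
    }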
[kafka-site] branch asf-site updated: Add atguigu (http://www.atguigu.com/) to the list of the "Powered By ❤" (#418)

This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git The following commit(s) were added to refs/heads/asf-site by this push: new 375a2ec4 Add atguigu (http://www.atguigu.com/) to the list of the "Powered By ❤" (#418) 375a2ec4 is described below commit 375a2ec493b1b1e0f5fbf3e275c90f0b09bc982b Author: realdengziqi <42276568+realdengz...@users.noreply.github.com> AuthorDate: Tue Jun 28 11:28:03 2022 +0800 Add atguigu (http://www.atguigu.com/) to the list of the "Powered By ❤" (#418) Reviewers: Guozhang Wang --- images/powered-by/atguigu.png | Bin 0 -> 39711 bytes powered-by.html | 5 + 2 files changed, 5 insertions(+) diff --git a/images/powered-by/atguigu.png b/images/powered-by/atguigu.png new file mode 100644 index ..4e72d8b5 Binary files /dev/null and b/images/powered-by/atguigu.png differ diff --git a/powered-by.html b/powered-by.html index ce172902..c5922bca 100644 --- a/powered-by.html +++ b/powered-by.html @@ -694,6 +694,11 @@ "logo": "atruvia_logo_online_rgb.png", "logoBgColor": "#d4f2f5", "description": "At Atruvia we use Apache Kafka to share events within the modern banking platform." +}, { +"link": "http://www.atguigu.com/", +"logo": "atguigu.png", +"logoBgColor": "#ff", +"description": "In our real-time data warehouse, apache kafka is used as a reliable distributed message queue, which allows us to build a highly available analysis system." }];
[kafka] branch trunk updated: KAFKA-10199: Expose tasks in state updater (#12312)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 1ceaf30039 KAFKA-10199: Expose tasks in state updater (#12312) 1ceaf30039 is described below commit 1ceaf30039e48199e40951c5a8d52894bb45e4d3 Author: Bruno Cadonna AuthorDate: Fri Jun 24 18:33:24 2022 +0200 KAFKA-10199: Expose tasks in state updater (#12312) This PR exposes the tasks managed by the state updater. The state updater manages all tasks that were added to the state updater and that have not yet been removed from it by draining one of the output queues. Reviewers: Guozhang Wang --- .../processor/internals/DefaultStateUpdater.java | 149 ++--- .../streams/processor/internals/StateUpdater.java | 61 +++- .../internals/DefaultStateUpdaterTest.java | 332 + 3 files changed, 436 insertions(+), 106 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index cc580a3b38..0e84574c5c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -47,7 +47,9 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; +import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.Stream; public class DefaultStateUpdater implements StateUpdater { @@ -86,7 +88,7 @@ public class DefaultStateUpdater implements StateUpdater { } public boolean onlyStandbyTasksLeft() { -return !updatingTasks.isEmpty() && updatingTasks.values().stream().noneMatch(Task::isActive); +return !updatingTasks.isEmpty() && updatingTasks.values().stream().allMatch(t -> !t.isActive()); } @Override @@ -152,9 +154,7 @@ public class DefaultStateUpdater implements StateUpdater { private void handleRuntimeException(final RuntimeException runtimeException) { log.error("An unexpected error occurred within the state updater thread: " + runtimeException); -final ExceptionAndTasks exceptionAndTasks = new ExceptionAndTasks(new HashSet<>(updatingTasks.values()), runtimeException); -updatingTasks.clear(); -exceptionsAndFailedTasks.add(exceptionAndTasks); +addToExceptionsAndFailedTasksThenClearUpdatingTasks(new ExceptionAndTasks(new HashSet<>(updatingTasks.values()), runtimeException)); isRunning.set(false); } @@ -163,41 +163,51 @@ public class DefaultStateUpdater implements StateUpdater { final Set corruptedTaskIds = taskCorruptedException.corruptedTasks(); final Set corruptedTasks = new HashSet<>(); for (final TaskId taskId : corruptedTaskIds) { -final Task corruptedTask = updatingTasks.remove(taskId); +final Task corruptedTask = updatingTasks.get(taskId); if (corruptedTask == null) { throw new IllegalStateException("Task " + taskId + " is corrupted but is not updating. 
" + BUG_ERROR_MESSAGE); } corruptedTasks.add(corruptedTask); } -exceptionsAndFailedTasks.add(new ExceptionAndTasks(corruptedTasks, taskCorruptedException)); +addToExceptionsAndFailedTasksThenRemoveFromUpdatingTasks(new ExceptionAndTasks(corruptedTasks, taskCorruptedException)); } private void handleStreamsException(final StreamsException streamsException) { log.info("Encountered streams exception: ", streamsException); -final ExceptionAndTasks exceptionAndTasks; if (streamsException.taskId().isPresent()) { -exceptionAndTasks = handleStreamsExceptionWithTask(streamsException); +handleStreamsExceptionWithTask(streamsException); } else { -exceptionAndTasks = handleStreamsExceptionWithoutTask(streamsException); +handleStreamsExceptionWithoutTask(streamsException); } -exceptionsAndFailedTasks.add(exceptionAndTasks); } -private ExceptionAndTasks handleStreamsExceptionWithTask(final StreamsException streamsException) { +private void handleStreamsExceptionWithTask(final StreamsException streamsException) { final TaskId failedTaskId = streamsException.taskId().get(); if (!updatingTasks.contai
[kafka] branch trunk updated: KAFKA-10199: Commit the restoration progress within StateUpdater (#12279)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 925c628173 KAFKA-10199: Commit the restoration progress within StateUpdater (#12279) 925c628173 is described below commit 925c6281733662cd40fffaab54a6483b00f80ee6 Author: Guozhang Wang AuthorDate: Thu Jun 23 10:46:14 2022 -0700 KAFKA-10199: Commit the restoration progress within StateUpdater (#12279) During restoring, we should always commit a.k.a. write checkpoint file regardless of EOS or ALOS, since if there's a failure we would just over-restore them upon recovery so no EOS violations happened. Also when we complete restore or remove task, we should enforce a checkpoint as well; for failing cases though, we should not write a new one. Reviewers: Bruno Cadonna --- .../streams/processor/internals/AbstractTask.java | 4 +- .../processor/internals/DefaultStateUpdater.java | 36 +++- .../streams/processor/internals/StandbyTask.java | 2 +- .../streams/processor/internals/StreamTask.java| 8 +- .../kafka/streams/processor/internals/Task.java| 6 ++ .../internals/DefaultStateUpdaterTest.java | 101 +++-- .../processor/internals/StandbyTaskTest.java | 41 + .../processor/internals/StreamTaskTest.java| 16 8 files changed, 197 insertions(+), 17 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 4e652a6dfc..c64fadfe5c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -88,7 +88,8 @@ public abstract class AbstractTask implements Task { * @throws StreamsException fatal error when flushing the state store, for example sending changelog records failed * or flushing state store get IO errors; such error should cause the thread to die */ -protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) { +@Override +public void maybeCheckpoint(final boolean enforceCheckpoint) { final Map offsetSnapshot = stateMgr.changelogOffsets(); if (StateManagerUtil.checkpointNeeded(enforceCheckpoint, offsetSnapshotSinceLastFlush, offsetSnapshot)) { // the state's current offset would be used to checkpoint @@ -98,7 +99,6 @@ public abstract class AbstractTask implements Task { } } - @Override public TaskId id() { return id; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index 54cb7bc427..cc580a3b38 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.processor.TaskId; @@ -85,7 +86,7 @@ public class DefaultStateUpdater implements StateUpdater { } public boolean onlyStandbyTasksLeft() { -return 
!updatingTasks.isEmpty() && updatingTasks.values().stream().allMatch(t -> !t.isActive()); +return !updatingTasks.isEmpty() && updatingTasks.values().stream().noneMatch(Task::isActive); } @Override @@ -111,6 +112,7 @@ public class DefaultStateUpdater implements StateUpdater { private void runOnce() throws InterruptedException { performActionsOnTasks(); restoreTasks(); +maybeCheckpointUpdatingTasks(time.milliseconds()); waitIfAllChangelogsCompletelyRead(); } @@ -252,6 +254,8 @@ public class DefaultStateUpdater implements StateUpdater { private void removeTask(final TaskId taskId) { final Task task = updatingTasks.remove(taskId); if (task != null) { +task.maybeCheckpoint(true); + final Collection changelogPartitions = task.changelogPartitions(); changelogReader.unregister(changelogPartitions); removedTasks.add(task); @@ -271,9 +275,10 @@ public class DefaultStateUpdater implements StateUpdater {
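The commit description above captures the design choice: checkpoint restoring tasks periodically regardless of EOS or ALOS (over-restoring after a failure is harmless), enforce a checkpoint when a task finishes restoring or is removed, and skip it for failed tasks. A shape-only sketch of that policy (names and the RestoringTask interface are placeholders, not the actual DefaultStateUpdater internals):

    import java.util.Collection;

    final class RestorationCheckpointSketch {
        interface RestoringTask {
            void maybeCheckpoint(boolean enforce);
        }

        private final long commitIntervalMs;
        private long lastCommitMs = 0L;

        RestorationCheckpointSketch(final long commitIntervalMs) {
            this.commitIntervalMs = commitIntervalMs;
        }

        // Called on every restore pass: best-effort checkpoint of all updating tasks,
        // rate-limited by the commit interval, independent of the processing guarantee.
        void onRestorePass(final long nowMs, final Collection<RestoringTask> updatingTasks) {
            if (commitIntervalMs >= 0 && nowMs - lastCommitMs > commitIntervalMs) {
                updatingTasks.forEach(task -> task.maybeCheckpoint(false));
                lastCommitMs = nowMs;
            }
        }

        // Called when a task completes restoration or is removed: force a final checkpoint.
        void onRestoredOrRemoved(final RestoringTask task) {
            task.maybeCheckpoint(true);
        }
    }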
[kafka] branch trunk updated: KAFKA-13880: Remove DefaultPartitioner from StreamPartitioner (#12304)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new cfdd567955 KAFKA-13880: Remove DefaultPartitioner from StreamPartitioner (#12304) cfdd567955 is described below commit cfdd567955588e134770a9145ba57800ca88313c Author: Guozhang Wang AuthorDate: Fri Jun 17 20:17:02 2022 -0700 KAFKA-13880: Remove DefaultPartitioner from StreamPartitioner (#12304) There are some considerata embedded in this seemingly straight-forward PR that I'd like to explain here. The StreamPartitioner is used to send records to three types of topics: 1) repartition topics, where key should never be null. 2) changelog topics, where key should never be null. 3) sink topics, where only non-windowed key could be null and windowed key should still never be null. Also, the StreamPartitioner is used as part of the IQ to determine which host contains a certain key, as determined by the case 2) above. This PR's main goal is to remove the deprecated producer's default partitioner, while with those things in mind such that: We want to make sure for not-null keys, the default murmur2 hash behavior of the streams' partitioner stays consistent with producer's new built-in partitioner. For null-keys (which is only possible for non-window default stream partition, and is never used for IQ), we would fix the issue that we may never rotate to a new partitioner by setting the partition as null hence relying on the newly introduced built-in partitioner. Reviewers: Artem Livshits <84364232+artemlivsh...@users.noreply.github.com>, Matthias J. Sax --- .../kafka/clients/producer/KafkaProducer.java | 3 ++- .../producer/internals/BuiltInPartitioner.java | 7 +++ .../producer/internals/DefaultPartitioner.java | 4 +--- .../internals/WindowedStreamPartitioner.java | 10 +- .../internals/DefaultStreamPartitioner.java| 22 -- .../processor/internals/StreamsMetadataState.java | 4 ++-- 6 files changed, 29 insertions(+), 21 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index e85d9eb8a9..74d408d9a5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -26,6 +26,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.clients.producer.internals.BufferPool; +import org.apache.kafka.clients.producer.internals.BuiltInPartitioner; import org.apache.kafka.clients.producer.internals.KafkaProducerMetrics; import org.apache.kafka.clients.producer.internals.ProducerInterceptors; import org.apache.kafka.clients.producer.internals.ProducerMetadata; @@ -1385,7 +1386,7 @@ public class KafkaProducer implements Producer { if (serializedKey != null && !partitionerIgnoreKeys) { // hash the keyBytes to choose a partition -return Utils.toPositive(Utils.murmur2(serializedKey)) % cluster.partitionsForTopic(record.topic()).size(); +return BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size()); } else { return RecordMetadata.UNKNOWN_PARTITION; } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java index 1c2d10f3f6..a5805df56b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java @@ -279,6 +279,13 @@ public class BuiltInPartitioner { } } +/* + * Default hashing function to choose a partition from the serialized key bytes + */ +public static int partitionForKey(final byte[] serializedKey, final int numPartitions) { +return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions; +} + /** * The partition load stats for each topic that are used for adaptive partition distribution. */ diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java index 2c2e79fb20..716773626c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java +++ b/clients/src/main/java/org/apache/kafka/cl
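The new BuiltInPartitioner.partitionForKey helper shown in this diff is the single place both the producer and Streams now rely on for keyed placement. A standalone sketch of the same hashing (class name and key value are illustrative only):

    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.common.utils.Utils;

    public class KeyPartitionSketch {
        // murmur2-hash the serialized key and map it onto the partition count, the same
        // computation as the partitionForKey helper introduced here.
        static int partitionForKey(final byte[] serializedKey, final int numPartitions) {
            return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
        }

        public static void main(final String[] args) {
            final byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
            // A given key always lands on the same partition for a fixed partition count.
            System.out.println(partitionForKey(key, 6));
        }
    }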
[kafka] branch trunk updated: KAFKA-13939: Only track dirty keys if logging is enabled. (#12263)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new ee565f5f6b KAFKA-13939: Only track dirty keys if logging is enabled. (#12263) ee565f5f6b is described below commit ee565f5f6b97c84d4f7f895fcb79188822284414 Author: jnewhouse AuthorDate: Thu Jun 16 14:27:38 2022 -0700 KAFKA-13939: Only track dirty keys if logging is enabled. (#12263) InMemoryTimeOrderedKeyValueBuffer keeps a Set of keys that have been seen in order to log them for durability. This set is never used nor cleared if logging is not enabled. Having it be populated creates a memory leak. This change stops populating the set if logging is not enabled. Reviewers: Divij Vaidya , Kvicii <42023367+kvi...@users.noreply.github.com>, Guozhang Wang --- .../state/internals/InMemoryTimeOrderedKeyValueBuffer.java| 8 ++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java index 5894023bbe..5403f9e703 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java @@ -423,7 +423,9 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrdere delegate.remove(); index.remove(next.getKey().key()); -dirtyKeys.add(next.getKey().key()); +if (loggingEnabled) { +dirtyKeys.add(next.getKey().key()); +} memBufferSize -= computeRecordSize(next.getKey().key(), bufferValue); @@ -497,7 +499,9 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrdere serializedKey, new BufferValue(serializedPriorValue, serialChange.oldValue, serialChange.newValue, recordContext) ); -dirtyKeys.add(serializedKey); +if (loggingEnabled) { +dirtyKeys.add(serializedKey); +} updateBufferMetrics(); }
[kafka] branch trunk updated: KAFKA-13846: Use the new addMetricsIfAbsent API (#12287)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 39a555ba94 KAFKA-13846: Use the new addMetricsIfAbsent API (#12287) 39a555ba94 is described below commit 39a555ba94a6a5d851b31e0a7f07e19c48327835 Author: Guozhang Wang AuthorDate: Tue Jun 14 16:04:26 2022 -0700 KAFKA-13846: Use the new addMetricsIfAbsent API (#12287) Use the newly added function to replace the old addMetric function that may throw illegal argument exceptions. Although in some cases concurrency should not be possible they do not necessarily remain always true in the future, so it's better to use the new API just to be less error-prone. Reviewers: Bruno Cadonna --- .../org/apache/kafka/clients/consumer/internals/Fetcher.java | 11 +-- .../java/org/apache/kafka/connect/runtime/WorkerTask.java | 5 ++--- .../processor/internals/metrics/StreamsMetricsImpl.java | 3 +-- .../processor/internals/metrics/StreamsMetricsImplTest.java | 2 +- 4 files changed, 9 insertions(+), 12 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index fa7073cf0d..73ffd217ef 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -1866,12 +1866,11 @@ public class Fetcher implements Closeable { for (TopicPartition tp : newAssignedPartitions) { if (!this.assignedPartitions.contains(tp)) { MetricName metricName = partitionPreferredReadReplicaMetricName(tp); -if (metrics.metric(metricName) == null) { -metrics.addMetric( -metricName, -(Gauge) (config, now) -> subscription.preferredReadReplica(tp, 0L).orElse(-1) -); -} +metrics.addMetricIfAbsent( +metricName, +null, +(Gauge) (config, now) -> subscription.preferredReadReplica(tp, 0L).orElse(-1) +); } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java index 072e4b34a1..f7c819fb4a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java @@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.metrics.Gauge; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Frequencies; @@ -377,10 +378,8 @@ abstract class WorkerTask implements Runnable { private void addRatioMetric(final State matchingState, MetricNameTemplate template) { MetricName metricName = metricGroup.metricName(template); -if (metricGroup.metrics().metric(metricName) == null) { -metricGroup.metrics().addMetric(metricName, (config, now) -> +metricGroup.metrics().addMetricIfAbsent(metricName, null, (Gauge) (config, now) -> taskStateTimer.durationRatio(matchingState, now)); -} } void close() { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java index 4bfd96265b..cb77fc9bdd 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java @@ -502,9 +502,8 @@ public class StreamsMetricsImpl implements StreamsMetrics { storeLevelTagMap(taskId, metricsScope, storeName) ); if (metrics.metric(metricName) == null) { -final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel); +metrics.addMetricIfAbsent(metricName, new MetricConfig().recordLevel(recordingLevel), valueProvider); final String key = storeSensorPrefix(Thread.currentThread().getName(), taskId, storeName); -metrics.addMetric(metricName, metricConfig, v
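A small usage sketch of the addMetricIfAbsent call these call sites switch to (metric name, group, and the gauge value are placeholders): unlike addMetric, it never throws when the metric already exists; it simply returns the existing registration.

    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.Gauge;
    import org.apache.kafka.common.metrics.KafkaMetric;
    import org.apache.kafka.common.metrics.Metrics;

    public class AddMetricIfAbsentExample {
        public static void main(final String[] args) {
            try (Metrics metrics = new Metrics()) {
                final MetricName name = metrics.metricName("queue-size", "example-group", "illustrative gauge");
                // Registering twice is safe: the second call returns the metric registered first.
                final KafkaMetric first = metrics.addMetricIfAbsent(name, null, (Gauge<Integer>) (config, now) -> 42);
                final KafkaMetric second = metrics.addMetricIfAbsent(name, null, (Gauge<Integer>) (config, now) -> 42);
                System.out.println(first == second); // true
            }
        }
    }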
[kafka] branch trunk updated: KAFKA-13846: Adding overloaded metricOrElseCreate method (#12121)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 5cab11cf52 KAFKA-13846: Adding overloaded metricOrElseCreate method (#12121) 5cab11cf52 is described below commit 5cab11cf525f6c06fcf9eb43f7f95ef33fe1cdbb Author: vamossagar12 AuthorDate: Mon Jun 13 23:06:39 2022 +0530 KAFKA-13846: Adding overloaded metricOrElseCreate method (#12121) Reviewers: David Jacot , Justine Olshan , Guozhang Wang --- .../org/apache/kafka/common/metrics/Metrics.java | 40 +-- .../org/apache/kafka/common/metrics/Sensor.java| 10 - .../kafka/connect/runtime/ConnectMetrics.java | 8 +--- .../internals/metrics/StreamsMetricsImplTest.java | 46 ++ 4 files changed, 92 insertions(+), 12 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java index 52b7794a4c..398819016c 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java @@ -509,7 +509,10 @@ public class Metrics implements Closeable { Objects.requireNonNull(metricValueProvider), config == null ? this.config : config, time); -registerMetric(m); +KafkaMetric existingMetric = registerMetric(m); +if (existingMetric != null) { +throw new IllegalArgumentException("A metric named '" + metricName + "' already exists, can't register another one."); +} } /** @@ -524,6 +527,26 @@ public class Metrics implements Closeable { addMetric(metricName, null, metricValueProvider); } +/** + * Create or get an existing metric to monitor an object that implements MetricValueProvider. + * This metric won't be associated with any sensor. This is a way to expose existing values as metrics. + * This method takes care of synchronisation while updating/accessing metrics by concurrent threads. + * + * @param metricName The name of the metric + * @param metricValueProvider The metric value provider associated with this metric + * @return Existing KafkaMetric if already registered or else a newly created one + */ +public KafkaMetric addMetricIfAbsent(MetricName metricName, MetricConfig config, MetricValueProvider metricValueProvider) { +KafkaMetric metric = new KafkaMetric(new Object(), +Objects.requireNonNull(metricName), +Objects.requireNonNull(metricValueProvider), +config == null ? this.config : config, +time); + +KafkaMetric existingMetric = registerMetric(metric); +return existingMetric == null ? metric : existingMetric; +} + /** * Remove a metric if it exists and return it. Return null otherwise. If a metric is removed, `metricRemoval` * will be invoked for each reporter. @@ -563,10 +586,18 @@ public class Metrics implements Closeable { } } -synchronized void registerMetric(KafkaMetric metric) { +/** + * Register a metric if not present or return an already existing metric otherwise. 
+ * When a metric is newly registered, this method returns null + * + * @param metric The KafkaMetric to register + * @return KafkaMetric if the metric already exists, null otherwise + */ +synchronized KafkaMetric registerMetric(KafkaMetric metric) { MetricName metricName = metric.metricName(); -if (this.metrics.containsKey(metricName)) -throw new IllegalArgumentException("A metric named '" + metricName + "' already exists, can't register another one."); +if (this.metrics.containsKey(metricName)) { +return this.metrics.get(metricName); +} this.metrics.put(metricName, metric); for (MetricsReporter reporter : reporters) { try { @@ -576,6 +607,7 @@ public class Metrics implements Closeable { } } log.trace("Registered metric named {}", metricName); +return null; } /** diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java index 5ae3b8d997..25f3c21a31 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java @@ -297,7 +297,10 @@ public final class Sensor { for (NamedMeasurable m : stat.stats()) { final KafkaMetric metric = new KafkaMetric(lock, m.name(), m.stat(), statCo
[kafka] branch trunk updated: KAFKA-10199: Implement removing active and standby tasks from the state updater (#12270)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new e67408c859 KAFKA-10199: Implement removing active and standby tasks from the state updater (#12270) e67408c859 is described below commit e67408c859fb2a80f1b3c208b7fef6ddc9a711fb Author: Bruno Cadonna AuthorDate: Thu Jun 9 19:28:26 2022 +0200 KAFKA-10199: Implement removing active and standby tasks from the state updater (#12270) This PR adds removing of active and standby tasks from the default implementation of the state updater. The PR also includes refactoring that clean up the code. Reviewers: Guozhang Wang --- .../processor/internals/DefaultStateUpdater.java | 129 --- .../streams/processor/internals/StateUpdater.java | 78 ++-- .../streams/processor/internals/TaskAndAction.java | 67 .../internals/DefaultStateUpdaterTest.java | 419 + .../processor/internals/TaskAndActionTest.java | 68 5 files changed, 595 insertions(+), 166 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index 55935d3e21..54cb7bc427 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -23,13 +23,13 @@ import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.Task.State; +import org.apache.kafka.streams.processor.internals.TaskAndAction.Action; import org.slf4j.Logger; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -37,6 +37,7 @@ import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -57,7 +58,7 @@ public class DefaultStateUpdater implements StateUpdater { private final ChangelogReader changelogReader; private final AtomicBoolean isRunning = new AtomicBoolean(true); private final Consumer> offsetResetter; -private final Map updatingTasks = new HashMap<>(); +private final Map updatingTasks = new ConcurrentHashMap<>(); private final Logger log; public StateUpdaterThread(final String name, @@ -72,7 +73,7 @@ public class DefaultStateUpdater implements StateUpdater { log = logContext.logger(DefaultStateUpdater.class); } -public Collection getAllUpdatingTasks() { +public Collection getUpdatingTasks() { return updatingTasks.values(); } @@ -117,11 +118,13 @@ public class DefaultStateUpdater implements StateUpdater { tasksAndActionsLock.lock(); try { for (final TaskAndAction taskAndAction : getTasksAndActions()) { -final Task task = taskAndAction.task; -final Action action = taskAndAction.action; +final Action action = taskAndAction.getAction(); switch (action) { case ADD: -addTask(task); +addTask(taskAndAction.getTask()); +break; +case REMOVE: +removeTask(taskAndAction.getTaskId()); break; } } @@ -149,7 +152,7 @@ 
public class DefaultStateUpdater implements StateUpdater { log.error("An unexpected error occurred within the state updater thread: " + runtimeException); final ExceptionAndTasks exceptionAndTasks = new ExceptionAndTasks(new HashSet<>(updatingTasks.values()), runtimeException); updatingTasks.clear(); -failedTasks.add(exceptionAndTasks); +exceptionsAndFailedTasks.add(exceptionAndTasks); isRunning.set(false); } @@ -164,7 +167,7 @@ public class DefaultStateUpdater implements StateUpdater { } corruptedTasks.add(corruptedTask); } -failedTasks.add(new ExceptionAndTasks(corruptedTasks, taskCorruptedException)); +exceptionsAndFailedTasks.add(new E
[kafka] branch trunk updated: MINOR: Fix typo in Kafka config docs (#12268)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 249cd4461f MINOR: Fix typo in Kafka config docs (#12268) 249cd4461f is described below commit 249cd4461faf003e6baa2327e9ceb130eb80c952 Author: Joel Hamill <11722533+joel-ham...@users.noreply.github.com> AuthorDate: Wed Jun 8 13:51:41 2022 -0700 MINOR: Fix typo in Kafka config docs (#12268) Reviewers: Guozhang Wang --- core/src/main/scala/kafka/server/KafkaConfig.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index f751cc21a6..3d7df18cbb 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -980,8 +980,8 @@ object KafkaConfig { val ControllerQuotaWindowSizeSecondsDoc = "The time span of each sample for controller mutations quotas" val ClientQuotaCallbackClassDoc = "The fully qualified name of a class that implements the ClientQuotaCallback interface, " + -"which is used to determine quota limits applied to client requests. By default, user, client-id, user or client-id " + -"quotas stored in ZooKeeper are applied. For any given request, the most specific quota that matches the user principal " + +"which is used to determine quota limits applied to client requests. By default, the user and client-id " + +"quotas that are stored in ZooKeeper are applied. For any given request, the most specific quota that matches the user principal " + "of the session and the client-id of the request is applied." val DeleteTopicEnableDoc = "Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off"
[kafka] branch trunk updated: HOTFIX: add space to avoid checkstyle failure
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 5d593287c7 HOTFIX: add space to avoid checkstyle failure 5d593287c7 is described below commit 5d593287c7db7030c444a469db72d59b3883d904 Author: Guozhang Wang AuthorDate: Mon Jun 6 11:34:59 2022 -0700 HOTFIX: add space to avoid checkstyle failure --- .../kafka/clients/consumer/internals/ConsumerCoordinatorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 180053ee22..c65d33176f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -558,7 +558,7 @@ public abstract class ConsumerCoordinatorTest { // should try to find coordinator since we are commit async coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), (offsets, exception) -> { -fail("Commit should not get responses, but got offsets:" + offsets +", and exception:" + exception); +fail("Commit should not get responses, but got offsets:" + offsets + ", and exception:" + exception); }); coordinator.poll(time.timer(0)); assertTrue(coordinator.coordinatorUnknown());
[kafka] branch 3.2 updated (90db4f47d6 -> b61edf2037)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch 3.2 in repository https://gitbox.apache.org/repos/asf/kafka.git from 90db4f47d6 KAFKA-13773: catch kafkaStorageException to avoid broker shutdown directly (#12136) add 173b8fd26d HOTFIX: only try to clear discover-coordinator future upon commit (#12244) add b61edf2037 HOTFIX: add space to avoid checkstyle failure No new revisions were added by this update. Summary of changes: .../consumer/internals/ConsumerCoordinator.java| 23 +++-- .../kafka/clients/consumer/KafkaConsumerTest.java | 1 - .../internals/ConsumerCoordinatorTest.java | 57 -- 3 files changed, 73 insertions(+), 8 deletions(-)
[kafka] branch trunk updated: HOTFIX: only try to clear discover-coordinator future upon commit (#12244)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 2047fc3715 HOTFIX: only try to clear discover-coordinator future upon commit (#12244) 2047fc3715 is described below commit 2047fc371500286ba3e41d16551d2067edad Author: Guozhang Wang AuthorDate: Mon Jun 6 11:05:41 2022 -0700 HOTFIX: only try to clear discover-coordinator future upon commit (#12244) This is another way of fixing KAFKA-13563 other than #11631. Instead of letting the consumer to always try to discover coordinator in pool with either mode (subscribe / assign), we defer the clearance of discover future upon committing async only. More specifically, under manual assign mode, there are only three places where we need the coordinator: * commitAsync (both by the consumer itself or triggered by caller), this is where we want to fix. * commitSync, which we already try to re-discovery coordinator. * committed (both by the consumer itself based on reset policy, or triggered by caller), which we already try to re-discovery coordinator. The benefits are that for manual assign mode that does not try to trigger any of the above three, then we never would be discovering coordinator. The original fix in #11631 would let the consumer to discover coordinator even if none of the above operations are required. Reviewers: Luke Chen , David Jacot --- .../consumer/internals/ConsumerCoordinator.java| 23 +++-- .../kafka/clients/consumer/KafkaConsumerTest.java | 1 - .../internals/ConsumerCoordinatorTest.java | 57 -- 3 files changed, 73 insertions(+), 8 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 51fa0b62ed..b853ff99e8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.consumer.internals; +import java.time.Duration; import java.util.SortedSet; import java.util.TreeSet; import org.apache.kafka.clients.GroupRebalanceConfig; @@ -548,14 +549,18 @@ public final class ConsumerCoordinator extends AbstractCoordinator { } } } else { -// For manually assigned partitions, if coordinator is unknown, make sure we lookup one and await metadata. +// For manually assigned partitions, we do not try to pro-actively lookup coordinator; +// instead we only try to refresh metadata when necessary. // If connections to all nodes fail, wakeups triggered while attempting to send fetch // requests result in polls returning immediately, causing a tight loop of polls. Without // the wakeup, poll() with no channels would block for the timeout, delaying re-connection. // awaitMetadataUpdate() in ensureCoordinatorReady initiates new connections with configured backoff and avoids the busy loop. -if (coordinatorUnknownAndUnready(timer)) { -return false; +if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) { +client.awaitMetadataUpdate(timer); } + +// if there is pending coordinator requests, ensure they have a chance to be transmitted. 
+client.pollNoWakeup(); } maybeAutoCommitOffsetsAsync(timer.currentTimeMs()); @@ -1004,7 +1009,17 @@ public final class ConsumerCoordinator extends AbstractCoordinator { if (offsets.isEmpty()) { // No need to check coordinator if offsets is empty since commit of empty offsets is completed locally. future = doCommitOffsetsAsync(offsets, callback); -} else if (!coordinatorUnknown()) { +} else if (!coordinatorUnknownAndUnready(time.timer(Duration.ZERO))) { +// we need to make sure coordinator is ready before committing, since +// this is for async committing we do not try to block, but just try once to +// clear the previous discover-coordinator future, resend, or get responses; +// if the coordinator is not ready yet then we would just proceed and put that into the +// pending requests, and future poll calls would still try to complete them. +// +// the key here though is that we have to try sending the discover-coordinator if +// it's not known or ready, since this is the only place we can send such request +// under manual as
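To ground the "manual assign mode" discussion above: with assign() there are no rebalances, so the only operations that ever need the group coordinator are the commit/committed calls, which is why coordinator discovery can be deferred to commitAsync. A minimal consumer sketch (bootstrap server, group id, and topic are placeholders):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class ManualAssignCommitAsync {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-assign-group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // Manual assignment: no subscribe(), no rebalance protocol involved.
                consumer.assign(Collections.singletonList(new TopicPartition("events", 0)));
                for (int i = 0; i < 10; i++) {
                    final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    records.forEach(record -> System.out.println(record.offset()));
                    // The only call in this loop that needs the group coordinator.
                    consumer.commitAsync();
                }
            }
        }
    }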
[kafka] branch trunk updated (9dc332f5ca -> 286bae4251)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 9dc332f5ca KAFKA-13217: Reconsider skipping the LeaveGroup on close() or add an overload that does so (#12035) add 286bae4251 KAFKA-10199: Implement adding standby tasks to the state updater (#12200) No new revisions were added by this update. Summary of changes: .../processor/internals/ChangelogReader.java | 9 + .../processor/internals/DefaultStateUpdater.java | 125 +--- .../streams/processor/internals/StateUpdater.java | 12 +- .../internals/DefaultStateUpdaterTest.java | 226 - 4 files changed, 290 insertions(+), 82 deletions(-)
[kafka] branch trunk updated: KAFKA-13217: Reconsider skipping the LeaveGroup on close() or add an overload that does so (#12035)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 9dc332f5ca KAFKA-13217: Reconsider skipping the LeaveGroup on close() or add an overload that does so (#12035) 9dc332f5ca is described below commit 9dc332f5ca34b80af369646f767c40c6b189f831 Author: Sayantanu Dey <41713730+sayantanu-...@users.noreply.github.com> AuthorDate: Mon May 23 22:37:19 2022 +0530 KAFKA-13217: Reconsider skipping the LeaveGroup on close() or add an overload that does so (#12035) This is for KIP-812: * added leaveGroup on a new close function in kafka stream * added logic to resolve future returned by remove member call in close method * added max check on remainingTime value in close function Reviewers: David Jacot , Guozhang Wang --- .../org/apache/kafka/streams/KafkaStreams.java | 73 .../org/apache/kafka/streams/KafkaStreamsTest.java | 203 + 2 files changed, 276 insertions(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 9d9d57f8ab..ed62f89ebc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -1349,6 +1349,24 @@ public class KafkaStreams implements AutoCloseable { } } +/** + * Class that handles options passed in case of {@code KafkaStreams} instance scale down + */ +public static class CloseOptions { +private Duration timeout = Duration.ofMillis(Long.MAX_VALUE); +private boolean leaveGroup = false; + +public CloseOptions timeout(final Duration timeout) { +this.timeout = timeout; +return this; +} + +public CloseOptions leaveGroup(final boolean leaveGroup) { +this.leaveGroup = leaveGroup; +return this; +} +} + /** * Shutdown this {@code KafkaStreams} instance by signaling all the threads to stop, and then wait for them to join. * This will block until all threads have stopped. @@ -1498,6 +1516,61 @@ public class KafkaStreams implements AutoCloseable { return close(timeoutMs); } +/** + * Shutdown this {@code KafkaStreams} by signaling all the threads to stop, and then wait up to the timeout for the + * threads to join. + * @param options contains timeout to specify how long to wait for the threads to shutdown, and a flag leaveGroup to + * trigger consumer leave call + * @return {@code true} if all threads were successfully stopped{@code false} if the timeout was reached + * before all threads stopped + * Note that this method must not be called in the {@link StateListener#onChange(KafkaStreams.State, KafkaStreams.State)} callback of {@link StateListener}. 
+ * @throws IllegalArgumentException if {@code timeout} can't be represented as {@code long milliseconds} + */ +public synchronized boolean close(final CloseOptions options) throws IllegalArgumentException { +final String msgPrefix = prepareMillisCheckFailMsgPrefix(options.timeout, "timeout"); +final long timeoutMs = validateMillisecondDuration(options.timeout, msgPrefix); +if (timeoutMs < 0) { +throw new IllegalArgumentException("Timeout can't be negative."); +} + +final long startMs = time.milliseconds(); + +final boolean closeStatus = close(timeoutMs); + +final Optional groupInstanceId = clientSupplier + .getConsumer(applicationConfigs.getGlobalConsumerConfigs(clientId)) +.groupMetadata() +.groupInstanceId(); + +final long remainingTimeMs = Math.max(0, timeoutMs - (time.milliseconds() - startMs)); + +if (options.leaveGroup && groupInstanceId.isPresent()) { +log.debug("Sending leave group trigger to removing instance from consumer group"); +//removing instance from consumer group + +final MemberToRemove memberToRemove = new MemberToRemove(groupInstanceId.get()); + +final Collection membersToRemove = Collections.singletonList(memberToRemove); + +final RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroupResult = adminClient +.removeMembersFromConsumerGroup( + applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), +new RemoveMembersFromConsumerGroupOptions(membersToRemove) +); + +try { + removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(remainingTimeMs, Time
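A sketch of how the new KIP-812 close overload is used (the streams instance is assumed to be already running; leaveGroup only has an effect when group.instance.id is configured, i.e. for static members, as the groupInstanceId.isPresent() check above shows):

    import java.time.Duration;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KafkaStreams.CloseOptions;

    public class CloseWithLeaveGroupSketch {
        public static void shutdown(final KafkaStreams streams) {
            final CloseOptions options = new CloseOptions()
                .timeout(Duration.ofSeconds(30))
                .leaveGroup(true);   // also remove this static member from the consumer group
            final boolean allThreadsStopped = streams.close(options);
            System.out.println("clean shutdown: " + allThreadsStopped);
        }
    }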
[kafka] branch trunk updated: MINOR: Fix flaky test TopicCommandIntegrationTest.testDescribeAtMinIsrPartitions(String).quorum=kraft (#12189)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new f6ba10ef9c MINOR: Fix flaky test TopicCommandIntegrationTest.testDescribeAtMinIsrPartitions(String).quorum=kraft (#12189) f6ba10ef9c is described below commit f6ba10ef9c2c2d94473efd2fd596b172fcff494a Author: Divij Vaidya AuthorDate: Sat May 21 19:33:44 2022 +0200 MINOR: Fix flaky test TopicCommandIntegrationTest.testDescribeAtMinIsrPartitions(String).quorum=kraft (#12189) Flaky test as failed in CI https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-12184/1/tests/ The test fails because it does not wait for metadata to be propagated across brokers before killing a broker which may lead to it getting stale information. Note that a similar test was done in #12104 for a different test. Reviewers: Kvicii Y, Ziming Deng, Jason Gustafson , Guozhang Wang --- .../kafka/admin/TopicCommandIntegrationTest.scala | 36 +- .../kafka/integration/KafkaServerTestHarness.scala | 6 +++- .../kafka/server/DeleteTopicsRequestTest.scala | 8 ++--- 3 files changed, 29 insertions(+), 21 deletions(-) diff --git a/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala index 26c60e1c3e..3082babd06 100644 --- a/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala @@ -586,16 +586,10 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi try { killBroker(0) - val aliveServers = brokers.filterNot(_.config.brokerId == 0) - if (isKRaftTest()) { -TestUtils.ensureConsistentKRaftMetadata( - aliveServers, - controllerServer, - "Timeout waiting for partition metadata propagating to brokers" -) +ensureConsistentKRaftMetadata() } else { -TestUtils.waitForPartitionMetadata(aliveServers, testTopicName, 0) +TestUtils.waitForPartitionMetadata(aliveBrokers, testTopicName, 0) } val output = TestUtils.grabConsoleOutput( topicService.describeTopic(new TopicCommandOptions(Array("--under-replicated-partitions" @@ -618,8 +612,14 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi try { killBroker(0) - val aliveServers = brokers.filterNot(_.config.brokerId == 0) - TestUtils.waitForPartitionMetadata(aliveServers, testTopicName, 0) + if (isKRaftTest()) { +ensureConsistentKRaftMetadata() + } else { +TestUtils.waitUntilTrue( + () => aliveBrokers.forall(_.metadataCache.getPartitionInfo(testTopicName, 0).get.isr().size() == 5), + s"Timeout waiting for partition metadata propagating to brokers for $testTopicName topic" +) + } val output = TestUtils.grabConsoleOutput( topicService.describeTopic(new TopicCommandOptions(Array("--under-min-isr-partitions" val rows = output.split("\n") @@ -697,6 +697,16 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi try { killBroker(0) killBroker(1) + + if (isKRaftTest()) { +ensureConsistentKRaftMetadata() + } else { +TestUtils.waitUntilTrue( + () => aliveBrokers.forall(_.metadataCache.getPartitionInfo(testTopicName, 0).get.isr().size() == 4), + s"Timeout waiting for partition metadata propagating to brokers for $testTopicName topic" +) + } + val output = TestUtils.grabConsoleOutput( topicService.describeTopic(new 
TopicCommandOptions(Array("--at-min-isr-partitions" val rows = output.split("\n") @@ -741,13 +751,11 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi try { killBroker(0) - val aliveServers = brokers.filterNot(_.config.brokerId == 0) - if (isKRaftTest()) { -TestUtils.ensureConsistentKRaftMetadata(aliveServers, controllerServer, "Timeout waiting for topic configs propagating to brokers") +ensureConsistentKRaftMetadata() } else { TestUtils.waitUntilTrue( - () => aliveServers.forall( + () => aliveBrokers.forall( broker => broker.metadataCache.getPartitionInfo(underMinIsrTopic, 0).get.isr().size() < 6 && broker.metadataCache.getPartitionInfo(offlineTopic, 0).get.leader() == MetadataResponse.NO_LEADER_ID), diff --git a/core/src/test/scala/unit/kafka/integ
[kafka-site] branch asf-site updated: MINOR: Fix DSL typo in streams docs (#412)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git The following commit(s) were added to refs/heads/asf-site by this push: new e3def51b MINOR: Fix DSL typo in streams docs (#412) e3def51b is described below commit e3def51b3d357663e2546d6a5b11c4d6f6144247 Author: Milind Mantri AuthorDate: Wed May 18 22:44:55 2022 +0530 MINOR: Fix DSL typo in streams docs (#412) Reviewers: Guozhang Wang --- 32/streams/developer-guide/dsl-topology-naming.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/32/streams/developer-guide/dsl-topology-naming.html b/32/streams/developer-guide/dsl-topology-naming.html index 9e687f9a..cd11c132 100644 --- a/32/streams/developer-guide/dsl-topology-naming.html +++ b/32/streams/developer-guide/dsl-topology-naming.html @@ -41,7 +41,7 @@ you are required to explicitly name each one. - At the DLS layer, there are operators. A single DSL operator may + At the DSL layer, there are operators. A single DSL operator may compile down to multiple Processors and State Stores, and if required repartition topics. But with the Kafka Streams DSL, all these names are generated for you. There is a relationship between
[kafka] branch trunk updated (467bce04ae4 -> f96e3813876)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 467bce04ae4 MINOR: Update release versions for upgrade tests with 3.1.1 release (#12156) add f96e3813876 KAFKA-13746: Attempt to fix flaky test by waiting on metadata update (#12104) No new revisions were added by this update. Summary of changes: .../kafka/admin/TopicCommandIntegrationTest.scala | 14 +- 1 file changed, 13 insertions(+), 1 deletion(-)
[kafka] branch trunk updated: KAFKA-13785: [8/N][emit final] time-ordered session store (#12127)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 3b08deaa76 KAFKA-13785: [8/N][emit final] time-ordered session store (#12127) 3b08deaa76 is described below commit 3b08deaa761c2387a41610893dc8302ab1d97338 Author: Guozhang Wang AuthorDate: Thu May 5 16:09:16 2022 -0700 KAFKA-13785: [8/N][emit final] time-ordered session store (#12127) Time ordered session store implementation. I introduced AbstractRocksDBTimeOrderedSegmentedBytesStore to make it generic for RocksDBTimeOrderedSessionSegmentedBytesStore and RocksDBTimeOrderedSegmentedBytesStore. A few minor follow-up changes: 1. Avoid extra byte array allocation for fixed upper/lower range serialization. 2. Rename some class names to be more consistent. Authored-by: Hao Li <1127478+lihao...@users.noreply.github.com> Reviewers: Guozhang Wang , John Roesler --- ...stractDualSchemaRocksDBSegmentedBytesStore.java | 3 - ...ractRocksDBTimeOrderedSegmentedBytesStore.java} | 111 ++ .../state/internals/PrefixedSessionKeySchemas.java | 387 +++ .../state/internals/PrefixedWindowKeySchemas.java | 6 +- ...cksDBTimeOrderedSessionSegmentedBytesStore.java | 136 +++ .../internals/RocksDBTimeOrderedSessionStore.java | 156 ...ocksDBTimeOrderedWindowSegmentedBytesStore.java | 127 +++ .../internals/RocksDBTimeOrderedWindowStore.java | 4 +- ...IndexedTimeOrderedWindowBytesStoreSupplier.java | 4 +- ...ocksDbTimeOrderedSessionBytesStoreSupplier.java | 69 .../streams/state/internals/SessionKeySchema.java | 40 +- .../internals/WrappedSessionStoreIterator.java | 12 +- ...ctDualSchemaRocksDBSegmentedBytesStoreTest.java | 411 - .../state/internals/RocksDBSessionStoreTest.java | 68 +++- .../RocksDBTimeOrderedSegmentedBytesStoreTest.java | 74 ...DBTimeOrderedWindowSegmentedBytesStoreTest.java | 121 ++ .../state/internals/RocksDBWindowStoreTest.java| 68 ++-- ...xedTimeOrderedWindowBytesStoreSupplierTest.java | 8 +- .../state/internals/SessionKeySchemaTest.java | 223 --- .../internals/TimeOrderedWindowStoreTest.java | 4 +- .../state/internals/WindowKeySchemaTest.java | 10 +- 21 files changed, 1757 insertions(+), 285 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java index b1044eb49c..95c1d8d8c8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java @@ -50,7 +50,6 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore segments; -private final String metricScope; protected final KeySchema baseKeySchema; protected final Optional indexKeySchema; @@ -65,12 +64,10 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore indexKeySchema, final AbstractSegments segments) { this.name = name; -this.metricScope = metricScope; this.baseKeySchema = baseKeySchema; this.indexKeySchema = indexKeySchema; this.segments = segments; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java similarity index 65% rename 
from streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSegmentedBytesStore.java rename to streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java index e87af877fb..f7216412f0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java @@ -16,22 +16,12 @@ */ package org.apache.kafka.streams.state.internals; -import java.util.Collection; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.errors.ProcessorStateException; -import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
[kafka] branch trunk updated: KAFKA-10199: Implement adding active tasks to the state updater (#12128)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new ced5989ff6 KAFKA-10199: Implement adding active tasks to the state updater (#12128) ced5989ff6 is described below commit ced5989ff69f8a5e76518fdeb39f41ab20b2574f Author: Bruno Cadonna AuthorDate: Fri May 6 01:00:35 2022 +0200 KAFKA-10199: Implement adding active tasks to the state updater (#12128) This PR adds the default implementation of the state updater. The implementation only implements adding active tasks to the state updater. Reviewers: Guozhang Wang --- .../processor/internals/ChangelogReader.java | 2 + .../processor/internals/DefaultStateUpdater.java | 373 +++ .../streams/processor/internals/StateUpdater.java | 19 +- .../processor/internals/StoreChangelogReader.java | 3 +- .../internals/DefaultStateUpdaterTest.java | 408 + .../processor/internals/MockChangelogReader.java | 5 + 6 files changed, 804 insertions(+), 6 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java index 9c62dd182e..38b00232c8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java @@ -46,6 +46,8 @@ public interface ChangelogReader extends ChangelogRegister { */ Set completedChangelogs(); +boolean allChangelogsCompleted(); + /** * Clear all partitions */ diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java new file mode 100644 index 00..0b6558d8ac --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.errors.TaskCorruptedException; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.Task.State; +import org.slf4j.Logger; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; + +public class DefaultStateUpdater implements StateUpdater { + +private final static String BUG_ERROR_MESSAGE = "This indicates a bug. " + +"Please report at https://issues.apache.org/jira/projects/KAFKA/issues or to the dev-mailing list (https://kafka.apache.org/contact)."; + +private class StateUpdaterThread extends Thread { + +private final ChangelogReader changelogReader; +private final AtomicBoolean isRunning = new AtomicBoolean(true); +private final java.util.function.Consumer> offsetResetter; +private final Map updatingTasks = new HashMap<>(); +private final Logger log; + +public StateUpdaterThread(final String name, + final ChangelogReader changelogReader, +
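The commit message above explains that a dedicated state-updater component takes over restoration of newly added active tasks. The sketch below shows only the general hand-off pattern (a blocking queue feeding a background thread); the interface, class, and method names are invented for illustration and do not match DefaultStateUpdater's real API.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class SimpleStateUpdater {
    interface RestorableTask {
        String id();
        void restoreBatch();      // restore a bounded chunk of changelog records
        boolean restoredToEnd();  // true once the task is caught up
    }

    private final BlockingQueue<RestorableTask> addedTasks = new LinkedBlockingQueue<>();
    private final Map<String, RestorableTask> updatingTasks = new HashMap<>();
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final Thread thread = new Thread(this::runLoop, "state-updater");

    void start() { thread.start(); }

    // Called by the stream thread: hand the task over without blocking it.
    void add(final RestorableTask task) { addedTasks.add(task); }

    void shutdown() throws InterruptedException {
        running.set(false);
        thread.join();
    }

    private void runLoop() {
        try {
            while (running.get()) {
                // 1) wait briefly for newly added tasks and move them into the updating set
                RestorableTask added = addedTasks.poll(10, TimeUnit.MILLISECONDS);
                while (added != null) {
                    updatingTasks.put(added.id(), added);
                    added = addedTasks.poll();
                }
                // 2) restore a chunk of changelog for each updating task; drop the finished ones
                updatingTasks.values().removeIf(task -> {
                    task.restoreBatch();
                    return task.restoredToEnd();
                });
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}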
[kafka] branch trunk updated: MINOR: Fix typos in configuration docs (#11874)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 18b84d0404 MINOR: Fix typos in configuration docs (#11874) 18b84d0404 is described below commit 18b84d0404d891263420628e12a2ce4f2ac85806 Author: Joel Hamill <11722533+joel-ham...@users.noreply.github.com> AuthorDate: Wed May 4 10:27:14 2022 -0700 MINOR: Fix typos in configuration docs (#11874) Reviewers: Chris Egerton, Weikang Sun, Andrew Eugene Choi, Luke Chen, Guozhang Wang --- .../java/org/apache/kafka/clients/producer/ProducerConfig.java | 9 + .../src/main/java/org/apache/kafka/streams/StreamsConfig.java| 4 ++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 6c258933af..2d586f255c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -216,10 +216,11 @@ public class ProducerConfig extends AbstractConfig { /** max.in.flight.requests.per.connection */ public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection"; private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking." -+ " Note that if this config is set to be greater than 1 and enable.idempotence is set to false, there is a risk of" -+ " message re-ordering after a failed send due to retries (i.e., if retries are enabled)." -+ " Additionally, enabling idempotence requires this config value to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "." -+ " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled."; ++ " Note that if this configuration is set to be greater than 1 and enable.idempotence is set to false, there is a risk of" ++ " message reordering after a failed send due to retries (i.e., if retries are enabled); " ++ " if retries are disabled or if enable.idempotence is set to true, ordering will be preserved." ++ " Additionally, enabling idempotence requires the value of this configuration to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "." ++ " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled. "; /** retries */ public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG; diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 23c021c63f..6428f69188 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -498,8 +498,8 @@ public class StreamsConfig extends AbstractConfig { "org.apache.kafka.common.serialization.Serde interface."; public static final String WINDOWED_INNER_CLASS_SERDE = "windowed.inner.class.serde"; -private static final String WINDOWED_INNER_CLASS_SERDE_DOC = " Default serializer / deserializer for the inner class of a windowed record. Must implement the \" +\n" + -"\"org.apache.kafka.common.serialization.Serde interface.. 
Note that setting this config in KafkaStreams application would result " + +private static final String WINDOWED_INNER_CLASS_SERDE_DOC = " Default serializer / deserializer for the inner class of a windowed record. Must implement the " + +"org.apache.kafka.common.serialization.Serde interface. Note that setting this config in KafkaStreams application would result " + "in an error as it is meant to be used only from Plain consumer client."; /** {@code default key.serde} */
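The reworded max.in.flight.requests.per.connection documentation boils down to one practical constraint, illustrated below with a hedged example: the broker address is a placeholder, and the snippet only builds the configuration rather than sending records.

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class IdempotentProducerConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        // With idempotence enabled this must be <= 5; ordering is then preserved even when retries happen.
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
        // A value > 5 together with enable.idempotence=true is rejected when the producer is created.
        System.out.println(props);
    }
}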
[kafka] branch trunk updated: HOTFIX: Only measure in nano when producer metadata refresh is required (#12102)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new e026384ffb HOTFIX: Only measure in nano when producer metadata refresh is required (#12102) e026384ffb is described below commit e026384ffb3170a2e71053a4163db58b9bd8fba6 Author: Guozhang Wang AuthorDate: Wed Apr 27 11:27:54 2022 -0700 HOTFIX: Only measure in nano when producer metadata refresh is required (#12102) We added the metadata wait time in total blocked time (#11805). But we added it in the critical path of send which is called per-record, whereas metadata refresh only happens rarely. This way the cost of time.nanos becomes unnecessarily significant as we call it twice per record. This PR moves the call to inside the waitOnMetadata callee and only when we do need to wait for a metadata refresh round-trip (i.e. we are indeed blocking). Reviewers: Matthias J. Sax --- .../main/java/org/apache/kafka/clients/producer/KafkaProducer.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index f36db02a9d..85a3e239e9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -931,7 +931,6 @@ public class KafkaProducer implements Producer { throwIfProducerClosed(); // first make sure the metadata for the topic is available long nowMs = time.milliseconds(); -long nowNanos = time.nanoseconds(); ClusterAndWaitTime clusterAndWaitTime; try { clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs); @@ -941,7 +940,6 @@ public class KafkaProducer implements Producer { throw e; } nowMs += clusterAndWaitTime.waitedOnMetadataMs; -producerMetrics.recordMetadataWait(time.nanoseconds() - nowNanos); long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs); Cluster cluster = clusterAndWaitTime.cluster; byte[] serializedKey; @@ -1080,6 +1078,7 @@ public class KafkaProducer implements Producer { // Issue metadata requests until we have metadata for the topic and the requested partition, // or until maxWaitTimeMs is exceeded. This is necessary in case the metadata // is stale and the number of partitions for this topic has increased in the meantime. +long nowNanos = time.nanoseconds(); do { if (partition != null) { log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic); @@ -,6 +1110,8 @@ public class KafkaProducer implements Producer { partitionsCount = cluster.partitionCountForTopic(topic); } while (partitionsCount == null || (partition != null && partition >= partitionsCount)); +producerMetrics.recordMetadataWait(time.nanoseconds() - nowNanos); + return new ClusterAndWaitTime(cluster, elapsed); }
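The fix described above follows a general pattern: read the clock only on the rare path that actually blocks, not on every call. The class below is a self-contained illustration of that pattern with invented names; it is not the KafkaProducer code touched by this commit.

import java.util.Map;
import java.util.function.LongSupplier;

class BlockedTimeTracker {
    private long blockedNanos = 0L;

    // Hot path: called per record. No clock reads unless we actually have to wait.
    String lookup(final Map<String, String> cache, final String key, final LongSupplier nanoClock) {
        final String value = cache.get(key);
        if (value != null) {
            return value;                    // common case: no blocking, no clock read
        }
        final long start = nanoClock.getAsLong();
        try {
            return fetchRemotely(key);       // rare case: block and wait
        } finally {
            blockedNanos += nanoClock.getAsLong() - start;
        }
    }

    long totalBlockedNanos() { return blockedNanos; }

    private String fetchRemotely(final String key) {
        return "value-for-" + key;           // stand-in for the blocking metadata fetch
    }
}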
[kafka] branch trunk updated: MINOR: fix html generation syntax errors (#12094)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 6d7723f073 MINOR: fix html generation syntax errors (#12094) 6d7723f073 is described below commit 6d7723f073ad0b935b02792be9aa4fd49b581b50 Author: Mike Tobola AuthorDate: Tue Apr 26 16:51:12 2022 -0700 MINOR: fix html generation syntax errors (#12094) The html document generation has some errors in it, specifically related to protocols. The two issues identified and resolved are: * Missing closing tags added * Invalid usage of a tag as a wrapper element for elements. Changed the tag to be a . Tested by running ./gradlew siteDocsTar and observing that the output was properly formed. Reviewers: Guozhang Wang --- .../main/java/org/apache/kafka/common/protocol/ApiKeys.java| 2 +- .../src/main/java/org/apache/kafka/common/protocol/Errors.java | 2 +- .../main/java/org/apache/kafka/common/protocol/Protocol.java | 10 +- .../main/java/org/apache/kafka/common/protocol/types/Type.java | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 399b631d76..628c9407cc 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -241,7 +241,7 @@ public enum ApiKeys { b.append(""); b.append("\n"); } -b.append("\n"); +b.append("\n"); return b.toString(); } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index f48ae6c233..5db9717906 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -500,7 +500,7 @@ public enum Errors { b.append(""); b.append("\n"); } -b.append("\n"); +b.append("\n"); return b.toString(); } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index d455b26eb2..a75eb0661d 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -112,7 +112,7 @@ public class Protocol { b.append(""); b.append("\n"); } -b.append("\n"); +b.append("\n"); } public static String toHtml() { @@ -148,7 +148,7 @@ public class Protocol { Schema schema = requests[i]; // Schema if (schema != null) { -b.append(""); +b.append(""); // Version header b.append(""); b.append(key.name); @@ -159,7 +159,7 @@ public class Protocol { b.append(""); schemaToFieldTableHtml(requests[i], b); } -b.append("\n"); +b.append("\n"); } // Responses @@ -169,7 +169,7 @@ public class Protocol { Schema schema = responses[i]; // Schema if (schema != null) { -b.append(""); +b.append(""); // Version header b.append(""); b.append(key.name); @@ -180,7 +180,7 @@ public class Protocol { b.append(""); schemaToFieldTableHtml(responses[i], b); } -b.append("\n"); +b.append("\n"); } } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java index 46a59bd082..4af74dbf4c 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java +++ 
b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java @@ -1120,7 +1120,7 @@ public abstract class Type { b.append(""); b.append("\n"); } -b.append("\n"); +b.append("\n"); return b.toString(); }
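The exact tag names in this commit message were stripped when the email was archived, so the following is only a generic illustration of what "missing closing tags" means for this kind of StringBuilder-based HTML generator: every element that is opened must be closed with the matching tag.

public class TableHtmlExample {
    public static String toHtml(final String[][] rows) {
        final StringBuilder b = new StringBuilder();
        b.append("<table class=\"data-table\">\n");
        for (final String[] row : rows) {
            b.append("<tr>");
            for (final String cell : row) {
                b.append("<td>").append(cell).append("</td>");
            }
            b.append("</tr>\n");
        }
        b.append("</table>\n"); // the closing tag must match the element that was opened
        return b.toString();
    }

    public static void main(String[] args) {
        System.out.println(toHtml(new String[][]{{"key", "value"}}));
    }
}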
[kafka] branch trunk updated: MINOR: revert back to 60s session timeout for static membership test (#11881)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new f28a2ee918 MINOR: revert back to 60s session timeout for static membership test (#11881) f28a2ee918 is described below commit f28a2ee918423e402ad9ddca9451b973dddfc591 Author: Luke Chen AuthorDate: Fri Apr 22 02:51:31 2022 +0800 MINOR: revert back to 60s session timeout for static membership test (#11881) Reviewers: Guozhang Wang --- tests/kafkatest/services/streams.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index f4f6a6a04f..5dedc57916 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -690,10 +690,9 @@ class StaticMemberTestService(StreamsTestBaseService): streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(), streams_property.NUM_THREADS: self.NUM_THREADS, consumer_property.GROUP_INSTANCE_ID: self.GROUP_INSTANCE_ID, - consumer_property.SESSION_TIMEOUT_MS: 6, + consumer_property.SESSION_TIMEOUT_MS: 6, # set longer session timeout for static member test 'input.topic': self.INPUT_TOPIC, - "acceptable.recovery.lag": "9223372036854775807", # enable a one-shot assignment - "session.timeout.ms": "1" # set back to 10s for tests. See KIP-735 + "acceptable.recovery.lag": "9223372036854775807" # enable a one-shot assignment }
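For context, the two consumer-level settings this test exercises are static membership (a fixed group.instance.id) and the session timeout that decides how long a bounced member may stay away before a rebalance is triggered. The values below are placeholders chosen for illustration, not the ones used by the system test.

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class StaticMembershipConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "streams-static-member-test");
        // A stable instance id makes this member "static": restarts keep the same assignment.
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "member-1");
        // Long enough for the member to bounce without the broker evicting it and rebalancing.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");
        System.out.println(props);
    }
}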
[kafka] branch trunk updated (d83fccd65f -> c5077c679c)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from d83fccd65f KAFKA-13785: [5/N][emit final] cache for time ordered window store (#12030) add c5077c679c KAFKA-13588: consolidate `changelogFor` methods to simplify the generation of internal topic names (#11703) No new revisions were added by this update. Summary of changes: .../internals/InternalTopologyBuilder.java | 24 --- .../processor/internals/ProcessorContextUtils.java | 26 +--- .../state/internals/CachingWindowStore.java| 20 ++-- .../InMemoryTimeOrderedKeyValueBuffer.java | 12 +++--- .../state/internals/MeteredKeyValueStore.java | 28 -- .../state/internals/MeteredSessionStore.java | 28 -- .../state/internals/MeteredWindowStore.java| 24 --- 7 files changed, 49 insertions(+), 113 deletions(-)
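The consolidated changelogFor logic is about deriving internal changelog topic names. The usual Kafka Streams convention is "<application.id>-<storeName>-changelog"; the helper below is a hypothetical stand-in that simply encodes that convention (named topologies and other special cases are ignored here).

public final class ChangelogTopicNames {
    private ChangelogTopicNames() { }

    // Kafka Streams internal changelog topics follow "<application.id>-<storeName>-changelog".
    public static String changelogFor(final String applicationId, final String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // prints "wordcount-app-counts-store-changelog"
        System.out.println(changelogFor("wordcount-app", "counts-store"));
    }
}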
[kafka] branch trunk updated (fb66b3668b -> d83fccd65f)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from fb66b3668b KAFKA-13799: Improve documentation for Kafka zero-copy (#12052) add d83fccd65f KAFKA-13785: [5/N][emit final] cache for time ordered window store (#12030) No new revisions were added by this update. Summary of changes: ...stractDualSchemaRocksDBSegmentedBytesStore.java | 6 +- .../MergedSortedCacheWindowStoreIterator.java | 14 +- ...rgedSortedCacheWindowStoreKeyValueIterator.java | 33 +- .../state/internals/PrefixedWindowKeySchemas.java | 20 ++ .../internals/RocksDBTimeOrderedWindowStore.java | 7 +- ...ore.java => TimeOrderedCachingWindowStore.java} | 390 + .../internals/TimestampedWindowStoreBuilder.java | 19 + ...dSortedCacheWrappedWindowStoreIteratorTest.java | 105 -- ...acheWrappedWindowStoreKeyValueIteratorTest.java | 83 - ...meOrderedCachingPersistentWindowStoreTest.java} | 238 ++--- .../TimestampedWindowStoreBuilderTest.java | 46 ++- 11 files changed, 726 insertions(+), 235 deletions(-) copy streams/src/main/java/org/apache/kafka/streams/state/internals/{CachingWindowStore.java => TimeOrderedCachingWindowStore.java} (56%) copy streams/src/test/java/org/apache/kafka/streams/state/internals/{CachingPersistentWindowStoreTest.java => TimeOrderedCachingPersistentWindowStoreTest.java} (85%)
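The caching layer added in this series sits between the application and the persistent store, absorbing writes and flushing them later. The class below is a deliberately generic write-back cache sketch with invented names; it is not the TimeOrderedCachingWindowStore implementation itself.

import java.util.LinkedHashMap;
import java.util.Map;

class CachingStore {
    interface Persistent {
        void put(String key, String value);
        String get(String key);
    }

    private final Persistent underlying;
    private final Map<String, String> dirty = new LinkedHashMap<>(); // keeps insertion order

    CachingStore(final Persistent underlying) {
        this.underlying = underlying;
    }

    void put(final String key, final String value) {
        dirty.put(key, value);              // absorb the write in the cache
    }

    String get(final String key) {
        final String cached = dirty.get(key);
        return cached != null ? cached : underlying.get(key);  // fall through on a cache miss
    }

    void flush() {
        dirty.forEach(underlying::put);     // push dirty entries down to the persistent store
        dirty.clear();
    }
}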
[kafka] branch trunk updated: KAFKA-13799: Improve documentation for Kafka zero-copy (#12052)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new fb66b3668b KAFKA-13799: Improve documentation for Kafka zero-copy (#12052) fb66b3668b is described below commit fb66b3668bd0a257a77ca43f7b1a31e8180f21f8 Author: RivenSun <91005273+rivens...@users.noreply.github.com> AuthorDate: Thu Apr 21 01:31:32 2022 +0800 KAFKA-13799: Improve documentation for Kafka zero-copy (#12052) Improve documentation for Kafka zero-copy. Kafka combines pagecache and zero-copy to greatly improve message consumption efficiency. But zero-copy only works in PlaintextTransportLayer. Reviewers: Divij Vaidya , Guozhang Wang --- docs/design.html | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/design.html b/docs/design.html index db71b65245..15eefa8132 100644 --- a/docs/design.html +++ b/docs/design.html @@ -125,6 +125,9 @@ This combination of pagecache and sendfile means that on a Kafka cluster where the consumers are mostly caught up you will see no read activity on the disks whatsoever as they will be serving data entirely from cache. +TLS/SSL libraries operate at the user space (in-kernel SSL_sendfile is currently not supported by Kafka). Due to this restriction, sendfile could not be used when transport layer enables SSL protocol. For enabling +SSL configuration, refer to security.protocol and security.inter.broker.protocol + For more background on the sendfile and zero-copy support in Java, see this https://developer.ibm.com/articles/j-zerocopy/;>article. End-to-end Batch Compression
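On the client side, the setting the new documentation paragraph points to is security.protocol. A minimal illustration follows, with a placeholder bootstrap address; whether the broker can use sendfile is determined by the listener's protocol, not by anything this snippet does.

import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;

public class SecurityProtocolExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093"); // placeholder
        // With SSL, data is encrypted in user space, so the broker cannot serve it via sendfile.
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        System.out.println(props);
    }
}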
[kafka] branch trunk updated: MINOR: ignore unused configuration when ConsumerCoordinator is not constructed (#12041)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new cf5e714a8b MINOR: ignore unused configuration when ConsumerCoordinator is not constructed (#12041) cf5e714a8b is described below commit cf5e714a8bea8bb1de75201d0769bb1c246b9334 Author: RivenSun <91005273+rivens...@users.noreply.github.com> AuthorDate: Fri Apr 15 08:30:43 2022 +0800 MINOR: ignore unused configuration when ConsumerCoordinator is not constructed (#12041) Following PR #11940, ignore unused config when ConsumerCoordinator is not constructed. Reviewers: Guozhang Wang --- .../java/org/apache/kafka/clients/consumer/KafkaConsumer.java| 9 +++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index a49c89560f..6ffb772915 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -774,8 +774,12 @@ public class KafkaConsumer implements Consumer { ); // no coordinator will be constructed for the default (null) group id -this.coordinator = !groupId.isPresent() ? null : -new ConsumerCoordinator(groupRebalanceConfig, +if (!groupId.isPresent()) { +config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG); + config.ignore(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED); +this.coordinator = null; +} else { +this.coordinator = new ConsumerCoordinator(groupRebalanceConfig, logContext, this.client, assignors, @@ -788,6 +792,7 @@ public class KafkaConsumer implements Consumer { config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG), this.interceptors, config.getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED)); +} this.fetcher = new Fetcher<>( logContext, this.client,
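A consumer constructed without group.id, as described above, builds no coordinator, so group-related settings such as auto.commit.interval.ms would go unused. Such a consumer has to self-assign partitions instead of subscribing. The topic, partition, and broker below are placeholders.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class GroupLessConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // No ConsumerConfig.GROUP_ID_CONFIG: no coordinator is created, so group/commit
        // settings like auto.commit.interval.ms would be unused if they were set here.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(new TopicPartition("my-topic", 0)));
            consumer.poll(Duration.ofMillis(100));
        }
    }
}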
[kafka] branch trunk updated (a3a4323a5a -> f49cff412d)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from a3a4323a5a MINOR: Update LICENSE-binary (#12051) add f49cff412d MINOR: Remove redundant conditional judgments in Selector.clear() (#12048) No new revisions were added by this update. Summary of changes: clients/src/main/java/org/apache/kafka/common/network/Selector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[kafka] branch trunk updated: MINOR: Supplement the description of `Valid Values` in the documentation of `compression.type` (#11985)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 1df232c839 MINOR: Supplement the description of `Valid Values` in the documentation of `compression.type` (#11985) 1df232c839 is described below commit 1df232c839f4568718a52c04aad72b69beb52026 Author: RivenSun <91005273+rivens...@users.noreply.github.com> AuthorDate: Wed Apr 13 12:24:57 2022 +0800 MINOR: Supplement the description of `Valid Values` in the documentation of `compression.type` (#11985) Because a validator is added to ProducerConfig.COMPRESSION_TYPE_CONFIG and KafkaConfig.CompressionTypeProp, the corresponding testCase is improved to verify whether the wrong value of compression.type will throw a ConfigException. Reviewers: Mickael Maison , Guozhang Wang --- .../org/apache/kafka/clients/producer/ProducerConfig.java | 4 +++- .../java/org/apache/kafka/common/record/CompressionType.java | 6 ++ .../org/apache/kafka/clients/producer/ProducerConfigTest.java | 11 +++ core/src/main/scala/kafka/server/KafkaConfig.scala| 6 +++--- core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala | 2 +- 5 files changed, 24 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index afc1e55cdf..8fec07a297 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -26,7 +26,9 @@ import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.SecurityConfig; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -329,7 +331,7 @@ public class ProducerConfig extends AbstractConfig { in("all", "-1", "0", "1"), Importance.LOW, ACKS_DOC) -.define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC) +.define(COMPRESSION_TYPE_CONFIG, Type.STRING, CompressionType.NONE.name, in(Utils.enumOptions(CompressionType.class)), Importance.HIGH, COMPRESSION_TYPE_DOC) .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC) .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC) .define(DELIVERY_TIMEOUT_MS_CONFIG, Type.INT, 120 * 1000, atLeast(0), Importance.MEDIUM, DELIVERY_TIMEOUT_MS_DOC) diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java index 1b9754ffab..c526929b72 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java +++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java @@ -190,4 +190,10 @@ public enum CompressionType { else throw new IllegalArgumentException("Unknown compression name: " + name); } + +@Override +public String toString() { +return name; +} + } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java index a2f318bebc..ae9de7b70a 100644 --- 
a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java @@ -19,12 +19,14 @@ package org.apache.kafka.clients.producer; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.config.ConfigException; import org.junit.jupiter.api.Test; import java.util.HashMap; import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; public class ProducerConfigTest { @@ -59,4 +61,13 @@ public class ProducerConfigTest { assertEquals(newConfigs.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG), keySerializerClass); assertEquals(newConfigs.get(Pr
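With the new validator in place, an unrecognized compression.type fails fast when the producer configuration is parsed instead of surfacing later. The snippet below illustrates that behaviour; the broker address and the invalid value "zip" are placeholders.

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.StringSerializer;

public class CompressionTypeConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");   // valid: none, gzip, snappy, lz4, zstd
        new ProducerConfig(props);                                   // parses and validates

        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zip");    // not a known compression type
        try {
            new ProducerConfig(props);
        } catch (ConfigException expected) {
            System.out.println("rejected: " + expected.getMessage());
        }
    }
}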
[kafka-site] branch asf-site updated: MINOR: Unsubscribe mailing lists (#402)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git The following commit(s) were added to refs/heads/asf-site by this push: new c9f686f MINOR: Unsubscribe mailing lists (#402) c9f686f is described below commit c9f686f14097babaf9d27ad47c4ed7d8d42446d5 Author: Tom Bentley AuthorDate: Sat Apr 2 00:01:22 2022 +0100 MINOR: Unsubscribe mailing lists (#402) Reviewers: Guozhang Wang --- contact.html | 8 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/contact.html b/contact.html index 270e2d6..81daaba 100644 --- a/contact.html +++ b/contact.html @@ -13,23 +13,23 @@ User mailing list: A list for general user questions about Kafka. To subscribe, send an email to mailto:users-subscr...@kafka.apache.org;>users-subscr...@kafka.apache.org. Once subscribed, you can ask general user questions by mailing to mailto:us...@kafka.apache.org;>us...@kafka.apache.org. Archives are available https://lists.apache.org/list.html?us...@kafka.apache.org;>here. + To unsubscribe, send an email to mailto:users-unsubscr...@kafka.apache.org;>users-unsubscr...@kafka.apache.org. Developer mailing list: A list for discussion on Kafka development. To subscribe, send an email to mailto:dev-subscr...@kafka.apache.org;>dev-subscr...@kafka.apache.org. Once subscribed, you can have discussion on Kafka development by mailing to mailto:d...@kafka.apache.org;>d...@kafka.apache.org. Archives are available https://lists.apache.org/list.html?d...@kafka.apache.org;>here. + To unsubscribe, send an email to mailto:dev-unsubscr...@kafka.apache.org;>dev-unsubscr...@kafka.apache.org. JIRA mailing list: A list to track Kafka https://issues.apache.org/jira/projects/KAFKA;>JIRA notifications. To subscribe, send an email to mailto:jira-subscr...@kafka.apache.org;>jira-subscr...@kafka.apache.org. Archives are available https://lists.apache.org/list.html?j...@kafka.apache.org;>here. + To unsubscribe, send an email to mailto:jira-unsubscr...@kafka.apache.org;>jira-unsubscr...@kafka.apache.org. Commit mailing list: A list to track Kafka commits. To subscribe, send an email to mailto:commits-subscr...@kafka.apache.org;>commits-subscr...@kafka.apache.org. Archives are available http://mail-archives.apache.org/mod_mbox/kafka-commits;>here. + To unsubscribe, send an email to mailto:commits-unsubscr...@kafka.apache.org;>commits-unsubscr...@kafka.apache.org. - To unsubscribe from any of these you just change the word "subscribe" to "unsubscribe" in the email adresses above. - - - Prior to the move to Apache we had a Google group we used for discussion. Archives can be found http://groups.google.com/group/kafka-dev;>here. After that we were in Apache Incubator which has its own archives for http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/;>user, http://mail-archives.apache.org/mod_mbox/incubator-kafka-dev/;>dev, and http://mail-archives.apache.org/mod_mbox/incubator-kafka-commits/;>commit lists.
[kafka] branch trunk updated (2a27059 -> 19a6269)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 2a27059 MINOR: Improved display names for parameterized KRaft and ZK tests (#11957) add 19a6269 MINOR: Fix log4j entry in RepartitionTopics (#11958) No new revisions were added by this update. Summary of changes: .../kafka/streams/processor/internals/RepartitionTopics.java | 8 +--- 1 file changed, 5 insertions(+), 3 deletions(-)