Re: [PR] KAFKA-15481: Fix concurrency bug in RemoteIndexCache [kafka]
iit2009060 commented on code in PR #14483: URL: https://github.com/apache/kafka/pull/14483#discussion_r1348641063 ## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala: ## @@ -568,27 +621,26 @@ class RemoteIndexCacheTest { } private def verifyFetchIndexInvocation(count: Int, - indexTypes: Seq[IndexType] = - Seq(IndexType.OFFSET, IndexType.TIMESTAMP, IndexType.TRANSACTION)): Unit = { + indexTypes: Seq[IndexType]): Unit = { for (indexType <- indexTypes) { verify(rsm, times(count)).fetchIndex(any(classOf[RemoteLogSegmentMetadata]), ArgumentMatchers.eq(indexType)) } } private def createTxIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata): TransactionIndex = { -val txnIdxFile = remoteTransactionIndexFile(tpDir, metadata) +val txnIdxFile = remoteTransactionIndexFile(new File(tpDir, DIR_NAME), metadata) txnIdxFile.createNewFile() new TransactionIndex(metadata.startOffset(), txnIdxFile) } private def createTimeIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata): TimeIndex = { val maxEntries = (metadata.endOffset() - metadata.startOffset()).asInstanceOf[Int] -new TimeIndex(remoteTimeIndexFile(tpDir, metadata), metadata.startOffset(), maxEntries * 12) +new TimeIndex(remoteTimeIndexFile(new File(tpDir, DIR_NAME), metadata), metadata.startOffset(), maxEntries * 12) Review Comment: @jeel2420 There are three functions which is used to create Indexes on tpDir storage. ``` private def createTimeIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata): TimeIndex private def createOffsetIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata): OffsetIndex private def createTxIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata): TransactionIndex ``` tpDir storage in the current unit test act as a data place fetched from remote storage. The cases where tpDir has been used instead of remoteCacheDir is only when you are creating the spyEntry. But if you see the test cases where it has happened , they are just testing the cache functionality and not mixing it with the remote storage manager functionality. To support the use cases mentioned above you can do this step **Suggestion:** Change the signature of the below method to take Directory path also has an parameter , and refactor the existing test case ``` private def createTimeIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata,dir:File): TimeIndex private def createOffsetIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata,dir:File): OffsetIndex private def createTxIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata,dir:File):TransactionIndex ``` Then you can pass the appropriate directory required for your test case. cc @showuon @divijvaidya ## storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java: ## @@ -193,7 +192,16 @@ public File cacheDir() { public void remove(Uuid key) { lock.readLock().lock(); try { -internalCache.invalidate(key); +internalCache.asMap().computeIfPresent(key, (k, v) -> { +try { +v.markForCleanup(); +expiredIndexes.put(v); Review Comment: @showuon @jeel2420 Should not we use expiredIndxes.offer instead of expiredIndexes.put() as put will block the operation if queue size is full ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15481: Fix concurrency bug in RemoteIndexCache [kafka]
showuon commented on PR #14483: URL: https://github.com/apache/kafka/pull/14483#issuecomment-1762593153 @jeel2420 , there's a test failed due to this change, please take a look. https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14483/10/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15303: Avoid unnecessary re-serialization in FK-join [kafka]
guozhangwang commented on PR #14157: URL: https://github.com/apache/kafka/pull/14157#issuecomment-1762506459 > @guozhangwang -- yes, still looking for input on this. Don't need a full review, just some input if we think it's the right approach to begin with (cf Sophie's justified comment, that is will make it a little bit more "messy") Ack, will take a look asap. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: cleanup some compiler warnings in Kafka Streams examples [kafka]
mjsax merged PR #14547: URL: https://github.com/apache/kafka/pull/14547 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Only commit running active and standby tasks when tasks corrupted [kafka]
guozhangwang commented on code in PR #14508: URL: https://github.com/apache/kafka/pull/14508#discussion_r1359038910 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -223,10 +223,7 @@ boolean handleCorruption(final Set corruptedTasks) { final Collection tasksToCommit = allTasks() .values() .stream() -// TODO: once we remove state restoration from the stream thread, we can also remove -// the RESTORING state here, since there will not be any restoring tasks managed -// by the stream thread anymore. -.filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING) +.filter(t -> t.state() == Task.State.RUNNING) Review Comment: Hey @cadonna sorry I came late on this PR. One thing I'd like to raise is that in the past, we've seen active task restoring never complete under rolling restart / rebalance storm scenarios since we kept losing the progress we made thus far when reviving. I'm not 100% sure if this part of the code is related to that scenario but just try to double check. If you have thought about it and concluded this would not be related, I'm relieved :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]
jeffkbkim commented on PR #14417: URL: https://github.com/apache/kafka/pull/14417#issuecomment-1762463861 @jolshan should we still keep them as sensors or do we want to remove them? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15605 Handle pending topic deletions during migration [kafka]
cmccabe commented on PR #14545: URL: https://github.com/apache/kafka/pull/14545#issuecomment-1762432231 Hmm, this isn’t how we agreed to handle this. We were going to get rid of deleting state in hybrid brokers as I recall. Then we were going to actually verify that broker topic IDs matched what was sent in LeaderAndIsr, and move aside or delete if not. In the initial full `leaderAndIsrRequest`, we can use `LeaderAndIsrRequest.Type` = All to indicate that the list is exhaustive, and move aside topics that don't appear in there -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: cleanup some warning in Kafka Streams examples [kafka]
mjsax opened a new pull request, #14547: URL: https://github.com/apache/kafka/pull/14547 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1358990797 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java: ## @@ -66,31 +82,107 @@ boolean isEmpty() { * @return {@code true} if there are completed fetches that match the {@link Predicate}, {@code false} otherwise */ boolean hasCompletedFetches(Predicate predicate) { -return completedFetches.stream().anyMatch(predicate); +try { +lock.lock(); +return completedFetches.stream().anyMatch(predicate); +} finally { +lock.unlock(); +} } void add(CompletedFetch completedFetch) { -completedFetches.add(completedFetch); +try { +lock.lock(); +completedFetches.add(completedFetch); +notEmptyCondition.signalAll(); +} finally { +lock.unlock(); +} } void addAll(Collection completedFetches) { -this.completedFetches.addAll(completedFetches); +if (completedFetches == null || completedFetches.isEmpty()) +return; + +try { +lock.lock(); +this.completedFetches.addAll(completedFetches); +notEmptyCondition.signalAll(); +} finally { +lock.unlock(); +} } CompletedFetch nextInLineFetch() { -return nextInLineFetch; +try { +lock.lock(); +return nextInLineFetch; +} finally { +lock.unlock(); +} } -void setNextInLineFetch(CompletedFetch completedFetch) { -this.nextInLineFetch = completedFetch; +void setNextInLineFetch(CompletedFetch nextInLineFetch) { +try { +lock.lock(); +this.nextInLineFetch = nextInLineFetch; +} finally { +lock.unlock(); +} } CompletedFetch peek() { -return completedFetches.peek(); +try { +lock.lock(); +return completedFetches.peek(); +} finally { +lock.unlock(); +} } CompletedFetch poll() { -return completedFetches.poll(); +try { +lock.lock(); +return completedFetches.poll(); +} finally { +lock.unlock(); +} +} + +/** + * Allows the caller to await presence of data in the buffer. The method will block, returning only + * under one of the following conditions: + * + * + * The buffer was already non-empty on entry + * The buffer was populated during the wait + * The remaining time on the {@link Timer timer} elapsed + * The thread was interrupted + * + * + * @param timer Timer that provides time to wait + */ +void awaitNotEmpty(Timer timer) { +try { +lock.lock(); + +while (isEmpty()) { +// Update the timer before we head into the loop in case it took a while to get the lock. +timer.update(); + +if (timer.isExpired()) +break; + +if (notEmptyCondition.await(timer.remainingMs(), TimeUnit.MILLISECONDS)) { Review Comment: Thanks for catching that. I fixed this but it's pretty clear now that I need unit tests to validate correctness. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1358990617 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -621,56 +825,163 @@ public void assign(Collection partitions) { throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic"); } -// TODO: implementation of refactored Fetcher will be included in forthcoming commits. -// fetcher.clearBufferedDataForUnassignedPartitions(partitions); +// Clear the buffered data which are not a part of newly assigned topics +final Set currentTopicPartitions = new HashSet<>(); + +for (TopicPartition tp : subscriptions.assignedPartitions()) { +if (partitions.contains(tp)) +currentTopicPartitions.add(tp); +} + +fetchBuffer.retainAll(currentTopicPartitions); // assignment change event will trigger autocommit if it is configured and the group id is specified. This is // to make sure offsets of topic partitions the consumer is unsubscribing from are committed since there will -// be no following rebalance -eventHandler.add(new AssignmentChangeApplicationEvent(this.subscriptions.allConsumed(), time.milliseconds())); +// be no following rebalance. +// +// See the ApplicationEventProcessor.process() method that handles this event for more detail. +applicationEventHandler.add(new AssignmentChangeApplicationEvent(subscriptions.allConsumed(), time.milliseconds())); log.info("Assigned to partition(s): {}", join(partitions, ", ")); -if (this.subscriptions.assignFromUser(new HashSet<>(partitions))) -eventHandler.add(new NewTopicsMetadataUpdateRequestEvent()); +if (subscriptions.assignFromUser(new HashSet<>(partitions))) +applicationEventHandler.add(new NewTopicsMetadataUpdateRequestEvent()); } @Override -public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) { -throw new KafkaException("method not implemented"); +public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { +maybeThrowInvalidGroupIdException(); +if (pattern == null || pattern.toString().isEmpty()) +throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ? +"null" : "empty")); + +throwIfNoAssignorsConfigured(); +log.info("Subscribed to pattern: '{}'", pattern); +subscriptions.subscribe(pattern, listener); +updatePatternSubscription(metadata.fetch()); +metadata.requestUpdateForNewTopics(); +} + +/** + * TODO: remove this when we implement the KIP-848 protocol. + * + * + * The contents of this method are shamelessly stolen from + * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are used here because we won't have access + * to a {@link ConsumerCoordinator} in this code. Perhaps it could be moved to a ConsumerUtils class? + * + * @param cluster Cluster from which we get the topics + */ +private void updatePatternSubscription(Cluster cluster) { +final Set topicsToSubscribe = cluster.topics().stream() +.filter(subscriptions::matchesSubscribedPattern) +.collect(Collectors.toSet()); +if (subscriptions.subscribeFromPattern(topicsToSubscribe)) +metadata.requestUpdateForNewTopics(); } @Override public void subscribe(Pattern pattern) { -throw new KafkaException("method not implemented"); +subscribe(pattern, new NoOpConsumerRebalanceListener()); } @Override public void unsubscribe() { -throw new KafkaException("method not implemented"); +fetchBuffer.retainAll(Collections.emptySet()); +subscriptions.unsubscribe(); } @Override @Deprecated -public ConsumerRecords poll(long timeout) { -throw new KafkaException("method not implemented"); +public ConsumerRecords poll(final long timeoutMs) { +return poll(Duration.ofMillis(timeoutMs)); } // Visible for testing WakeupTrigger wakeupTrigger() { return wakeupTrigger; } -private static ClusterResourceListeners configureClusterResourceListeners( -final Deserializer keyDeserializer, -final Deserializer valueDeserializer, -final List... candidateLists) { -ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners(); -for (List candidateList: candidateLists) -clusterResourceListeners.maybeAddAll(candidateList); +/** + * Send the requests for fetch data to the {@link ConsumerNetworkThread network thread} and set up to + *
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1358990537 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -621,56 +825,163 @@ public void assign(Collection partitions) { throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic"); } -// TODO: implementation of refactored Fetcher will be included in forthcoming commits. -// fetcher.clearBufferedDataForUnassignedPartitions(partitions); +// Clear the buffered data which are not a part of newly assigned topics +final Set currentTopicPartitions = new HashSet<>(); + +for (TopicPartition tp : subscriptions.assignedPartitions()) { +if (partitions.contains(tp)) +currentTopicPartitions.add(tp); +} + +fetchBuffer.retainAll(currentTopicPartitions); // assignment change event will trigger autocommit if it is configured and the group id is specified. This is // to make sure offsets of topic partitions the consumer is unsubscribing from are committed since there will -// be no following rebalance -eventHandler.add(new AssignmentChangeApplicationEvent(this.subscriptions.allConsumed(), time.milliseconds())); +// be no following rebalance. +// +// See the ApplicationEventProcessor.process() method that handles this event for more detail. +applicationEventHandler.add(new AssignmentChangeApplicationEvent(subscriptions.allConsumed(), time.milliseconds())); log.info("Assigned to partition(s): {}", join(partitions, ", ")); -if (this.subscriptions.assignFromUser(new HashSet<>(partitions))) -eventHandler.add(new NewTopicsMetadataUpdateRequestEvent()); +if (subscriptions.assignFromUser(new HashSet<>(partitions))) +applicationEventHandler.add(new NewTopicsMetadataUpdateRequestEvent()); } @Override -public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) { -throw new KafkaException("method not implemented"); +public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { +maybeThrowInvalidGroupIdException(); +if (pattern == null || pattern.toString().isEmpty()) +throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ? +"null" : "empty")); + +throwIfNoAssignorsConfigured(); +log.info("Subscribed to pattern: '{}'", pattern); +subscriptions.subscribe(pattern, listener); +updatePatternSubscription(metadata.fetch()); +metadata.requestUpdateForNewTopics(); +} + +/** + * TODO: remove this when we implement the KIP-848 protocol. + * + * + * The contents of this method are shamelessly stolen from + * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are used here because we won't have access + * to a {@link ConsumerCoordinator} in this code. Perhaps it could be moved to a ConsumerUtils class? + * + * @param cluster Cluster from which we get the topics + */ +private void updatePatternSubscription(Cluster cluster) { +final Set topicsToSubscribe = cluster.topics().stream() +.filter(subscriptions::matchesSubscribedPattern) +.collect(Collectors.toSet()); +if (subscriptions.subscribeFromPattern(topicsToSubscribe)) +metadata.requestUpdateForNewTopics(); } @Override public void subscribe(Pattern pattern) { -throw new KafkaException("method not implemented"); +subscribe(pattern, new NoOpConsumerRebalanceListener()); } @Override public void unsubscribe() { -throw new KafkaException("method not implemented"); +fetchBuffer.retainAll(Collections.emptySet()); +subscriptions.unsubscribe(); } @Override @Deprecated -public ConsumerRecords poll(long timeout) { -throw new KafkaException("method not implemented"); +public ConsumerRecords poll(final long timeoutMs) { +return poll(Duration.ofMillis(timeoutMs)); } // Visible for testing WakeupTrigger wakeupTrigger() { return wakeupTrigger; } -private static ClusterResourceListeners configureClusterResourceListeners( -final Deserializer keyDeserializer, -final Deserializer valueDeserializer, -final List... candidateLists) { -ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners(); -for (List candidateList: candidateLists) -clusterResourceListeners.maybeAddAll(candidateList); +/** + * Send the requests for fetch data to the {@link ConsumerNetworkThread network thread} and set up to + *
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1358990414 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ## @@ -17,146 +17,195 @@ package org.apache.kafka.clients.consumer.internals.events; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; +import org.apache.kafka.clients.consumer.internals.CachedSupplier; import org.apache.kafka.clients.consumer.internals.CommitRequestManager; import org.apache.kafka.clients.consumer.internals.ConsumerMetadata; +import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread; import org.apache.kafka.clients.consumer.internals.RequestManagers; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; -public class ApplicationEventProcessor { - -private final BlockingQueue backgroundEventQueue; +/** + * An {@link EventProcessor} that is created and executes in the {@link ConsumerNetworkThread network thread} + * which processes {@link ApplicationEvent application events} generated by the application thread. + */ +public class ApplicationEventProcessor extends EventProcessor { private final ConsumerMetadata metadata; - private final RequestManagers requestManagers; -public ApplicationEventProcessor(final BlockingQueue backgroundEventQueue, +public ApplicationEventProcessor(final LogContext logContext, + final BlockingQueue applicationEventQueue, final RequestManagers requestManagers, final ConsumerMetadata metadata) { -this.backgroundEventQueue = backgroundEventQueue; +super(logContext, applicationEventQueue); this.requestManagers = requestManagers; this.metadata = metadata; } -public boolean process(final ApplicationEvent event) { -Objects.requireNonNull(event); +/** + * Process the events—if any—that were produced by the application thread. It is possible that when processing + * an event generates an error. In such cases, the processor will immediately throw an exception, and not + * process the remaining events. + */ +@Override +public void process() { +process(error -> { +throw error; +}); +} + +@Override +public void process(ApplicationEvent event) { switch (event.type()) { -case NOOP: -return process((NoopApplicationEvent) event); case COMMIT: -return process((CommitApplicationEvent) event); +process((CommitApplicationEvent) event); +return; + case POLL: -return process((PollApplicationEvent) event); +process((PollApplicationEvent) event); +return; + case FETCH_COMMITTED_OFFSET: -return process((OffsetFetchApplicationEvent) event); +process((OffsetFetchApplicationEvent) event); +return; + case METADATA_UPDATE: -return process((NewTopicsMetadataUpdateRequestEvent) event); +process((NewTopicsMetadataUpdateRequestEvent) event); +return; + case ASSIGNMENT_CHANGE: -return process((AssignmentChangeApplicationEvent) event); +process((AssignmentChangeApplicationEvent) event); +return; + case TOPIC_METADATA: -return process((TopicMetadataApplicationEvent) event); +process((TopicMetadataApplicationEvent) event); +return; + case LIST_OFFSETS: -return process((ListOffsetsApplicationEvent) event); +process((ListOffsetsApplicationEvent) event); +return; + +case FETCH: +process((FetchEvent) event); +return; + case RESET_POSITIONS: -return processResetPositionsEvent(); +processResetPositionsEvent(); +return; + case VALIDATE_POSITIONS: -return processValidatePositionsEvent(); +processValidatePositionsEvent(); +return; + +default: +throw new IllegalArgumentException("Application event type " + event.type() + " was not expected"); } -return false; } -/** - * Processes {@link NoopApplicationEvent} and enqueue a - * {@link
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1358957417 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ## @@ -0,0 +1,3573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.FetchMetadata; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1358956804 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ## @@ -0,0 +1,3573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.FetchMetadata; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1358956462 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ## @@ -0,0 +1,3573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.FetchMetadata; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1358956176 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ## @@ -0,0 +1,3573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.FetchMetadata; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1358955393 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ## @@ -0,0 +1,3573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.FetchMetadata; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1358954499 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ## @@ -0,0 +1,3573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.FetchMetadata; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1358953536 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ## @@ -0,0 +1,3573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.FetchMetadata; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1358953131 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ## @@ -0,0 +1,3573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.FetchMetadata; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import
[jira] [Updated] (KAFKA-15606) Verify & refactor correctness of FetcherTest.testCompletedFetchRemoval()
[ https://issues.apache.org/jira/browse/KAFKA-15606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15606: -- Description: As part of the review for [FetchRequestManager pull request|https://github.com/apache/kafka/pull/14406], [~junrao] had some questions related to the correctness and clarity of the {{FetcherTest.testCompletedFetchRemoval()}} test: Questions: * https://github.com/apache/kafka/pull/14406#discussion_r1347908197 * https://github.com/apache/kafka/pull/14406#discussion_r1347910980 * https://github.com/apache/kafka/pull/14406#discussion_r1347913781 was: As part of the review for [pull request #14406|https://github.com/apache/kafka/pull/14406], [~junrao] made two comments on the {{FetcherTest.testCompletedFetchRemoval()}} test: {quote} Is [the check for the records size of 3] redundant given the test [two lines above]?[1] {quote} And also: {quote} Hmm, why don't we return records from other partitions since maxRecords is maxInt?[2] {quote} References: * [1] https://github.com/apache/kafka/pull/14406#discussion_r1347908197 * [2] https://github.com/apache/kafka/pull/14406#discussion_r1347910980 > Verify & refactor correctness of FetcherTest.testCompletedFetchRemoval() > > > Key: KAFKA-15606 > URL: https://issues.apache.org/jira/browse/KAFKA-15606 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > > As part of the review for [FetchRequestManager pull > request|https://github.com/apache/kafka/pull/14406], [~junrao] had some > questions related to the correctness and clarity of the > {{FetcherTest.testCompletedFetchRemoval()}} test: > Questions: > * https://github.com/apache/kafka/pull/14406#discussion_r1347908197 > * https://github.com/apache/kafka/pull/14406#discussion_r1347910980 > * https://github.com/apache/kafka/pull/14406#discussion_r1347913781 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1358953070 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ## @@ -0,0 +1,3573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.FetchMetadata; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import
[jira] [Updated] (KAFKA-15606) Verify & refactor correctness of FetcherTest.testCompletedFetchRemoval()
[ https://issues.apache.org/jira/browse/KAFKA-15606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15606: -- Description: As part of the review for [pull request #14406|https://github.com/apache/kafka/pull/14406], [~junrao] made two comments on the {{FetcherTest.testCompletedFetchRemoval()}} test: {quote} Is [the check for the records size of 3] redundant given the test [two lines above]?[1] {quote} And also: {quote} Hmm, why don't we return records from other partitions since maxRecords is maxInt?[2] {quote} References: * [1] https://github.com/apache/kafka/pull/14406#discussion_r1347908197 * [2] https://github.com/apache/kafka/pull/14406#discussion_r1347910980 was: As part of the review for [pull request #14406|https://github.com/apache/kafka/pull/14406], [~junrao] made these comments on the {{FetcherTest.testCompletedFetchRemoval()}} test: {quote}Hmm, why don't we return records from other partitions since maxRecords is maxInt? {quote} and: {quote}Is [the check for the records size of 3] redundant given the test [two lines above]? {quote} > Verify & refactor correctness of FetcherTest.testCompletedFetchRemoval() > > > Key: KAFKA-15606 > URL: https://issues.apache.org/jira/browse/KAFKA-15606 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > > As part of the review for [pull request > #14406|https://github.com/apache/kafka/pull/14406], [~junrao] made two > comments on the {{FetcherTest.testCompletedFetchRemoval()}} test: > {quote} > Is [the check for the records size of 3] redundant given the test [two lines > above]?[1] > {quote} > And also: > {quote} > Hmm, why don't we return records from other partitions since maxRecords is > maxInt?[2] > {quote} > References: > * [1] https://github.com/apache/kafka/pull/14406#discussion_r1347908197 > * [2] https://github.com/apache/kafka/pull/14406#discussion_r1347910980 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15606) Verify & refactor correctness of FetcherTest.testCompletedFetchRemoval()
[ https://issues.apache.org/jira/browse/KAFKA-15606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15606: -- Description: As part of the review for [pull request #14406|https://github.com/apache/kafka/pull/14406], [~junrao] made these comments on the {{FetcherTest.testCompletedFetchRemoval()}} test: {quote}Hmm, why don't we return records from other partitions since maxRecords is maxInt? {quote} and: {quote}Is [the check for the records size of 3] redundant given the test [two lines above]? {quote} was: As part of the review for [pull request #14406|https://github.com/apache/kafka/pull/14406], [~junrao] made these comments on the {{FetcherTest.testCompletedFetchRemoval()}} test: {quote}Hmm, why don't we return records from other partitions since maxRecords is maxInt? {quote} and: {quote}Is [the check for the {{fetchedRecord}} size of 3] redundant given the test [two lines above]? {quote} > Verify & refactor correctness of FetcherTest.testCompletedFetchRemoval() > > > Key: KAFKA-15606 > URL: https://issues.apache.org/jira/browse/KAFKA-15606 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > > As part of the review for [pull request > #14406|https://github.com/apache/kafka/pull/14406], [~junrao] made these > comments on the {{FetcherTest.testCompletedFetchRemoval()}} test: > {quote}Hmm, why don't we return records from other partitions since > maxRecords is maxInt? > {quote} > and: > {quote}Is [the check for the records size of 3] redundant given the test [two > lines above]? > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15606) Verify & refactor correctness of FetcherTest.testCompletedFetchRemoval()
[ https://issues.apache.org/jira/browse/KAFKA-15606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15606: -- Description: As part of the review for [pull request #14406|https://github.com/apache/kafka/pull/14406], [~junrao] made these comments on the {{FetcherTest.testCompletedFetchRemoval()}} test: {quote} Hmm, why don't we return records from other partitions since maxRecords is maxInt? {quote} and: {quote} Is [the check for the \{{fetchedRecord}} size of 3] redundant given the test [two lines above]? {quote} was: As part of the review for [pull request #14406|[https://github.com/apache/kafka/pull/14406]], [~junrao] made these comments on the {{FetcherTest.testCompletedFetchRemoval()}} test: {quote}Hmm, why don't we return records from other partitions since maxRecords is maxInt? {quote} and: {quote}Is [the check for the {{fetchedRecord}} size of 3] redundant given the test [two lines above]? {quote} > Verify & refactor correctness of FetcherTest.testCompletedFetchRemoval() > > > Key: KAFKA-15606 > URL: https://issues.apache.org/jira/browse/KAFKA-15606 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > > As part of the review for [pull request > #14406|https://github.com/apache/kafka/pull/14406], [~junrao] made these > comments on the {{FetcherTest.testCompletedFetchRemoval()}} test: > {quote} > Hmm, why don't we return records from other partitions since maxRecords is > maxInt? > {quote} > and: > {quote} > Is [the check for the \{{fetchedRecord}} size of 3] redundant given the test > [two lines above]? > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15606) Verify & refactor correctness of FetcherTest.testCompletedFetchRemoval()
[ https://issues.apache.org/jira/browse/KAFKA-15606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15606: -- Description: As part of the review for [pull request #14406|https://github.com/apache/kafka/pull/14406], [~junrao] made these comments on the {{FetcherTest.testCompletedFetchRemoval()}} test: {quote}Hmm, why don't we return records from other partitions since maxRecords is maxInt? {quote} and: {quote}Is [the check for the {{fetchedRecord}} size of 3] redundant given the test [two lines above]? {quote} was: As part of the review for [pull request #14406|https://github.com/apache/kafka/pull/14406], [~junrao] made these comments on the {{FetcherTest.testCompletedFetchRemoval()}} test: {quote} Hmm, why don't we return records from other partitions since maxRecords is maxInt? {quote} and: {quote} Is [the check for the \{{fetchedRecord}} size of 3] redundant given the test [two lines above]? {quote} > Verify & refactor correctness of FetcherTest.testCompletedFetchRemoval() > > > Key: KAFKA-15606 > URL: https://issues.apache.org/jira/browse/KAFKA-15606 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > > As part of the review for [pull request > #14406|https://github.com/apache/kafka/pull/14406], [~junrao] made these > comments on the {{FetcherTest.testCompletedFetchRemoval()}} test: > {quote}Hmm, why don't we return records from other partitions since > maxRecords is maxInt? > {quote} > and: > {quote}Is [the check for the {{fetchedRecord}} size of 3] redundant given the > test [two lines above]? > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15606) Verify & refactor correctness of FetcherTest.testCompletedFetchRemoval()
[ https://issues.apache.org/jira/browse/KAFKA-15606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15606: -- Description: As part of the review for [pull request #14406|[https://github.com/apache/kafka/pull/14406]], [~junrao] made these comments on the {{FetcherTest.testCompletedFetchRemoval()}} test: {quote}Hmm, why don't we return records from other partitions since maxRecords is maxInt? {quote} and: {quote}Is [the check for the {{fetchedRecord}} size of 3] redundant given the test [two lines above]? {quote} was: As part of the review for #14406, [~junrao] made these comments on the FetcherTest.testCompletedFetchRemoval() test: {quote}Hmm, why don't we return records from other partitions since maxRecords is maxInt? {quote} and: {quote}Is [the check for the `fetchedRecord` size of 3] redundant given the test [two lines above]? {quote} > Verify & refactor correctness of FetcherTest.testCompletedFetchRemoval() > > > Key: KAFKA-15606 > URL: https://issues.apache.org/jira/browse/KAFKA-15606 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > > As part of the review for [pull request > #14406|[https://github.com/apache/kafka/pull/14406]], [~junrao] made these > comments on the {{FetcherTest.testCompletedFetchRemoval()}} test: > {quote}Hmm, why don't we return records from other partitions since > maxRecords is maxInt? > {quote} > and: > {quote}Is [the check for the {{fetchedRecord}} size of 3] redundant given the > test [two lines above]? > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Zk to KRaft migration is now production ready [kafka]
ocadaruma commented on code in PR #14546: URL: https://github.com/apache/kafka/pull/14546#discussion_r1358951019 ## docs/ops.html: ## @@ -3736,7 +3736,7 @@ ZooKeeper to KRaft Migration -ZooKeeper to KRaft migration is considered an Early Access feature and is not recommended for production clusters. +Limitations: The following features are not yet supported for ZK to KRaft migrations: Review Comment: I also wonder if `Downgrading to ZooKeeper` is not possible yet or already supported. Assuming all features of [KIP-866](https://cwiki.apache.org/confluence/display/KAFKA/KIP-866+ZooKeeper+to+KRaft+Migration) are covered, rollback is possible until we exit migration mode right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15606) Verify & refactor correctness of FetcherTest.testCompletedFetchRemoval()
Kirk True created KAFKA-15606: - Summary: Verify & refactor correctness of FetcherTest.testCompletedFetchRemoval() Key: KAFKA-15606 URL: https://issues.apache.org/jira/browse/KAFKA-15606 Project: Kafka Issue Type: Improvement Components: clients, consumer Reporter: Kirk True Assignee: Kirk True As part of the review for #14406, [~junrao] made these comments on the FetcherTest.testCompletedFetchRemoval() test: {quote}Hmm, why don't we return records from other partitions since maxRecords is maxInt? {quote} and: {quote}Is [the check for the `fetchedRecord` size of 3] redundant given the test [two lines above]? {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] MINOR: Zk to KRaft migration is now production ready [kafka]
ocadaruma opened a new pull request, #14546: URL: https://github.com/apache/kafka/pull/14546 - As of 3.6.0, ZK to KRaft migration is considered production ready so updating the doc * https://kafka.apache.org/blog#apache_kafka_360_release_announcement ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1358949767 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ## @@ -0,0 +1,3573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.FetchMetadata; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import
[jira] [Commented] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer
[ https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775087#comment-17775087 ] Philip Nee commented on KAFKA-15602: Hi Luke, i think we need to do two things here. One is probably revert the change and patch it with better documentation and tests. Second is optional, if you want to introduce a separated serializer to handle offset, I think we might need to write a KIP. This is a long term solution. Are you interested in helping out to resolve this issue? I can help reviewing the code and KIP. > Breaking change in 3.4.0 ByteBufferSerializer > - > > Key: KAFKA-15602 > URL: https://issues.apache.org/jira/browse/KAFKA-15602 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1 >Reporter: Luke Kirby >Priority: Critical > > [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have > solved the situation described by KAFKA-4852, namely, to have > ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 > offsets (or, put another way, to honor the buffer's position() as the start > point to consume bytes from). Unfortunately, it failed to actually do this, > and instead changed the expectations for how an input ByteBuffer's limit and > position should be set before being provided to send() on a producer > configured with ByteBufferSerializer. Code that worked with pre-3.4.0 > releases now produce 0-length messages instead of the intended messages, > effectively introducing a breaking change for existing users of the > serializer in the wild. > Here are a few different inputs and serialized outputs under pre-3.4.0 and > 3.4.0+ to summarize the breaking change: > ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output|| > |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 > val=test|len=4 val=test| > |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 > val=test|len=0 val=| > |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 > val=test<0><0><0><0>|len=4 val=test| > |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8)); > buff.limit(buff.position());|len=4 > val=test|len=4 val=test| > |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t| > Notably, plain-wrappers of byte arrays continue to work under both versions > due to the special case in the serializer for them. I suspect that this is > the dominant use-case, which is why this has apparently gone un-reported to > this point. The wrapped-with-offset case fails for both cases for different > reasons (the expected value would be "est"). As demonstrated here, you can > ensure that a manually assembled ByteBuffer will work under both versions by > ensuring that your buffers start have position == limit == message-length > (and an actual desired start position of 0). Clearly, though, behavior has > changed dramatically for the second and third case there, with the 3.3.2 > behavior, in my experience, aligning better with naive expectations. > [Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java], > the serializer would just rewind() the buffer and respect the limit as the > indicator as to how much data was in the buffer. So, essentially, the > prevailing contract was that the data from position 0 (always!) up to the > limit on the buffer would be serialized; so it was really just the limit that > was honored. So if, per the original issue, you have a byte[] array wrapped > with, say, ByteBuffer.wrap(bytes, 3, 5) then that will yield a ByteBuffer() > with position = 3 indicating the desired start point to read from, but > effectively ignored by the serializer due to the rewind(). > So while the serializer didn't work when presenting a ByteBuffer view onto a > sub-view of a backing array, it did however follow expected behavior when > employing standard patterns to populate ByteBuffers backed by > larger-than-necessary arrays and using limit() to identify the end of actual > data, consistent with conventional usage of flip() to switch from writing to > a buffer to setting it up to be read from (e.g., to be passed into a > producer.send() call). E.g., > {code:java} > ByteBuffer bb = ByteBuffer.allocate(TOO_MUCH); > ... // some sequence of > bb.put(...); // populate buffer with some number of bytes less than TOO_MUCH > ... > bb.flip(); /* logically, this says "I am done writing, let's set this up for > reading"; pragmatically, it sets the limit to the current position so that > whoever reads the buffer knows when to stop reading, and sets the position to > zero so it knows where to start reading
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1358918492 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ## @@ -0,0 +1,3573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.FetchMetadata; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1358914547 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ## @@ -0,0 +1,3573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.FetchMetadata; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1358914146 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ## @@ -0,0 +1,3573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.FetchMetadata; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import
[jira] [Commented] (KAFKA-15208) Upgrade Jackson dependencies to version 2.15.3
[ https://issues.apache.org/jira/browse/KAFKA-15208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775070#comment-17775070 ] Said BOUDJELDA commented on KAFKA-15208: This JIRA is updated to mention an upgrade to version 1.15.3 that comes to bring more bug fixing and enhancements Pleass to the last release notes https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.15.3 for more information > Upgrade Jackson dependencies to version 2.15.3 > -- > > Key: KAFKA-15208 > URL: https://issues.apache.org/jira/browse/KAFKA-15208 > Project: Kafka > Issue Type: Improvement > Components: connect, kraft >Reporter: Said BOUDJELDA >Assignee: Said BOUDJELDA >Priority: Major > Labels: dependencies > Fix For: 3.7.0 > > > Upgrading the version of Jackson dependencies to the latest stable version > 2.15.2 can bring much bug fixing security issues solving and performance > improvement > > Check release notes back to the current version > [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.15.3] > [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.14] > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15208) Upgrade Jackson dependencies to version 2.15.3
[ https://issues.apache.org/jira/browse/KAFKA-15208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Said BOUDJELDA updated KAFKA-15208: --- Description: Upgrading the version of Jackson dependencies to the latest stable version 2.15.2 can bring much bug fixing security issues solving and performance improvement Check release notes back to the current version [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.15.3] [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.14] was: Upgrading the version of Jackson dependencies to the latest stable version 2.15.2 can bring much bug fixing security issues solving and performance improvement Check release notes back to the current version [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.15.2] [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.14] > Upgrade Jackson dependencies to version 2.15.3 > -- > > Key: KAFKA-15208 > URL: https://issues.apache.org/jira/browse/KAFKA-15208 > Project: Kafka > Issue Type: Improvement > Components: connect, kraft >Reporter: Said BOUDJELDA >Assignee: Said BOUDJELDA >Priority: Major > Labels: dependencies > Fix For: 3.7.0 > > > Upgrading the version of Jackson dependencies to the latest stable version > 2.15.2 can bring much bug fixing security issues solving and performance > improvement > > Check release notes back to the current version > [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.15.3] > [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.14] > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15208) Upgrade Jackson dependencies to version 2.15.3
[ https://issues.apache.org/jira/browse/KAFKA-15208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Said BOUDJELDA updated KAFKA-15208: --- Summary: Upgrade Jackson dependencies to version 2.15.3 (was: Upgrade Jackson dependencies to version 2.15.2) > Upgrade Jackson dependencies to version 2.15.3 > -- > > Key: KAFKA-15208 > URL: https://issues.apache.org/jira/browse/KAFKA-15208 > Project: Kafka > Issue Type: Improvement > Components: connect, kraft >Reporter: Said BOUDJELDA >Assignee: Said BOUDJELDA >Priority: Major > Labels: dependencies > Fix For: 3.7.0 > > > Upgrading the version of Jackson dependencies to the latest stable version > 2.15.2 can bring much bug fixing security issues solving and performance > improvement > > Check release notes back to the current version > [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.15.2] > [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.14] > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15208: Upgrade Jackson dependencies to version 2.15.3 [kafka]
bmscomp commented on PR #13662: URL: https://github.com/apache/kafka/pull/13662#issuecomment-1762253746 The pull request is updated to take advantages of version 2.15.3 of `jackson databind ` that aims to fix much more issues and bring more enhancements, as we can mention : [#3968](https://github.com/FasterXML/jackson-databind/issues/3968): Records with additional constructors failed to deserialize [#4121](https://github.com/FasterXML/jackson-databind/issues/4121): Preserve the original component type in merging to an array [#426](https://github.com/FasterXML/jackson-dataformats-text/issues/426): Update to SnakeYAML 2.1 Nothing changed related to the limitations make on this version of the field serialisation of json to solve security issues To know more about this version please refer to the release notes https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.15.3 @divijvaidya Please can you take a look at this update Thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer
[ https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775069#comment-17775069 ] Luke Kirby commented on KAFKA-15602: {quote}The original intent of the PR was to make the serializer compatible with user offsets; I don't see any test that covers this, am I missing anything? {quote} Yep, no test that covers it. I wrote up the kind of test you'd use to assess it above and it fails. I think the actual intent of the PR writer was to handle the case that they noticed (incorrectly, I contend) and the offsets question just kind of got accidentally ignored. That said, I'm more concerned about the breaking behavior change more than the specifics of handling offsets, which neither version has ever done properly. {quote}We are talking about creating a separated serializer that handles the user offset aren't we? It seems to be the only way to handle this case. {quote} Yeah, I think so. Again, though, I think something has to be done to advertise to users that the contract here has changed and that upgrades to 3.4 will more likely than not result in previously functional producers emitting 0-length messages if anything other than a ByteBuffer.wrap(byte[]) message is provided to send(), and that removing the existing class may be the only way to really achieve that safely – i.e., break loudly. Totally not my call though! > Breaking change in 3.4.0 ByteBufferSerializer > - > > Key: KAFKA-15602 > URL: https://issues.apache.org/jira/browse/KAFKA-15602 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1 >Reporter: Luke Kirby >Priority: Critical > > [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have > solved the situation described by KAFKA-4852, namely, to have > ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 > offsets (or, put another way, to honor the buffer's position() as the start > point to consume bytes from). Unfortunately, it failed to actually do this, > and instead changed the expectations for how an input ByteBuffer's limit and > position should be set before being provided to send() on a producer > configured with ByteBufferSerializer. Code that worked with pre-3.4.0 > releases now produce 0-length messages instead of the intended messages, > effectively introducing a breaking change for existing users of the > serializer in the wild. > Here are a few different inputs and serialized outputs under pre-3.4.0 and > 3.4.0+ to summarize the breaking change: > ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output|| > |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 > val=test|len=4 val=test| > |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 > val=test|len=0 val=| > |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 > val=test<0><0><0><0>|len=4 val=test| > |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8)); > buff.limit(buff.position());|len=4 > val=test|len=4 val=test| > |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t| > Notably, plain-wrappers of byte arrays continue to work under both versions > due to the special case in the serializer for them. I suspect that this is > the dominant use-case, which is why this has apparently gone un-reported to > this point. The wrapped-with-offset case fails for both cases for different > reasons (the expected value would be "est"). As demonstrated here, you can > ensure that a manually assembled ByteBuffer will work under both versions by > ensuring that your buffers start have position == limit == message-length > (and an actual desired start position of 0). Clearly, though, behavior has > changed dramatically for the second and third case there, with the 3.3.2 > behavior, in my experience, aligning better with naive expectations. > [Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java], > the serializer would just rewind() the buffer and respect the limit as the > indicator as to how much data was in the buffer. So, essentially, the > prevailing contract was that the data from position 0 (always!) up to the > limit on the buffer would be serialized; so it was really just the limit that > was honored. So if, per the original issue, you have a byte[] array wrapped > with, say, ByteBuffer.wrap(bytes, 3, 5) then that will yield a ByteBuffer() > with position = 3 indicating the desired start point to read from, but > effectively ignored by the serializer due to the rewind(). > So while the serializer didn't work when presenting a ByteBuffer view onto a > sub-view of a backing
[jira] [Commented] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer
[ https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775065#comment-17775065 ] Philip Nee commented on KAFKA-15602: The original intent of the PR was to make the serializer compatible with user offsets; I don't see any test that covers this, am I missing anything? replace the currently named ByteBufferSerializer with a new one that respects the standard interface unambiguously - We are talking about creating a separated serializer that handles the user offset aren't we? It seems to be the only way to handle this case. > Breaking change in 3.4.0 ByteBufferSerializer > - > > Key: KAFKA-15602 > URL: https://issues.apache.org/jira/browse/KAFKA-15602 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1 >Reporter: Luke Kirby >Priority: Critical > > [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have > solved the situation described by KAFKA-4852, namely, to have > ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 > offsets (or, put another way, to honor the buffer's position() as the start > point to consume bytes from). Unfortunately, it failed to actually do this, > and instead changed the expectations for how an input ByteBuffer's limit and > position should be set before being provided to send() on a producer > configured with ByteBufferSerializer. Code that worked with pre-3.4.0 > releases now produce 0-length messages instead of the intended messages, > effectively introducing a breaking change for existing users of the > serializer in the wild. > Here are a few different inputs and serialized outputs under pre-3.4.0 and > 3.4.0+ to summarize the breaking change: > ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output|| > |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 > val=test|len=4 val=test| > |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 > val=test|len=0 val=| > |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 > val=test<0><0><0><0>|len=4 val=test| > |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8)); > buff.limit(buff.position());|len=4 > val=test|len=4 val=test| > |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t| > Notably, plain-wrappers of byte arrays continue to work under both versions > due to the special case in the serializer for them. I suspect that this is > the dominant use-case, which is why this has apparently gone un-reported to > this point. The wrapped-with-offset case fails for both cases for different > reasons (the expected value would be "est"). As demonstrated here, you can > ensure that a manually assembled ByteBuffer will work under both versions by > ensuring that your buffers start have position == limit == message-length > (and an actual desired start position of 0). Clearly, though, behavior has > changed dramatically for the second and third case there, with the 3.3.2 > behavior, in my experience, aligning better with naive expectations. > [Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java], > the serializer would just rewind() the buffer and respect the limit as the > indicator as to how much data was in the buffer. So, essentially, the > prevailing contract was that the data from position 0 (always!) up to the > limit on the buffer would be serialized; so it was really just the limit that > was honored. So if, per the original issue, you have a byte[] array wrapped > with, say, ByteBuffer.wrap(bytes, 3, 5) then that will yield a ByteBuffer() > with position = 3 indicating the desired start point to read from, but > effectively ignored by the serializer due to the rewind(). > So while the serializer didn't work when presenting a ByteBuffer view onto a > sub-view of a backing array, it did however follow expected behavior when > employing standard patterns to populate ByteBuffers backed by > larger-than-necessary arrays and using limit() to identify the end of actual > data, consistent with conventional usage of flip() to switch from writing to > a buffer to setting it up to be read from (e.g., to be passed into a > producer.send() call). E.g., > {code:java} > ByteBuffer bb = ByteBuffer.allocate(TOO_MUCH); > ... // some sequence of > bb.put(...); // populate buffer with some number of bytes less than TOO_MUCH > ... > bb.flip(); /* logically, this says "I am done writing, let's set this up for > reading"; pragmatically, it sets the limit to the current position so that > whoever reads the buffer knows when to stop reading, and sets the position to > zero so it
Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]
splett2 commented on code in PR #1: URL: https://github.com/apache/kafka/pull/1#discussion_r1358837845 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -620,6 +642,21 @@ class KafkaApis(val requestChannel: RequestChannel, topicPartition, status.error.exceptionName)) } + +if (request.header.apiVersion >= 10) { + status.currentLeader = { +status.error match { + case Errors.NOT_LEADER_OR_FOLLOWER | Errors.FENCED_LEADER_EPOCH => Review Comment: produce requests do not include a leader epoch => they can never get fenced leader epoch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]
splett2 commented on code in PR #1: URL: https://github.com/apache/kafka/pull/1#discussion_r1358837039 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -559,6 +560,26 @@ class KafkaApis(val requestChannel: RequestChannel, } } + case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node) + + private def getCurrentLeader(tp: TopicPartition): LeaderNode = { +val partitionInfoOrError = replicaManager.getPartitionOrError(tp) +val (leaderId, leaderEpoch) = partitionInfoOrError match { + case Right(x) => +(x.leaderReplicaIdOpt.getOrElse(-1), x.getLeaderEpoch) + case Left(x) => +debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache") +metadataCache.getPartitionInfo(tp.topic, tp.partition) match { + case Some(pinfo) => (pinfo.leader(), pinfo.leaderEpoch()) + case None => (-1, -1) +} +} +val leaderNode: Node = metadataCache.getAliveBrokerNode(leaderId, config.interBrokerListenerName).getOrElse({ Review Comment: my point in the previous comment is that will never be the case with KRaft. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]
chb2ab commented on code in PR #1: URL: https://github.com/apache/kafka/pull/1#discussion_r1358826931 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -620,6 +642,21 @@ class KafkaApis(val requestChannel: RequestChannel, topicPartition, status.error.exceptionName)) } + +if (request.header.apiVersion >= 10) { + status.currentLeader = { +status.error match { + case Errors.NOT_LEADER_OR_FOLLOWER | Errors.FENCED_LEADER_EPOCH => Review Comment: ok, removed FENCED_LEADER_EPOCH from the produce path -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]
jolshan commented on code in PR #1: URL: https://github.com/apache/kafka/pull/1#discussion_r1358796082 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -620,6 +642,21 @@ class KafkaApis(val requestChannel: RequestChannel, topicPartition, status.error.exceptionName)) } + +if (request.header.apiVersion >= 10) { + status.currentLeader = { +status.error match { + case Errors.NOT_LEADER_OR_FOLLOWER | Errors.FENCED_LEADER_EPOCH => Review Comment: I think the error is only returned on fetch requests -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15582: Identify clean shutdown broker [kafka]
junrao commented on code in PR #14465: URL: https://github.com/apache/kafka/pull/14465#discussion_r1358649732 ## core/src/main/scala/kafka/log/CleanShutdownFileHandler.scala: ## @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log + +import kafka.utils.Logging + +import java.io.File +import java.io.FileOutputStream +import java.io.BufferedWriter +import java.io.IOException +import java.io.OutputStreamWriter +import java.nio.charset.StandardCharsets +import java.nio.file.Files +import java.util.regex.Pattern + +/** + * Clean shutdown file that indicates the broker was cleanly shutdown in 0.8 and higher. + * This is used to avoid unnecessary recovery after a clean shutdown. In theory this could be + * avoided by passing in the recovery point, however finding the correct position to do this + * requires accessing the offset index which may not be safe in an unclean shutdown. + * For more information see the discussion in PR#2104 + * + * Also, the clean shutdown file can also store the broker epoch, this can be used in the broker registration to + * demonstrate the last reboot is a clean shutdown. (KIP-966) + */ + +object CleanShutdownFileHandler { + val CleanShutdownFileName = ".kafka_cleanshutdown" +} + +class CleanShutdownFileHandler(dirPath: String) extends Logging { + val cleanShutdownFile = new File(dirPath, CleanShutdownFileHandler.CleanShutdownFileName) + val currentVersion = 0 + + @throws[Exception] + def write(brokerEpoch: Long): Unit = { +write(brokerEpoch, currentVersion) + } + + // visible to test. + @throws[Exception] + def write(brokerEpoch: Long, version: Int): Unit = { +val os = new FileOutputStream(cleanShutdownFile) +val bw = new BufferedWriter(new OutputStreamWriter(os, StandardCharsets.UTF_8)) +try { + bw.write("version: " + version + '\n') + bw.write("brokerEpoch: " + brokerEpoch) + bw.flush() + os.getFD.sync() +} finally bw.close() + } + + @throws[Exception] + def read: Long = { +val br = Files.newBufferedReader(cleanShutdownFile.toPath, StandardCharsets.UTF_8) +val whiteSpacesPattern = Pattern.compile(":\\s+") +var brokerEpoch = -1L +try { + val versionString = br.readLine + val versionArray = whiteSpacesPattern.split(versionString) + if (versionArray.length != 2) { +throwIOException("can't parse version from \"" + versionString + "\"") + } + if (versionArray(1).toInt != currentVersion) { Review Comment: In the future, we may evolve the file format by adding new fields and bumping up the version. To make downgrading possible, it might be useful to relax this kind of check on version. ## core/src/test/scala/unit/kafka/log/CleanShutdownFileHandlerTest.scala: ## @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package unit.kafka.log Review Comment: no need for unit. ## core/src/main/scala/kafka/log/CleanShutdownFileHandler.scala: ## @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1358791361 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ## @@ -0,0 +1,3573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.FetchMetadata; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import
Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]
ashmeet13 commented on PR #12988: URL: https://github.com/apache/kafka/pull/12988#issuecomment-1762139883 Got it! I'll make this change - for now I have gone through the code and the following two references and compiled a list of configs that are somehow "controlled" by KS. For now sharing the Producer Configs here and soon Consumer Configs too **Producer Configs with EoS Disabled** ``` 1. [Editable] [CustomDefault] linger.ms = 100 2. [Fixed] partitioner.class = StreamsPartitioner ``` **Producer Configs with EoS Disabled** ``` 1. [Editable] [CustomDefault] linger.ms = 100 2. [Fixed] partitioner.class = StreamsPartitioner 3. [Fixed] enable.idempotence = true 4. [Validate] max.in.flight.requests.per.connection <= 5 5. [Fixed] [NoDefault] transactional.id = - 6. [Editable] [CustomDefault] delivery.timeout.ms = Integer.MAX 7. [Editable] [CustomDefault] transaction.timeout.ms = 1 ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]
chb2ab commented on code in PR #1: URL: https://github.com/apache/kafka/pull/1#discussion_r1358773873 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -620,6 +642,21 @@ class KafkaApis(val requestChannel: RequestChannel, topicPartition, status.error.exceptionName)) } + +if (request.header.apiVersion >= 10) { + status.currentLeader = { +status.error match { + case Errors.NOT_LEADER_OR_FOLLOWER | Errors.FENCED_LEADER_EPOCH => Review Comment: will move this into the if block. why can't produce receive FENCED_LEADER_EPOCH? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]
chb2ab commented on code in PR #1: URL: https://github.com/apache/kafka/pull/1#discussion_r1358773301 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -559,6 +560,26 @@ class KafkaApis(val requestChannel: RequestChannel, } } + case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node) + + private def getCurrentLeader(tp: TopicPartition): LeaderNode = { +val partitionInfoOrError = replicaManager.getPartitionOrError(tp) +val (leaderId, leaderEpoch) = partitionInfoOrError match { + case Right(x) => +(x.leaderReplicaIdOpt.getOrElse(-1), x.getLeaderEpoch) + case Left(x) => +debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache") +metadataCache.getPartitionInfo(tp.topic, tp.partition) match { + case Some(pinfo) => (pinfo.leader(), pinfo.leaderEpoch()) + case None => (-1, -1) +} +} +val leaderNode: Node = metadataCache.getAliveBrokerNode(leaderId, config.interBrokerListenerName).getOrElse({ Review Comment: will update the listener name. I think the motivation for checking replica manager first is it may be faster than metadata cache. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15452: Access SslPrincipalMapper and kerberosShortNamer in Custom KafkaPrincipalBuilder(KIP-982) [kafka]
rbaddam commented on PR #14491: URL: https://github.com/apache/kafka/pull/14491#issuecomment-1762132178 Thank you @omkreddy for the review and feedback. I have updated the [KIP-982](https://cwiki.apache.org/confluence/display/KAFKA/KIP-982%3A+Access+SslPrincipalMapper+and+kerberosShortNamer+in+Custom+KafkaPrincipalBuilder) with the latest changes from the pull request. Could you please review the changes when you have a moment? Your feedback is highly appreciated. Thanks, Raghu Baddam -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1358764452 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ## @@ -0,0 +1,3573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.FetchMetadata; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1358761112 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ## @@ -0,0 +1,3573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.FetchMetadata; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1358758478 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java: ## @@ -1107,7 +1107,7 @@ public void testFetchMaxPollRecords() { subscriptions.seek(tp0, 1); client.prepareResponse(matchesOffset(tidp0, 1), fullFetchResponse(tidp0, this.records, Errors.NONE, 100L, 0)); -client.prepareResponse(matchesOffset(tidp0, 4), fullFetchResponse(tidp0, this.nextRecords, Errors.NONE, 100L, 0)); +client.prepareResponse(matchesOffset(tidp0, 4), fullFetchResponse(tidp0, nextRecords, Errors.NONE, 100L, 0)); Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-15605 Handle pending topic deletions during migration [kafka]
mumrah opened a new pull request, #14545: URL: https://github.com/apache/kafka/pull/14545 After migrating metadata to KRaft, we need to finalize the pending topic deletions. Since the brokers need the StopReplica request in order to clean up their log directories, we can't simply exclude the pending topics from the migration. This patch continues to migrate the pending topics, but adds logic to delete the topic from KRaft shortly after the migration. This lets us reuse the existing dual-write logic for topic deletions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer
[ https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775037#comment-17775037 ] Luke Kirby commented on KAFKA-15602: {quote}The original documentation mentioned the user "{_}Do not need to flip{_}", I assume that means the serializer will automatically rewind the position to zero for the user as flip explicitly stated ".{_}..limit is set to the current position and then the position is set to zero{_}". However, the serializer API didn't make any assumptions to the current position of the buffer, so it is hard to say if the original design intended to handle a user-given offset... From reading the unit test, it doesn't seem so. {quote} Ah, no there was no documentation on the serializer before; the "Do not need to flip" comment was introduced with the breaking change in 3.4, though it's pretty plain to see that previous users of the serializer likely _were_ flipping, or, at least, writing buffers (with content starting at position 0), flipping them, and sending them _worked_ but now doesn't. Looking at the history for the file, it does look like prior to the 3.4 change it always did a rewind() so yeah, position() on the buffer has never really been respected. Now, it kind of _does_ respect position; but not in the expected way, as it essentially identifies the end of the buffer rather than the start. I don't know why the original version used a rewind() rather than expecting the buffer to already have a position() reflecting the start point to read from, since in my experience this is clearly the convention for passing ByteBuffer inputs for reading in any library I'm familiar with, and I expect the expectation of most users; perhaps I'm ignorant of some other pattern! I don't know if there's some valid contextual reason for it using explicit rewinds() before, but it seems unusual. My suggestion would be to replace the currently named ByteBufferSerializer with a new one that respects the standard interface unambiguously – i.e., it serializes data from buffer.pos() to buffer.limit(), continuing to use appropriate shortcuts for the simple wrap(byte[]) cases. {quote}I wonder if the sensible thing to do for now is to is to make the contract and expectations more explicit on the Javadoc, i.e., the serialize method doesn't support user-provided offset, and the serializer will rewind the current offset to zero for the user. {quote} Explicit documentation would surely be improvement, and is probably at the core of how this situation was arrived at, but it doesn't do anything about folk being surprised by the completely different behavior and broken output that worked from 0.10 - 3.3.2 when upgrading to 3.4.0... though hopefully they notice it. > Breaking change in 3.4.0 ByteBufferSerializer > - > > Key: KAFKA-15602 > URL: https://issues.apache.org/jira/browse/KAFKA-15602 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1 >Reporter: Luke Kirby >Priority: Critical > > [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have > solved the situation described by KAFKA-4852, namely, to have > ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 > offsets (or, put another way, to honor the buffer's position() as the start > point to consume bytes from). Unfortunately, it failed to actually do this, > and instead changed the expectations for how an input ByteBuffer's limit and > position should be set before being provided to send() on a producer > configured with ByteBufferSerializer. Code that worked with pre-3.4.0 > releases now produce 0-length messages instead of the intended messages, > effectively introducing a breaking change for existing users of the > serializer in the wild. > Here are a few different inputs and serialized outputs under pre-3.4.0 and > 3.4.0+ to summarize the breaking change: > ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output|| > |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 > val=test|len=4 val=test| > |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 > val=test|len=0 val=| > |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 > val=test<0><0><0><0>|len=4 val=test| > |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8)); > buff.limit(buff.position());|len=4 > val=test|len=4 val=test| > |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t| > Notably, plain-wrappers of byte arrays continue to work under both versions > due to the special case in the serializer for them. I suspect that this is > the dominant use-case, which is why this has apparently gone un-reported to > this point. The wrapped-with-offset case fails
[jira] [Updated] (KAFKA-15605) Topics marked for deletion in ZK are incorrectly migrated to KRaft
[ https://issues.apache.org/jira/browse/KAFKA-15605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-15605: - Summary: Topics marked for deletion in ZK are incorrectly migrated to KRaft (was: Topic marked for deletion are incorrectly migrated to KRaft) > Topics marked for deletion in ZK are incorrectly migrated to KRaft > -- > > Key: KAFKA-15605 > URL: https://issues.apache.org/jira/browse/KAFKA-15605 > Project: Kafka > Issue Type: Bug > Components: controller, kraft >Affects Versions: 3.6.0 >Reporter: David Arthur >Priority: Major > Fix For: 3.6.1 > > > When migrating topics from ZooKeeper, the KRaft controller reads all the > topic and partition metadata from ZK directly. This includes topics which > have been marked for deletion by the ZK controller. > Since the client request to delete these topics has already been returned as > successful, it would be confusing to the client that the topic still existed. > An operator or application would need to issue another topic deletion to > remove these topics once the controller had moved to KRaft. If they tried to > create a new topic with the same name, they would receive a > TOPIC_ALREADY_EXISTS error. > The migration logic should carry over pending topic deletions and resolve > them either as part of the migration or shortly after. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15605) Topics marked for deletion in ZK are incorrectly migrated to KRaft
[ https://issues.apache.org/jira/browse/KAFKA-15605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-15605: - Description: When migrating topics from ZooKeeper, the KRaft controller reads all the topic and partition metadata from ZK directly. This includes topics which have been marked for deletion by the ZK controller. After being migrated to KRaft, the pending topic deletions are never completed, so it is as if the delete topic request never happened. Since the client request to delete these topics has already been returned as successful, it would be confusing to the client that the topic still existed. An operator or application would need to issue another topic deletion to remove these topics once the controller had moved to KRaft. If they tried to create a new topic with the same name, they would receive a TOPIC_ALREADY_EXISTS error. The migration logic should carry over pending topic deletions and resolve them either as part of the migration or shortly after. was: When migrating topics from ZooKeeper, the KRaft controller reads all the topic and partition metadata from ZK directly. This includes topics which have been marked for deletion by the ZK controller. Since the client request to delete these topics has already been returned as successful, it would be confusing to the client that the topic still existed. An operator or application would need to issue another topic deletion to remove these topics once the controller had moved to KRaft. If they tried to create a new topic with the same name, they would receive a TOPIC_ALREADY_EXISTS error. The migration logic should carry over pending topic deletions and resolve them either as part of the migration or shortly after. > Topics marked for deletion in ZK are incorrectly migrated to KRaft > -- > > Key: KAFKA-15605 > URL: https://issues.apache.org/jira/browse/KAFKA-15605 > Project: Kafka > Issue Type: Bug > Components: controller, kraft >Affects Versions: 3.6.0 >Reporter: David Arthur >Priority: Major > Fix For: 3.6.1 > > > When migrating topics from ZooKeeper, the KRaft controller reads all the > topic and partition metadata from ZK directly. This includes topics which > have been marked for deletion by the ZK controller. After being migrated to > KRaft, the pending topic deletions are never completed, so it is as if the > delete topic request never happened. > Since the client request to delete these topics has already been returned as > successful, it would be confusing to the client that the topic still existed. > An operator or application would need to issue another topic deletion to > remove these topics once the controller had moved to KRaft. If they tried to > create a new topic with the same name, they would receive a > TOPIC_ALREADY_EXISTS error. > The migration logic should carry over pending topic deletions and resolve > them either as part of the migration or shortly after. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15605) Topics marked for deletion in ZK are incorrectly migrated to KRaft
[ https://issues.apache.org/jira/browse/KAFKA-15605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-15605: - Description: When migrating topics from ZooKeeper, the KRaft controller reads all the topic and partition metadata from ZK directly. This includes topics which have been marked for deletion by the ZK controller. Since the client request to delete these topics has already been returned as successful, it would be confusing to the client that the topic still existed. An operator or application would need to issue another topic deletion to remove these topics once the controller had moved to KRaft. If they tried to create a new topic with the same name, they would receive a TOPIC_ALREADY_EXISTS error. The migration logic should carry over pending topic deletions and resolve them either as part of the migration or shortly after. was: When migrating topics from ZooKeeper, the KRaft controller reads all the topic and partition metadata from ZK directly. This incorrectly includes topics which have been marked for deletion by the ZK controller. Since the client request to delete these topics has already been returned as successful, it would be confusing to the client that the topic still existed. An operator or application would need to issue another topic deletion to remove these topics once the controller had moved to KRaft. If they tried to create a new topic with the same name, they would receive a TOPIC_ALREADY_EXISTS error. The migration logic should carry over pending topic deletions and resolve them either as part of the migration or shortly after. > Topics marked for deletion in ZK are incorrectly migrated to KRaft > -- > > Key: KAFKA-15605 > URL: https://issues.apache.org/jira/browse/KAFKA-15605 > Project: Kafka > Issue Type: Bug > Components: controller, kraft >Affects Versions: 3.6.0 >Reporter: David Arthur >Priority: Major > Fix For: 3.6.1 > > > When migrating topics from ZooKeeper, the KRaft controller reads all the > topic and partition metadata from ZK directly. This includes topics which > have been marked for deletion by the ZK controller. > Since the client request to delete these topics has already been returned as > successful, it would be confusing to the client that the topic still existed. > An operator or application would need to issue another topic deletion to > remove these topics once the controller had moved to KRaft. If they tried to > create a new topic with the same name, they would receive a > TOPIC_ALREADY_EXISTS error. > The migration logic should carry over pending topic deletions and resolve > them either as part of the migration or shortly after. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15605) Topics marked for deletion in ZK are incorrectly migrated to KRaft
[ https://issues.apache.org/jira/browse/KAFKA-15605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-15605: - Description: When migrating topics from ZooKeeper, the KRaft controller reads all the topic and partition metadata from ZK directly. This incorrectly includes topics which have been marked for deletion by the ZK controller. Since the client request to delete these topics has already been returned as successful, it would be confusing to the client that the topic still existed. An operator or application would need to issue another topic deletion to remove these topics once the controller had moved to KRaft. If they tried to create a new topic with the same name, they would receive a TOPIC_ALREADY_EXISTS error. The migration logic should carry over pending topic deletions and resolve them either as part of the migration or shortly after. was: When migrating topics from ZooKeeper, the KRaft controller reads all the topic and partition metadata from ZK directly. This includes topics which have been marked for deletion by the ZK controller. Since the client request to delete these topics has already been returned as successful, it would be confusing to the client that the topic still existed. An operator or application would need to issue another topic deletion to remove these topics once the controller had moved to KRaft. If they tried to create a new topic with the same name, they would receive a TOPIC_ALREADY_EXISTS error. The migration logic should carry over pending topic deletions and resolve them either as part of the migration or shortly after. > Topics marked for deletion in ZK are incorrectly migrated to KRaft > -- > > Key: KAFKA-15605 > URL: https://issues.apache.org/jira/browse/KAFKA-15605 > Project: Kafka > Issue Type: Bug > Components: controller, kraft >Affects Versions: 3.6.0 >Reporter: David Arthur >Priority: Major > Fix For: 3.6.1 > > > When migrating topics from ZooKeeper, the KRaft controller reads all the > topic and partition metadata from ZK directly. This incorrectly includes > topics which have been marked for deletion by the ZK controller. > Since the client request to delete these topics has already been returned as > successful, it would be confusing to the client that the topic still existed. > An operator or application would need to issue another topic deletion to > remove these topics once the controller had moved to KRaft. If they tried to > create a new topic with the same name, they would receive a > TOPIC_ALREADY_EXISTS error. > The migration logic should carry over pending topic deletions and resolve > them either as part of the migration or shortly after. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15605) Topic marked for deletion are incorrectly migrated to KRaft
David Arthur created KAFKA-15605: Summary: Topic marked for deletion are incorrectly migrated to KRaft Key: KAFKA-15605 URL: https://issues.apache.org/jira/browse/KAFKA-15605 Project: Kafka Issue Type: Bug Components: controller, kraft Affects Versions: 3.6.0 Reporter: David Arthur Fix For: 3.6.1 When migrating topics from ZooKeeper, the KRaft controller reads all the topic and partition metadata from ZK directly. This includes topics which have been marked for deletion by the ZK controller. Since the client request to delete these topics has already been returned as successful, it would be confusing to the client that the topic still existed. An operator or application would need to issue another topic deletion to remove these topics once the controller had moved to KRaft. If they tried to create a new topic with the same name, they would receive a TOPIC_ALREADY_EXISTS error. The migration logic should carry over pending topic deletions and resolve them either as part of the migration or shortly after. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15552) Duplicate Producer ID blocks during ZK migration
[ https://issues.apache.org/jira/browse/KAFKA-15552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775034#comment-17775034 ] ASF GitHub Bot commented on KAFKA-15552: ijuma commented on code in PR #560: URL: https://github.com/apache/kafka-site/pull/560#discussion_r1358724223 ## 36/upgrade.html: ## @@ -84,7 +84,9 @@ Upgrading KRaft-based cl Notable changes in 3.6.0 -Apache Kafka now supports having both an IPv4 and an IPv6 listener on the same port. This change only applies to +ZooKeeper to KRaft migrations are now recommended for production usage. One significant issue was found in the + 3.6.0 release which affects transactional producers https://issues.apache.org/jira/browse/KAFKA-15552. Review Comment: And can we be clearer about what we're recommending? Are we saying that they should wait for 3.6.1 if they use idempotent producers (the default since 3.0) or transactions? > Duplicate Producer ID blocks during ZK migration > > > Key: KAFKA-15552 > URL: https://issues.apache.org/jira/browse/KAFKA-15552 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1 >Reporter: David Arthur >Assignee: David Arthur >Priority: Critical > Fix For: 3.4.2, 3.5.2, 3.6.1 > > > When migrating producer ID blocks from ZK to KRaft, we are taking the current > producer ID block from ZK and writing it's "firstProducerId" into the > producer IDs KRaft record. However, in KRaft we store the _next_ producer ID > block in the log rather than storing the current block like ZK does. The end > result is that the first block given to a caller of AllocateProducerIds is a > duplicate of the last block allocated in ZK mode. > > This can result in duplicate producer IDs being given to transactional or > idempotent producers. In the case of transactional producers, this can cause > long term problems since the producer IDs are persisted and reused for a long > time. > The time between the last producer ID block being allocated by the ZK > controller and all the brokers being restarted following the metadata > migration is when this bug is possible. > > Symptoms of this bug will include ReplicaManager OutOfOrderSequenceException > and possibly some producer epoch validation errors. To see if a cluster is > affected by this bug, search for the offending producer ID and see if it is > being used by more than one producer. > > For example, the following error was observed > {code} > Out of order sequence number for producer 376000 at offset 381338 in > partition REDACTED: 0 (incoming seq. number), 21 (current end sequence > number) > {code} > Then searching for "376000" on > org.apache.kafka.clients.producer.internals.TransactionManager logs, two > brokers both show the same producer ID being provisioned > {code} > Broker 0 [Producer clientId=REDACTED-0] ProducerId set to 376000 with epoch 1 > Broker 5 [Producer clientId=REDACTED-1] ProducerId set to 376000 with epoch 1 > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1358717315 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ## @@ -0,0 +1,3573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.FetchMetadata; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1358713529 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ## @@ -0,0 +1,3573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.FetchMetadata; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1358712167 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ## @@ -0,0 +1,3573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.FetchMetadata; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import
[jira] [Commented] (KAFKA-15552) Duplicate Producer ID blocks during ZK migration
[ https://issues.apache.org/jira/browse/KAFKA-15552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775032#comment-17775032 ] ASF GitHub Bot commented on KAFKA-15552: jolshan commented on code in PR #560: URL: https://github.com/apache/kafka-site/pull/560#discussion_r1358710418 ## 36/upgrade.html: ## @@ -84,7 +84,9 @@ Upgrading KRaft-based cl Notable changes in 3.6.0 -Apache Kafka now supports having both an IPv4 and an IPv6 listener on the same port. This change only applies to +ZooKeeper to KRaft migrations are now recommended for production usage. One significant issue was found in the + 3.6.0 release which affects transactional producers https://issues.apache.org/jira/browse/KAFKA-15552. Review Comment: we should clarify this is idempotent producers as well. > Duplicate Producer ID blocks during ZK migration > > > Key: KAFKA-15552 > URL: https://issues.apache.org/jira/browse/KAFKA-15552 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1 >Reporter: David Arthur >Assignee: David Arthur >Priority: Critical > Fix For: 3.4.2, 3.5.2, 3.6.1 > > > When migrating producer ID blocks from ZK to KRaft, we are taking the current > producer ID block from ZK and writing it's "firstProducerId" into the > producer IDs KRaft record. However, in KRaft we store the _next_ producer ID > block in the log rather than storing the current block like ZK does. The end > result is that the first block given to a caller of AllocateProducerIds is a > duplicate of the last block allocated in ZK mode. > > This can result in duplicate producer IDs being given to transactional or > idempotent producers. In the case of transactional producers, this can cause > long term problems since the producer IDs are persisted and reused for a long > time. > The time between the last producer ID block being allocated by the ZK > controller and all the brokers being restarted following the metadata > migration is when this bug is possible. > > Symptoms of this bug will include ReplicaManager OutOfOrderSequenceException > and possibly some producer epoch validation errors. To see if a cluster is > affected by this bug, search for the offending producer ID and see if it is > being used by more than one producer. > > For example, the following error was observed > {code} > Out of order sequence number for producer 376000 at offset 381338 in > partition REDACTED: 0 (incoming seq. number), 21 (current end sequence > number) > {code} > Then searching for "376000" on > org.apache.kafka.clients.producer.internals.TransactionManager logs, two > brokers both show the same producer ID being provisioned > {code} > Broker 0 [Producer clientId=REDACTED-0] ProducerId set to 376000 with epoch 1 > Broker 5 [Producer clientId=REDACTED-1] ProducerId set to 376000 with epoch 1 > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15552) Duplicate Producer ID blocks during ZK migration
[ https://issues.apache.org/jira/browse/KAFKA-15552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775029#comment-17775029 ] ASF GitHub Bot commented on KAFKA-15552: mumrah opened a new pull request, #560: URL: https://github.com/apache/kafka-site/pull/560 (no comment) > Duplicate Producer ID blocks during ZK migration > > > Key: KAFKA-15552 > URL: https://issues.apache.org/jira/browse/KAFKA-15552 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1 >Reporter: David Arthur >Assignee: David Arthur >Priority: Critical > Fix For: 3.4.2, 3.5.2, 3.6.1 > > > When migrating producer ID blocks from ZK to KRaft, we are taking the current > producer ID block from ZK and writing it's "firstProducerId" into the > producer IDs KRaft record. However, in KRaft we store the _next_ producer ID > block in the log rather than storing the current block like ZK does. The end > result is that the first block given to a caller of AllocateProducerIds is a > duplicate of the last block allocated in ZK mode. > > This can result in duplicate producer IDs being given to transactional or > idempotent producers. In the case of transactional producers, this can cause > long term problems since the producer IDs are persisted and reused for a long > time. > The time between the last producer ID block being allocated by the ZK > controller and all the brokers being restarted following the metadata > migration is when this bug is possible. > > Symptoms of this bug will include ReplicaManager OutOfOrderSequenceException > and possibly some producer epoch validation errors. To see if a cluster is > affected by this bug, search for the offending producer ID and see if it is > being used by more than one producer. > > For example, the following error was observed > {code} > Out of order sequence number for producer 376000 at offset 381338 in > partition REDACTED: 0 (incoming seq. number), 21 (current end sequence > number) > {code} > Then searching for "376000" on > org.apache.kafka.clients.producer.internals.TransactionManager logs, two > brokers both show the same producer ID being provisioned > {code} > Broker 0 [Producer clientId=REDACTED-0] ProducerId set to 376000 with epoch 1 > Broker 5 [Producer clientId=REDACTED-1] ProducerId set to 376000 with epoch 1 > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1358701620 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ## @@ -0,0 +1,3573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.FetchMetadata; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1358693714 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ## @@ -0,0 +1,3573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.FetchMetadata; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import
Re: [PR] KAFKA-15452: Access SslPrincipalMapper and kerberosShortNamer in Custom KafkaPrincipalBuilder(KIP-982) [kafka]
omkreddy commented on PR #14491: URL: https://github.com/apache/kafka/pull/14491#issuecomment-1762024387 @rbaddam Thanks for the PR. Can you please update the KIP with newly added interfaces also. I will review the PR once KIP passes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14481: Move LogSegment/LogSegments to storage module [kafka]
ijuma commented on PR #14529: URL: https://github.com/apache/kafka/pull/14529#issuecomment-1762008506 > @ijuma Thanks for the PR. I will review it tomorrow. Thanks @satishd. I won't merge until Tuesday to give you a chance to review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1358682245 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ## @@ -0,0 +1,3573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.FetchMetadata; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import
Re: [PR] KAFKA-14481: Move LogSegment/LogSegments to storage module [kafka]
ijuma commented on code in PR #14529: URL: https://github.com/apache/kafka/pull/14529#discussion_r135839 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ## @@ -0,0 +1,873 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.attribute.FileTime; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import com.yammer.metrics.core.MetricName; +import com.yammer.metrics.core.Timer; +import org.apache.kafka.common.InvalidRecordException; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch; +import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.FileRecords.LogOffsetPosition; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.metrics.KafkaMetricsGroup; +import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; + +import static java.util.Arrays.asList; + +/** + * A segment of the log. Each segment has two components: a log and an index. The log is a FileRecords containing + * the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each + * segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in + * any previous segment. + * + * A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file. + * + * This class is not thread-safe. + */ +public class LogSegment { +private static final Logger LOGGER = LoggerFactory.getLogger(LogSegment.class); +private static final Timer LOG_FLUSH_TIMER; + +static { +KafkaMetricsGroup logFlushStatsMetricsGroup = new KafkaMetricsGroup(LogSegment.class) { +@Override +public MetricName metricName(String name, Map tags) { +// Override the group and type names for compatibility +return KafkaMetricsGroup.explicitMetricName("kafka.log", "LogFlushStats", name, tags); +} +}; +LOG_FLUSH_TIMER = logFlushStatsMetricsGroup.newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS); +} + +private final FileRecords log; +private final LazyIndex lazyOffsetIndex; +private final LazyIndex lazyTimeIndex; +private final TransactionIndex txnIndex; +private final long baseOffset; +private final int indexIntervalBytes; +private final long rollJitterMs; +private final Time time; + +// The timestamp we used for time based log rolling and for ensuring max compaction delay +// volatile for LogCleaner to see the update +private volatile OptionalLong rollingBasedTimestamp = OptionalLong.empty(); + +/* The maximum timestamp and offset we see so far */ +private volatile TimestampOffset maxTimestampAndOffsetSoFar = TimestampOffset.UNKNOWN; + +private long created; + +/* the number of bytes since we last added an entry in the offset index */ +private int bytesSinceLastIndexEntry = 0; + +/** + * Create a LogSegment with the provided parameters. + * + * @param log The file records containing log entries + * @param lazyOffsetIndex The offset index + * @param lazyTimeIndex The timestamp index + * @param txnIndex The transaction index + * @param baseOffset A lower bound on the offsets in this segment + * @param indexIntervalBytes The approximate number of bytes between entries in the index + *
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1358665833 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ## @@ -0,0 +1,3573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.FetchMetadata; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import
Re: [PR] KAFKA-14481: Move LogSegment/LogSegments to storage module [kafka]
ijuma commented on PR #14529: URL: https://github.com/apache/kafka/pull/14529#issuecomment-1761985192 > Thank you for patiently answering my comments Thanks for the review and for paying close attention to code quality (modularity, readability, etc.) - it's an important area! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1358658187 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ## @@ -0,0 +1,3573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.FetchMetadata; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import
Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
philipnee commented on PR #14532: URL: https://github.com/apache/kafka/pull/14532#issuecomment-1761979623 Hi @cadonna - Thank you for putting time into this PR, very much appreciated. I responded to some of your questions, let me know if there is still any ambiguity left. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
philipnee commented on code in PR #14532: URL: https://github.com/apache/kafka/pull/14532#discussion_r1358656536 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -184,13 +184,11 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() { NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest( new ConsumerGroupHeartbeatRequest.Builder(data), coordinatorRequestManager.coordinator()); -request.future().whenComplete((response, exception) -> { +request.handler().whenComplete((response, exception) -> { if (response != null) { onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), response.receivedTimeMs()); } else { -// TODO: Currently, we lack a good way to propage the response time from the network client to the -// request handler. We will need to store the response time in the handler to make it accessible. -onFailure(exception, time.milliseconds()); +onFailure(exception, request.handler().completionTimeMs()); Review Comment: I think the only time we invoke onComplete() is when a response is available (see below). If the response is `null`, which indicates the request has not been sent out; therefore, in the `NetworkClientDelegate`, we need to actively fail the request on timeout. onComplete when response is available: ``` private void completeResponses(List responses) { for (ClientResponse response : responses) { try { response.onComplete(); } catch (Exception e) { log.error("Uncaught error in request completion:", e); } } } ``` We actively expires the unsent request and fail them with TimeoutException: ``` if (unsent.timer.isExpired()) { iterator.remove(); unsent.handler.onFailure(currentTimeMs, new TimeoutException( "Failed to send request after " + unsent.timer.timeoutMs() + " ms.")); continue; } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14767: Fix missing commitId build error after git gc [kafka]
ijuma commented on PR #13315: URL: https://github.com/apache/kafka/pull/13315#issuecomment-1761974609 Well, Jenkins failed - we need to firs that first. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1358651225 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ## @@ -0,0 +1,3573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.FetchMetadata; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import
Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
philipnee commented on code in PR #14532: URL: https://github.com/apache/kafka/pull/14532#discussion_r1358640330 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ## @@ -254,28 +250,38 @@ public String toString() { } } -public static class FutureCompletionHandler implements RequestCompletionHandler { +public static class FutureCompletionHandler extends CompletableFuture implements RequestCompletionHandler { -private final CompletableFuture future; +/** + * The time when the response is completed. This is used when the response is completed exceptionally because + * ClientResponse already contains received time which is injected by the network client. + */ Review Comment: Thanks, I think I was trying to say the main use of responseCompletionTimeMs is mainly used when the future gets completed exceptionally. For the successful requests, we could use either the ClientResponse or this time. I'll remove this comment to avoid the ambiguity. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
philipnee commented on code in PR #14532: URL: https://github.com/apache/kafka/pull/14532#discussion_r1358637014 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -290,7 +290,6 @@ private void onFailure(final long currentTimeMs, private void retry(final long currentTimeMs) { onFailedAttempt(currentTimeMs); -onSendAttempt(currentTimeMs); Review Comment: This is duplicated because the manager only sends out the request when it is being polled via the `poll()` method, which `drain()` the unsent requests from the queue. In the `drain()` it already invokes the `onSentAttempt()` See ``` for (OffsetFetchRequestState request : partitionedBySendability.get(true)) { request.onSendAttempt(currentTimeMs); unsentRequests.add(request.toUnsentRequest()); inflightOffsetFetches.add(request); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14767: Fix missing commitId build error after git gc [kafka]
gharris1727 commented on PR #13315: URL: https://github.com/apache/kafka/pull/13315#issuecomment-1761944410 Hey @ijuma @divijvaidya @C0urante or @yashmayya Could you review this build improvement? I'm still getting this `git gc` failure regularly when I try to build branches that have been idle for some time. This also includes an improvement (the `isDirectory` guard) which allows us to use `git worktree` to check out and build multiple branches at one time. I'd like to backport this to 3.4, 3.5, and 3.6. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer
[ https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17774807#comment-17774807 ] Philip Nee edited comment on KAFKA-15602 at 10/13/23 5:43 PM: -- Hi [~luke.kirby] , Thanks for reporting this. I briefly went over the code a bit and I can now understand your concern. The original documentation mentioned the user "{_}Do not need to flip{_}", I assume that means the serializer will automatically rewind the position to zero for the user as flip explicitly stated ".{_}..limit is set to the current position and then the position is set to zero{_}". However, the serializer API didn't make any assumptions to the current position of the buffer, so it is hard to say if the original design intended to handle a user-given offset... From reading the unit test, it doesn't seem so. I can see the problem comes from the serializer doesn't know if the position is an offset or just the next byte to be written. These are two different definitions of the position so it doesn't really make sense to handle both cases in a single API call. I wonder if the sensible thing to do for now is to is to make the contract and expectations more explicit on the Javadoc, i.e., the serialize method doesn't support user-provided offset, and the serializer will rewind the current offset to zero for the user. [~guozhang] - Would you kindly provide some input on this issue? As you reviewed the KAFKA-4852. Thanks! P was (Author: JIRAUSER283568): Hi [~luke.kirby] , Thanks for reporting this. I briefly went over the code a bit and I can now understand your concern. The original documentation mentioned the user "{_}Do not need to flip{_}", I assume that means the serializer will automatically rewind the position to zero for the user as flip explicitly stated ".{_}..limit is set to the current position and then the position is set to zero{_}". However, the serializer API didn't make any assumptions to the current position of the buffer, so it is hard to say if the original design intended to handle a user-given offset... From reading the unit test, it doesn't seem so. I can see the problem comes from the serializer doesn't know if the position is an offset or just the next byte to be written. These are two different definitions of the position so it doesn't really make sense to handle both cases in a single API call. I wonder if we could do the following: If the user doesn't want to provide an offset, which is the most common use case, the user may continue using the existing API. If the user wraps the buffer with an offset, we might want to provide another API to specify the buffer, so that we can rewind the position to the correct position. [~guozhang] - Would you kindly provide some input on this issue? As you reviewed the KAFKA-4852. Thanks! P > Breaking change in 3.4.0 ByteBufferSerializer > - > > Key: KAFKA-15602 > URL: https://issues.apache.org/jira/browse/KAFKA-15602 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1 >Reporter: Luke Kirby >Priority: Critical > > [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have > solved the situation described by KAFKA-4852, namely, to have > ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 > offsets (or, put another way, to honor the buffer's position() as the start > point to consume bytes from). Unfortunately, it failed to actually do this, > and instead changed the expectations for how an input ByteBuffer's limit and > position should be set before being provided to send() on a producer > configured with ByteBufferSerializer. Code that worked with pre-3.4.0 > releases now produce 0-length messages instead of the intended messages, > effectively introducing a breaking change for existing users of the > serializer in the wild. > Here are a few different inputs and serialized outputs under pre-3.4.0 and > 3.4.0+ to summarize the breaking change: > ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output|| > |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 > val=test|len=4 val=test| > |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 > val=test|len=0 val=| > |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 > val=test<0><0><0><0>|len=4 val=test| > |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8)); > buff.limit(buff.position());|len=4 > val=test|len=4 val=test| > |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t| > Notably, plain-wrappers of byte arrays continue to work under both versions > due to the special case in the serializer for them. I suspect that this is > the dominant use-case,
Re: [PR] MINOR: Debug EmbeddedConnect/KafkaCluster failures [kafka]
gharris1727 closed pull request #13580: MINOR: Debug EmbeddedConnect/KafkaCluster failures URL: https://github.com/apache/kafka/pull/13580 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]
gharris1727 commented on code in PR #14538: URL: https://github.com/apache/kafka/pull/14538#discussion_r1358597074 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java: ## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; + +/** + * Manges logging levels on a single worker. Supports dynamic adjustment and querying + * of logging levels. + * + * This class is thread-safe; concurrent calls to all of its public methods from any number + * of threads are permitted. + */ +public class Loggers { + +private static final Logger log = LoggerFactory.getLogger(Loggers.class); + +/** + * Log4j uses "root" (case-insensitive) as name of the root logger. + */ +private static final String ROOT_LOGGER_NAME = "root"; + +private final Time time; +private final Map lastModifiedTimes; + +public Loggers(Time time) { +this.time = time; +this.lastModifiedTimes = new HashMap<>(); +} + +/** + * Retrieve the current level for a single logger. + * @param logger the name of the logger to retrieve the level for; may not be null + * @return the current level (falling back on the effective level if necessary) of the logger, + * or null if no logger with the specified name exists + */ +public synchronized LoggerLevel level(String logger) { Review Comment: @yangy Writes must have an exclusive lock for thread safety, meaning that no concurrent reads are allowed while a write is happening. The synchronized lock here makes sure that no writes are ongoing. It's a bit stronger than necessary: multiple concurrent readers could be allowed, but multiple readers sharing a lock is a little bit more complex, and probably not worth the performance difference. These methods are not called in any hotpath. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14481: Move LogSegment/LogSegments to storage module [kafka]
ijuma commented on code in PR #14529: URL: https://github.com/apache/kafka/pull/14529#discussion_r1358592259 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ## @@ -0,0 +1,873 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.attribute.FileTime; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import com.yammer.metrics.core.MetricName; +import com.yammer.metrics.core.Timer; +import org.apache.kafka.common.InvalidRecordException; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch; +import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.FileRecords.LogOffsetPosition; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.metrics.KafkaMetricsGroup; +import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; + +import static java.util.Arrays.asList; + +/** + * A segment of the log. Each segment has two components: a log and an index. The log is a FileRecords containing + * the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each + * segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in + * any previous segment. + * + * A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file. + * + * This class is not thread-safe. + */ +public class LogSegment { +private static final Logger LOGGER = LoggerFactory.getLogger(LogSegment.class); +private static final Timer LOG_FLUSH_TIMER; + +static { +KafkaMetricsGroup logFlushStatsMetricsGroup = new KafkaMetricsGroup(LogSegment.class) { +@Override +public MetricName metricName(String name, Map tags) { +// Override the group and type names for compatibility +return KafkaMetricsGroup.explicitMetricName("kafka.log", "LogFlushStats", name, tags); +} +}; +LOG_FLUSH_TIMER = logFlushStatsMetricsGroup.newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS); +} + +private final FileRecords log; +private final LazyIndex lazyOffsetIndex; +private final LazyIndex lazyTimeIndex; +private final TransactionIndex txnIndex; +private final long baseOffset; +private final int indexIntervalBytes; +private final long rollJitterMs; +private final Time time; + +// The timestamp we used for time based log rolling and for ensuring max compaction delay +// volatile for LogCleaner to see the update +private volatile OptionalLong rollingBasedTimestamp = OptionalLong.empty(); + +/* The maximum timestamp and offset we see so far */ +private volatile TimestampOffset maxTimestampAndOffsetSoFar = TimestampOffset.UNKNOWN; + +private long created; + +/* the number of bytes since we last added an entry in the offset index */ +private int bytesSinceLastIndexEntry = 0; + +/** + * Create a LogSegment with the provided parameters. + * + * @param log The file records containing log entries + * @param lazyOffsetIndex The offset index + * @param lazyTimeIndex The timestamp index + * @param txnIndex The transaction index + * @param baseOffset A lower bound on the offsets in this segment + * @param indexIntervalBytes The approximate number of bytes between entries in the index + *
Re: [PR] KAFKA-14481: Move LogSegment/LogSegments to storage module [kafka]
ijuma commented on code in PR #14529: URL: https://github.com/apache/kafka/pull/14529#discussion_r1358592259 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ## @@ -0,0 +1,873 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.attribute.FileTime; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import com.yammer.metrics.core.MetricName; +import com.yammer.metrics.core.Timer; +import org.apache.kafka.common.InvalidRecordException; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch; +import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.FileRecords.LogOffsetPosition; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.metrics.KafkaMetricsGroup; +import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; + +import static java.util.Arrays.asList; + +/** + * A segment of the log. Each segment has two components: a log and an index. The log is a FileRecords containing + * the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each + * segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in + * any previous segment. + * + * A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file. + * + * This class is not thread-safe. + */ +public class LogSegment { +private static final Logger LOGGER = LoggerFactory.getLogger(LogSegment.class); +private static final Timer LOG_FLUSH_TIMER; + +static { +KafkaMetricsGroup logFlushStatsMetricsGroup = new KafkaMetricsGroup(LogSegment.class) { +@Override +public MetricName metricName(String name, Map tags) { +// Override the group and type names for compatibility +return KafkaMetricsGroup.explicitMetricName("kafka.log", "LogFlushStats", name, tags); +} +}; +LOG_FLUSH_TIMER = logFlushStatsMetricsGroup.newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS); +} + +private final FileRecords log; +private final LazyIndex lazyOffsetIndex; +private final LazyIndex lazyTimeIndex; +private final TransactionIndex txnIndex; +private final long baseOffset; +private final int indexIntervalBytes; +private final long rollJitterMs; +private final Time time; + +// The timestamp we used for time based log rolling and for ensuring max compaction delay +// volatile for LogCleaner to see the update +private volatile OptionalLong rollingBasedTimestamp = OptionalLong.empty(); + +/* The maximum timestamp and offset we see so far */ +private volatile TimestampOffset maxTimestampAndOffsetSoFar = TimestampOffset.UNKNOWN; + +private long created; + +/* the number of bytes since we last added an entry in the offset index */ +private int bytesSinceLastIndexEntry = 0; + +/** + * Create a LogSegment with the provided parameters. + * + * @param log The file records containing log entries + * @param lazyOffsetIndex The offset index + * @param lazyTimeIndex The timestamp index + * @param txnIndex The transaction index + * @param baseOffset A lower bound on the offsets in this segment + * @param indexIntervalBytes The approximate number of bytes between entries in the index + *
Re: [PR] KAFKA-14481: Move LogSegment/LogSegments to storage module [kafka]
divijvaidya commented on code in PR #14529: URL: https://github.com/apache/kafka/pull/14529#discussion_r1358569570 ## clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java: ## @@ -44,6 +45,17 @@ public RecordBatch firstBatch() { return iterator.next(); } +@Override +public Optional lastBatch() { +Iterator iterator = batches().iterator(); + +RecordBatch batch = null; +while (iterator.hasNext()) +batch = iterator.next(); + +return Optional.ofNullable(batch); Review Comment: Never mind. My misunderstanding. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14481: Move LogSegment/LogSegments to storage module [kafka]
divijvaidya commented on code in PR #14529: URL: https://github.com/apache/kafka/pull/14529#discussion_r1358566629 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ## @@ -0,0 +1,873 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.attribute.FileTime; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import com.yammer.metrics.core.MetricName; +import com.yammer.metrics.core.Timer; +import org.apache.kafka.common.InvalidRecordException; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch; +import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.FileRecords.LogOffsetPosition; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.metrics.KafkaMetricsGroup; +import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; + +import static java.util.Arrays.asList; + +/** + * A segment of the log. Each segment has two components: a log and an index. The log is a FileRecords containing + * the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each + * segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in + * any previous segment. + * + * A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file. + * + * This class is not thread-safe. + */ +public class LogSegment { +private static final Logger LOGGER = LoggerFactory.getLogger(LogSegment.class); +private static final Timer LOG_FLUSH_TIMER; + +static { +KafkaMetricsGroup logFlushStatsMetricsGroup = new KafkaMetricsGroup(LogSegment.class) { +@Override +public MetricName metricName(String name, Map tags) { +// Override the group and type names for compatibility +return KafkaMetricsGroup.explicitMetricName("kafka.log", "LogFlushStats", name, tags); +} +}; +LOG_FLUSH_TIMER = logFlushStatsMetricsGroup.newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS); +} + +private final FileRecords log; +private final LazyIndex lazyOffsetIndex; +private final LazyIndex lazyTimeIndex; +private final TransactionIndex txnIndex; +private final long baseOffset; +private final int indexIntervalBytes; +private final long rollJitterMs; +private final Time time; + +// The timestamp we used for time based log rolling and for ensuring max compaction delay +// volatile for LogCleaner to see the update +private volatile OptionalLong rollingBasedTimestamp = OptionalLong.empty(); + +/* The maximum timestamp and offset we see so far */ +private volatile TimestampOffset maxTimestampAndOffsetSoFar = TimestampOffset.UNKNOWN; + +private long created; + +/* the number of bytes since we last added an entry in the offset index */ +private int bytesSinceLastIndexEntry = 0; + +/** + * Create a LogSegment with the provided parameters. + * + * @param log The file records containing log entries + * @param lazyOffsetIndex The offset index + * @param lazyTimeIndex The timestamp index + * @param txnIndex The transaction index + * @param baseOffset A lower bound on the offsets in this segment + * @param indexIntervalBytes The approximate number of bytes between entries in the index + *
Re: [PR] KAFKA-15452: Access SslPrincipalMapper and kerberosShortNamer in Custom KafkaPrincipalBuilder(KIP-982) [kafka]
rbaddam commented on PR #14491: URL: https://github.com/apache/kafka/pull/14491#issuecomment-1761825493 Also we have implemented the SPIFFE Integration in our echo system and [these changes](https://github.com/apache/kafka/pull/14491) will help to make the Spiffe Integration smooth. There an Open KIP on the Spiffe Integration [KIP-880](https://cwiki.apache.org/confluence/display/KAFKA/KIP-880%3A+X509+SAN+based+SPIFFE+URI+ACL+within+mTLS+Client+Certificates) If needed I am willing to Contribute to the above KIP-880 as well. cc: @omkreddy / @satishd / @mimaison -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-15604) Add Telemetry RPCs Definitions
[ https://issues.apache.org/jira/browse/KAFKA-15604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apoorv Mittal reassigned KAFKA-15604: - Assignee: Apoorv Mittal > Add Telemetry RPCs Definitions > -- > > Key: KAFKA-15604 > URL: https://issues.apache.org/jira/browse/KAFKA-15604 > Project: Kafka > Issue Type: Sub-task >Reporter: Apoorv Mittal >Assignee: Apoorv Mittal >Priority: Major > > The goal of this task is to define the Telemetry RPCs as described > [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-KafkaProtocolChanges]; > > > This requires the following steps: > # The request/response schemas must be defined (json schema) > # Request/response classes must be defined -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15601) Client metrics and observability
[ https://issues.apache.org/jira/browse/KAFKA-15601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apoorv Mittal reassigned KAFKA-15601: - Assignee: Apoorv Mittal > Client metrics and observability > > > Key: KAFKA-15601 > URL: https://issues.apache.org/jira/browse/KAFKA-15601 > Project: Kafka > Issue Type: Improvement > Components: admin, clients, consumer, core, producer , streams >Reporter: Apoorv Mittal >Assignee: Apoorv Mittal >Priority: Major > Labels: kip > > This Jira tracks the development of KIP-714: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14482) Move LogLoader to storage module
[ https://issues.apache.org/jira/browse/KAFKA-14482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17774988#comment-17774988 ] Ismael Juma commented on KAFKA-14482: - No worries, thanks for the quick response. :) > Move LogLoader to storage module > > > Key: KAFKA-14482 > URL: https://issues.apache.org/jira/browse/KAFKA-14482 > Project: Kafka > Issue Type: Sub-task >Reporter: Ismael Juma >Assignee: Ismael Juma >Priority: Major > Fix For: 3.7.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14482) Move LogLoader to storage module
[ https://issues.apache.org/jira/browse/KAFKA-14482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma reassigned KAFKA-14482: --- Assignee: Ismael Juma > Move LogLoader to storage module > > > Key: KAFKA-14482 > URL: https://issues.apache.org/jira/browse/KAFKA-14482 > Project: Kafka > Issue Type: Sub-task >Reporter: Ismael Juma >Assignee: Ismael Juma >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14482) Move LogLoader to storage module
[ https://issues.apache.org/jira/browse/KAFKA-14482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-14482: Fix Version/s: 3.7.0 > Move LogLoader to storage module > > > Key: KAFKA-14482 > URL: https://issues.apache.org/jira/browse/KAFKA-14482 > Project: Kafka > Issue Type: Sub-task >Reporter: Ismael Juma >Assignee: Ismael Juma >Priority: Major > Fix For: 3.7.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15338: The metric group documentation for metrics added in KAFK… [kafka]
mjsax commented on PR #14221: URL: https://github.com/apache/kafka/pull/14221#issuecomment-1761809518 No problem :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14481: Move LogSegment/LogSegments to storage module [kafka]
ijuma commented on code in PR #14529: URL: https://github.com/apache/kafka/pull/14529#discussion_r1358306908 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ## @@ -0,0 +1,887 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.attribute.FileTime; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import com.yammer.metrics.core.MetricName; +import com.yammer.metrics.core.Timer; +import org.apache.kafka.common.InvalidRecordException; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch; +import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.FileRecords.LogOffsetPosition; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.metrics.KafkaMetricsGroup; +import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; + +import static java.util.Arrays.asList; + +/** + * A segment of the log. Each segment has two components: a log and an index. The log is a FileRecords containing + * the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each + * segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in + * any previous segment. + * + * A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file. + * + * This class is not thread-safe. + */ +public class LogSegment { +private static final Logger LOGGER = LoggerFactory.getLogger(LogSegment.class); +private static final Timer LOG_FLUSH_TIMER; + +static { +KafkaMetricsGroup logFlushStatsMetricsGroup = new KafkaMetricsGroup(LogSegment.class) { +@Override +public MetricName metricName(String name, Map tags) { +// Override the group and type names for compatibility - this metrics group was previously defined within +// a Scala object named `kafka.log.LogFlushStats` +return KafkaMetricsGroup.explicitMetricName("kafka.log", "LogFlushStats", name, tags); +} +}; +LOG_FLUSH_TIMER = logFlushStatsMetricsGroup.newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS); Review Comment: I'm not sure what to add as a comment, static fields basically last for the life of the JVM. I would personally prefer to have this metric be scoped to the life of a kafka server, but that would be a behavior change and best handled via a separate JIRA/PR. I simply preserved the behavior here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15604) Add Telemetry RPCs Definitions
Apoorv Mittal created KAFKA-15604: - Summary: Add Telemetry RPCs Definitions Key: KAFKA-15604 URL: https://issues.apache.org/jira/browse/KAFKA-15604 Project: Kafka Issue Type: Sub-task Reporter: Apoorv Mittal The goal of this task is to define the Telemetry RPCs as described [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-KafkaProtocolChanges]; This requires the following steps: # The request/response schemas must be defined (json schema) # Request/response classes must be defined -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]
splett2 commented on code in PR #1: URL: https://github.com/apache/kafka/pull/1#discussion_r1358490055 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -559,6 +560,26 @@ class KafkaApis(val requestChannel: RequestChannel, } } + case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node) + + private def getCurrentLeader(tp: TopicPartition): LeaderNode = { +val partitionInfoOrError = replicaManager.getPartitionOrError(tp) +val (leaderId, leaderEpoch) = partitionInfoOrError match { + case Right(x) => +(x.leaderReplicaIdOpt.getOrElse(-1), x.getLeaderEpoch) + case Left(x) => +debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache") +metadataCache.getPartitionInfo(tp.topic, tp.partition) match { + case Some(pinfo) => (pinfo.leader(), pinfo.leaderEpoch()) + case None => (-1, -1) +} +} +val leaderNode: Node = metadataCache.getAliveBrokerNode(leaderId, config.interBrokerListenerName).getOrElse({ Review Comment: we shouldn't be passing through the interbroker listener name, we should be using the listener used by the original request to be consistent with the metadata request. Is it simpler if we just consult the metadata cache? In KRaft mode, the metadata cache is the source of truth for partition leadership and is updated before the partition state gets updated. ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -620,6 +642,21 @@ class KafkaApis(val requestChannel: RequestChannel, topicPartition, status.error.exceptionName)) } + +if (request.header.apiVersion >= 10) { + status.currentLeader = { +status.error match { + case Errors.NOT_LEADER_OR_FOLLOWER | Errors.FENCED_LEADER_EPOCH => Review Comment: produce requests should never receive FENCED_LEADER_EPOCH. also, shouldn't this go in the above `if` block? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org