Re: [PR] KAFKA-15481: Fix concurrency bug in RemoteIndexCache [kafka]

2023-10-13 Thread via GitHub


iit2009060 commented on code in PR #14483:
URL: https://github.com/apache/kafka/pull/14483#discussion_r1348641063


##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -568,27 +621,26 @@ class RemoteIndexCacheTest {
   }
 
   private def verifyFetchIndexInvocation(count: Int,
-                                         indexTypes: Seq[IndexType] = Seq(IndexType.OFFSET, IndexType.TIMESTAMP, IndexType.TRANSACTION)): Unit = {
+                                         indexTypes: Seq[IndexType]): Unit = {
     for (indexType <- indexTypes) {
       verify(rsm, times(count)).fetchIndex(any(classOf[RemoteLogSegmentMetadata]), ArgumentMatchers.eq(indexType))
     }
   }
 
   private def createTxIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata): TransactionIndex = {
-    val txnIdxFile = remoteTransactionIndexFile(tpDir, metadata)
+    val txnIdxFile = remoteTransactionIndexFile(new File(tpDir, DIR_NAME), metadata)
     txnIdxFile.createNewFile()
     new TransactionIndex(metadata.startOffset(), txnIdxFile)
   }
 
   private def createTimeIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata): TimeIndex = {
     val maxEntries = (metadata.endOffset() - metadata.startOffset()).asInstanceOf[Int]
-    new TimeIndex(remoteTimeIndexFile(tpDir, metadata), metadata.startOffset(), maxEntries * 12)
+    new TimeIndex(remoteTimeIndexFile(new File(tpDir, DIR_NAME), metadata), metadata.startOffset(), maxEntries * 12)

Review Comment:
   @jeel2420  
   There are three functions which are used to create indexes on tpDir storage:
   ```
   private def createTimeIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata): TimeIndex
   private def createOffsetIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata): OffsetIndex
   private def createTxIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata): TransactionIndex
   ```
   In the current unit test, tpDir storage acts as the place holding the data fetched from remote storage.
   
   The only cases where tpDir has been used instead of remoteCacheDir are when you are creating the spyEntry. But if you look at the test cases where that happens, they are just testing the cache functionality and not mixing it with the remote storage manager functionality.
   To support the use cases mentioned above, you can do the following.
   
   **Suggestion:**
   Change the signatures of the methods below to also take the directory path as a parameter, and refactor the existing test cases:
   ```
   private def createTimeIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata, dir: File): TimeIndex
   private def createOffsetIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata, dir: File): OffsetIndex
   private def createTxIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata, dir: File): TransactionIndex
   ```
   Then you can pass the appropriate directory required for your test case.
   
   cc @showuon  @divijvaidya
   
   



##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -193,7 +192,16 @@ public File cacheDir() {
 public void remove(Uuid key) {
     lock.readLock().lock();
     try {
-        internalCache.invalidate(key);
+        internalCache.asMap().computeIfPresent(key, (k, v) -> {
+            try {
+                v.markForCleanup();
+                expiredIndexes.put(v);

Review Comment:
   @showuon  @jeel2420  
   Shouldn't we use expiredIndexes.offer() instead of expiredIndexes.put(), since put() will block the operation if the queue is full?
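   For reference, a minimal self-contained sketch (plain `java.util.concurrent`, not the actual RemoteIndexCache code) of how the two calls behave on a bounded queue:
   ```java
   import java.util.concurrent.ArrayBlockingQueue;
   import java.util.concurrent.BlockingQueue;

   public class OfferVsPut {
       public static void main(String[] args) throws InterruptedException {
           // Bounded queue with capacity 1, so the second insert hits the limit.
           BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);

           queue.put("first"); // succeeds immediately; the queue is now full

           // offer() returns false right away when the queue is full ...
           boolean accepted = queue.offer("second");
           System.out.println("offer accepted: " + accepted); // prints: offer accepted: false

           // ... whereas put() would block the calling thread here until
           // another thread drains the queue:
           // queue.put("second"); // blocks for as long as the queue stays full
       }
   }
   ```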



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15481: Fix concurrency bug in RemoteIndexCache [kafka]

2023-10-13 Thread via GitHub


showuon commented on PR #14483:
URL: https://github.com/apache/kafka/pull/14483#issuecomment-1762593153

   @jeel2420, there's a test failing due to this change, please take a look.
   https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14483/10/





Re: [PR] KAFKA-15303: Avoid unnecessary re-serialization in FK-join [kafka]

2023-10-13 Thread via GitHub


guozhangwang commented on PR #14157:
URL: https://github.com/apache/kafka/pull/14157#issuecomment-1762506459

   > @guozhangwang -- yes, still looking for input on this. Don't need a full review, just some input on whether we think it's the right approach to begin with (cf. Sophie's justified comment that it will make it a little bit more "messy").
   
   Ack, will take a look asap.





Re: [PR] MINOR: cleanup some compiler warnings in Kafka Streams examples [kafka]

2023-10-13 Thread via GitHub


mjsax merged PR #14547:
URL: https://github.com/apache/kafka/pull/14547





Re: [PR] MINOR: Only commit running active and standby tasks when tasks corrupted [kafka]

2023-10-13 Thread via GitHub


guozhangwang commented on code in PR #14508:
URL: https://github.com/apache/kafka/pull/14508#discussion_r1359038910


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -223,10 +223,7 @@ boolean handleCorruption(final Set corruptedTasks) {
     final Collection tasksToCommit = allTasks()
         .values()
         .stream()
-        // TODO: once we remove state restoration from the stream thread, we can also remove
-        //  the RESTORING state here, since there will not be any restoring tasks managed
-        //  by the stream thread anymore.
-        .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
+        .filter(t -> t.state() == Task.State.RUNNING)

Review Comment:
   Hey @cadonna, sorry I came to this PR late.
   
   One thing I'd like to raise is that in the past, we've seen active task restoration never complete under rolling restart / rebalance storm scenarios, since we kept losing the progress made so far when reviving. I'm not 100% sure whether this part of the code is related to that scenario, but I just want to double check. If you have thought about it and concluded it would not be related, I'm relieved :) 






Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-13 Thread via GitHub


jeffkbkim commented on PR #14417:
URL: https://github.com/apache/kafka/pull/14417#issuecomment-1762463861

   @jolshan should we still keep them as sensors or do we want to remove them?
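   For context, a minimal generic sketch of the `org.apache.kafka.common.metrics` Sensor API being discussed; the sensor and metric names below are made up for illustration, not the coordinator runtime metrics from this PR:
   ```java
   import org.apache.kafka.common.metrics.Metrics;
   import org.apache.kafka.common.metrics.Sensor;
   import org.apache.kafka.common.metrics.stats.Avg;
   import org.apache.kafka.common.metrics.stats.Max;

   public class SensorSketch {
       public static void main(String[] args) {
           Metrics metrics = new Metrics();

           // A sensor is a named recording point that feeds each recorded value
           // into all of the stats registered on it.
           Sensor sensor = metrics.sensor("event-processing-time"); // hypothetical name
           sensor.add(metrics.metricName("event-processing-time-avg", "example-group"), new Avg());
           sensor.add(metrics.metricName("event-processing-time-max", "example-group"), new Max());

           sensor.record(42.0); // one observation updates both registered stats

           metrics.close();
       }
   }
   ```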





Re: [PR] KAFKA-15605 Handle pending topic deletions during migration [kafka]

2023-10-13 Thread via GitHub


cmccabe commented on PR #14545:
URL: https://github.com/apache/kafka/pull/14545#issuecomment-1762432231

   Hmm, this isn't how we agreed to handle this. As I recall, we were going to get rid of deleting state in hybrid brokers.
   
   Then we were going to actually verify that broker topic IDs matched what was sent in LeaderAndIsr, and move aside or delete if not.
   
   In the initial full `leaderAndIsrRequest`, we can use `LeaderAndIsrRequest.Type` = All to indicate that the list is exhaustive, and move aside topics that don't appear in it.





[PR] MINOR: cleanup some warnings in Kafka Streams examples [kafka]

2023-10-13 Thread via GitHub


mjsax opened a new pull request, #14547:
URL: https://github.com/apache/kafka/pull/14547

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-13 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1358990797


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java:
##
@@ -66,31 +82,107 @@ boolean isEmpty() {
  * @return {@code true} if there are completed fetches that match the {@link Predicate}, {@code false} otherwise
  */
 boolean hasCompletedFetches(Predicate predicate) {
-    return completedFetches.stream().anyMatch(predicate);
+    try {
+        lock.lock();
+        return completedFetches.stream().anyMatch(predicate);
+    } finally {
+        lock.unlock();
+    }
 }
 
 void add(CompletedFetch completedFetch) {
-    completedFetches.add(completedFetch);
+    try {
+        lock.lock();
+        completedFetches.add(completedFetch);
+        notEmptyCondition.signalAll();
+    } finally {
+        lock.unlock();
+    }
 }
 
 void addAll(Collection completedFetches) {
-    this.completedFetches.addAll(completedFetches);
+    if (completedFetches == null || completedFetches.isEmpty())
+        return;
+
+    try {
+        lock.lock();
+        this.completedFetches.addAll(completedFetches);
+        notEmptyCondition.signalAll();
+    } finally {
+        lock.unlock();
+    }
 }
 
 CompletedFetch nextInLineFetch() {
-    return nextInLineFetch;
+    try {
+        lock.lock();
+        return nextInLineFetch;
+    } finally {
+        lock.unlock();
+    }
 }
 
-void setNextInLineFetch(CompletedFetch completedFetch) {
-    this.nextInLineFetch = completedFetch;
+void setNextInLineFetch(CompletedFetch nextInLineFetch) {
+    try {
+        lock.lock();
+        this.nextInLineFetch = nextInLineFetch;
+    } finally {
+        lock.unlock();
+    }
 }
 
 CompletedFetch peek() {
-    return completedFetches.peek();
+    try {
+        lock.lock();
+        return completedFetches.peek();
+    } finally {
+        lock.unlock();
+    }
 }
 
 CompletedFetch poll() {
-    return completedFetches.poll();
+    try {
+        lock.lock();
+        return completedFetches.poll();
+    } finally {
+        lock.unlock();
+    }
+}
+
+/**
+ * Allows the caller to await presence of data in the buffer. The method will block, returning only
+ * under one of the following conditions:
+ *
+ *     The buffer was already non-empty on entry
+ *     The buffer was populated during the wait
+ *     The remaining time on the {@link Timer timer} elapsed
+ *     The thread was interrupted
+ *
+ * @param timer Timer that provides time to wait
+ */
+void awaitNotEmpty(Timer timer) {
+    try {
+        lock.lock();
+
+        while (isEmpty()) {
+            // Update the timer before we head into the loop in case it took a while to get the lock.
+            timer.update();
+
+            if (timer.isExpired())
+                break;
+
+            if (notEmptyCondition.await(timer.remainingMs(), TimeUnit.MILLISECONDS)) {

Review Comment:
   Thanks for catching that. I fixed this but it's pretty clear now that I need 
unit tests to validate correctness.
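   For context, the idiom under discussion is a timed wait on a `java.util.concurrent.locks.Condition`. A minimal standalone sketch (not the actual FetchBuffer code) of a timed condition wait that distinguishes a timeout from a signal by re-checking the predicate in a loop:
   ```java
   import java.util.ArrayDeque;
   import java.util.Queue;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.locks.Condition;
   import java.util.concurrent.locks.ReentrantLock;

   public class TimedWaitSketch {
       private final ReentrantLock lock = new ReentrantLock();
       private final Condition notEmpty = lock.newCondition();
       private final Queue<String> buffer = new ArrayDeque<>();

       /** Waits up to timeoutMs for the buffer to become non-empty; true if it did. */
       boolean awaitNotEmpty(long timeoutMs) throws InterruptedException {
           final long deadlineNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
           lock.lock();
           try {
               while (buffer.isEmpty()) {
                   // Recompute the remaining time on every pass: awaitNanos() can
                   // return early on a signal or on a spurious wakeup.
                   long remainingNs = deadlineNs - System.nanoTime();
                   if (remainingNs <= 0)
                       return false; // deadline passed while the buffer was still empty
                   notEmpty.awaitNanos(remainingNs);
               }
               return true; // buffer became (or already was) non-empty before the deadline
           } finally {
               lock.unlock();
           }
       }

       void add(String item) {
           lock.lock();
           try {
               buffer.add(item);
               notEmpty.signalAll(); // wake any thread blocked in awaitNotEmpty()
           } finally {
               lock.unlock();
           }
       }
   }
   ```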






Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-13 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1358990617


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -621,56 +825,163 @@ public void assign(Collection partitions) {
         throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
     }
 
-    // TODO: implementation of refactored Fetcher will be included in forthcoming commits.
-    // fetcher.clearBufferedDataForUnassignedPartitions(partitions);
+    // Clear the buffered data which are not a part of newly assigned topics
+    final Set currentTopicPartitions = new HashSet<>();
+
+    for (TopicPartition tp : subscriptions.assignedPartitions()) {
+        if (partitions.contains(tp))
+            currentTopicPartitions.add(tp);
+    }
+
+    fetchBuffer.retainAll(currentTopicPartitions);
 
     // assignment change event will trigger autocommit if it is configured and the group id is specified. This is
     // to make sure offsets of topic partitions the consumer is unsubscribing from are committed since there will
-    // be no following rebalance
-    eventHandler.add(new AssignmentChangeApplicationEvent(this.subscriptions.allConsumed(), time.milliseconds()));
+    // be no following rebalance.
+    //
+    // See the ApplicationEventProcessor.process() method that handles this event for more detail.
+    applicationEventHandler.add(new AssignmentChangeApplicationEvent(subscriptions.allConsumed(), time.milliseconds()));
 
     log.info("Assigned to partition(s): {}", join(partitions, ", "));
-    if (this.subscriptions.assignFromUser(new HashSet<>(partitions)))
-        eventHandler.add(new NewTopicsMetadataUpdateRequestEvent());
+    if (subscriptions.assignFromUser(new HashSet<>(partitions)))
+        applicationEventHandler.add(new NewTopicsMetadataUpdateRequestEvent());
 }
 
 @Override
-public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
-    throw new KafkaException("method not implemented");
+public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
+    maybeThrowInvalidGroupIdException();
+    if (pattern == null || pattern.toString().isEmpty())
+        throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ? "null" : "empty"));
+
+    throwIfNoAssignorsConfigured();
+    log.info("Subscribed to pattern: '{}'", pattern);
+    subscriptions.subscribe(pattern, listener);
+    updatePatternSubscription(metadata.fetch());
+    metadata.requestUpdateForNewTopics();
+}
+
+/**
+ * TODO: remove this when we implement the KIP-848 protocol.
+ *
+ * The contents of this method are shamelessly stolen from
+ * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are used here because we won't have access
+ * to a {@link ConsumerCoordinator} in this code. Perhaps it could be moved to a ConsumerUtils class?
+ *
+ * @param cluster Cluster from which we get the topics
+ */
+private void updatePatternSubscription(Cluster cluster) {
+    final Set topicsToSubscribe = cluster.topics().stream()
+        .filter(subscriptions::matchesSubscribedPattern)
+        .collect(Collectors.toSet());
+    if (subscriptions.subscribeFromPattern(topicsToSubscribe))
+        metadata.requestUpdateForNewTopics();
 }
 
 @Override
 public void subscribe(Pattern pattern) {
-    throw new KafkaException("method not implemented");
+    subscribe(pattern, new NoOpConsumerRebalanceListener());
 }
 
 @Override
 public void unsubscribe() {
-    throw new KafkaException("method not implemented");
+    fetchBuffer.retainAll(Collections.emptySet());
+    subscriptions.unsubscribe();
 }
 
 @Override
 @Deprecated
-public ConsumerRecords poll(long timeout) {
-    throw new KafkaException("method not implemented");
+public ConsumerRecords poll(final long timeoutMs) {
+    return poll(Duration.ofMillis(timeoutMs));
 }
 
 // Visible for testing
 WakeupTrigger wakeupTrigger() {
     return wakeupTrigger;
 }
 
-private static ClusterResourceListeners configureClusterResourceListeners(
-    final Deserializer keyDeserializer,
-    final Deserializer valueDeserializer,
-    final List... candidateLists) {
-    ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners();
-    for (List candidateList : candidateLists)
-        clusterResourceListeners.maybeAddAll(candidateList);
+/**
+ * Send the requests for fetch data to the {@link ConsumerNetworkThread network thread} and set up to
+ * 
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-13 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1358990537


Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-13 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1358990414


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##
@@ -17,146 +17,195 @@
 package org.apache.kafka.clients.consumer.internals.events;
 
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.internals.CachedSupplier;
 import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
 import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
 import org.apache.kafka.clients.consumer.internals.RequestManagers;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
 
-public class ApplicationEventProcessor {
-
-    private final BlockingQueue backgroundEventQueue;
+/**
+ * An {@link EventProcessor} that is created and executes in the {@link ConsumerNetworkThread network thread}
+ * which processes {@link ApplicationEvent application events} generated by the application thread.
+ */
+public class ApplicationEventProcessor extends EventProcessor {
 
     private final ConsumerMetadata metadata;
-
     private final RequestManagers requestManagers;
 
-    public ApplicationEventProcessor(final BlockingQueue backgroundEventQueue,
+    public ApplicationEventProcessor(final LogContext logContext,
+                                     final BlockingQueue applicationEventQueue,
                                      final RequestManagers requestManagers,
                                      final ConsumerMetadata metadata) {
-        this.backgroundEventQueue = backgroundEventQueue;
+        super(logContext, applicationEventQueue);
         this.requestManagers = requestManagers;
         this.metadata = metadata;
     }
 
-    public boolean process(final ApplicationEvent event) {
-        Objects.requireNonNull(event);
+    /**
+     * Process the events—if any—that were produced by the application thread. It is possible that when processing
+     * an event generates an error. In such cases, the processor will immediately throw an exception, and not
+     * process the remaining events.
+     */
+    @Override
+    public void process() {
+        process(error -> {
+            throw error;
+        });
+    }
+
+    @Override
+    public void process(ApplicationEvent event) {
         switch (event.type()) {
-            case NOOP:
-                return process((NoopApplicationEvent) event);
             case COMMIT:
-                return process((CommitApplicationEvent) event);
+                process((CommitApplicationEvent) event);
+                return;
+
             case POLL:
-                return process((PollApplicationEvent) event);
+                process((PollApplicationEvent) event);
+                return;
+
             case FETCH_COMMITTED_OFFSET:
-                return process((OffsetFetchApplicationEvent) event);
+                process((OffsetFetchApplicationEvent) event);
+                return;
+
             case METADATA_UPDATE:
-                return process((NewTopicsMetadataUpdateRequestEvent) event);
+                process((NewTopicsMetadataUpdateRequestEvent) event);
+                return;
+
             case ASSIGNMENT_CHANGE:
-                return process((AssignmentChangeApplicationEvent) event);
+                process((AssignmentChangeApplicationEvent) event);
+                return;
+
             case TOPIC_METADATA:
-                return process((TopicMetadataApplicationEvent) event);
+                process((TopicMetadataApplicationEvent) event);
+                return;
+
             case LIST_OFFSETS:
-                return process((ListOffsetsApplicationEvent) event);
+                process((ListOffsetsApplicationEvent) event);
+                return;
+
+            case FETCH:
+                process((FetchEvent) event);
+                return;
+
             case RESET_POSITIONS:
-                return processResetPositionsEvent();
+                processResetPositionsEvent();
+                return;
+
             case VALIDATE_POSITIONS:
-                return processValidatePositionsEvent();
+                processValidatePositionsEvent();
+                return;
+
+            default:
+                throw new IllegalArgumentException("Application event type " + event.type() + " was not expected");
         }
-        return false;
     }
 
-    /**
-     * Processes {@link NoopApplicationEvent} and enqueue a
-     * {@link 

Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-13 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1358957417


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##
@@ -0,0 +1,3573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.LegacyRecord;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.FetchMetadata;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
+import org.apache.kafka.common.requests.RequestTestUtils;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import 

Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-13 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1358956804


Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-13 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1358956462


Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-13 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1358956176


Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-13 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1358955393



Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-13 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1358954499



Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-13 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1358953536


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##
@@ -0,0 +1,3573 @@
[~90 lines of quoted diff context (license header and import statements) elided; the review comment is truncated in the archive]

Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-13 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1358953131


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##
@@ -0,0 +1,3573 @@
[~90 lines of quoted diff context (license header and import statements) elided; the review comment is truncated in the archive]

[jira] [Updated] (KAFKA-15606) Verify & refactor correctness of FetcherTest.testCompletedFetchRemoval()

2023-10-13 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15606:
--
Description: 
As part of the review for [FetchRequestManager pull 
request|https://github.com/apache/kafka/pull/14406], [~junrao] had some 
questions related to the correctness and clarity of the 
{{FetcherTest.testCompletedFetchRemoval()}} test:

Questions:

* https://github.com/apache/kafka/pull/14406#discussion_r1347908197
* https://github.com/apache/kafka/pull/14406#discussion_r1347910980
* https://github.com/apache/kafka/pull/14406#discussion_r1347913781

  was:
As part of the review for [pull request 
#14406|https://github.com/apache/kafka/pull/14406], [~junrao] made two comments 
on the {{FetcherTest.testCompletedFetchRemoval()}} test:

{quote}
Is [the check for the records size of 3] redundant given the test [two lines 
above]?[1]
{quote}

And also:

{quote}
Hmm, why don't we return records from other partitions since maxRecords is 
maxInt?[2]
{quote}

References:

* [1] https://github.com/apache/kafka/pull/14406#discussion_r1347908197
* [2] https://github.com/apache/kafka/pull/14406#discussion_r1347910980


> Verify & refactor correctness of FetcherTest.testCompletedFetchRemoval()
> 
>
> Key: KAFKA-15606
> URL: https://issues.apache.org/jira/browse/KAFKA-15606
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>
> As part of the review for [FetchRequestManager pull 
> request|https://github.com/apache/kafka/pull/14406], [~junrao] had some 
> questions related to the correctness and clarity of the 
> {{FetcherTest.testCompletedFetchRemoval()}} test:
> Questions:
> * https://github.com/apache/kafka/pull/14406#discussion_r1347908197
> * https://github.com/apache/kafka/pull/14406#discussion_r1347910980
> * https://github.com/apache/kafka/pull/14406#discussion_r1347913781



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
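
For readers without the PR open, a hypothetical reconstruction of the assertion pattern being questioned (names and values are illustrative only, not the actual FetcherTest code):

{code:java}
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;

// Hypothetical sketch of the pattern under discussion (illustrative only).
class CompletedFetchRemovalSketch {
    void checkFetch(List<String> records, long position) {
        assertEquals(3, records.size());  // first size check
        assertEquals(4L, position);       // intervening assertion, two lines apart
        assertEquals(3, records.size());  // the check questioned as redundant:
                                          // records is not mutated in between
    }
}
{code}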


Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-13 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1358953070


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##
@@ -0,0 +1,3573 @@
[~90 lines of quoted diff context (license header and import statements) elided; the review comment is truncated in the archive]

[jira] [Updated] (KAFKA-15606) Verify & refactor correctness of FetcherTest.testCompletedFetchRemoval()

2023-10-13 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15606:
--
Description: 
As part of the review for [pull request 
#14406|https://github.com/apache/kafka/pull/14406], [~junrao] made two comments 
on the {{FetcherTest.testCompletedFetchRemoval()}} test:

{quote}
Is [the check for the records size of 3] redundant given the test [two lines 
above]?[1]
{quote}

And also:

{quote}
Hmm, why don't we return records from other partitions since maxRecords is 
maxInt?[2]
{quote}

References:

* [1] https://github.com/apache/kafka/pull/14406#discussion_r1347908197
* [2] https://github.com/apache/kafka/pull/14406#discussion_r1347910980

  was:
As part of the review for [pull request 
#14406|https://github.com/apache/kafka/pull/14406], [~junrao] made these 
comments on the {{FetcherTest.testCompletedFetchRemoval()}} test:
{quote}Hmm, why don't we return records from other partitions since maxRecords 
is maxInt?
{quote}
and:
{quote}Is [the check for the records size of 3] redundant given the test [two 
lines above]?
{quote}


> Verify & refactor correctness of FetcherTest.testCompletedFetchRemoval()
> 
>
> Key: KAFKA-15606
> URL: https://issues.apache.org/jira/browse/KAFKA-15606
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>
> As part of the review for [pull request 
> #14406|https://github.com/apache/kafka/pull/14406], [~junrao] made two 
> comments on the {{FetcherTest.testCompletedFetchRemoval()}} test:
> {quote}
> Is [the check for the records size of 3] redundant given the test [two lines 
> above]?[1]
> {quote}
> And also:
> {quote}
> Hmm, why don't we return records from other partitions since maxRecords is 
> maxInt?[2]
> {quote}
> References:
> * [1] https://github.com/apache/kafka/pull/14406#discussion_r1347908197
> * [2] https://github.com/apache/kafka/pull/14406#discussion_r1347910980



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15606) Verify & refactor correctness of FetcherTest.testCompletedFetchRemoval()

2023-10-13 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15606:
--
Description: 
As part of the review for [pull request 
#14406|https://github.com/apache/kafka/pull/14406], [~junrao] made these 
comments on the {{FetcherTest.testCompletedFetchRemoval()}} test:
{quote}Hmm, why don't we return records from other partitions since maxRecords 
is maxInt?
{quote}
and:
{quote}Is [the check for the records size of 3] redundant given the test [two 
lines above]?
{quote}

  was:
As part of the review for [pull request 
#14406|https://github.com/apache/kafka/pull/14406], [~junrao] made these 
comments on the {{FetcherTest.testCompletedFetchRemoval()}} test:
{quote}Hmm, why don't we return records from other partitions since maxRecords 
is maxInt?
{quote}
and:
{quote}Is [the check for the {{fetchedRecord}} size of 3] redundant given the 
test [two lines above]?
{quote}


> Verify & refactor correctness of FetcherTest.testCompletedFetchRemoval()
> 
>
> Key: KAFKA-15606
> URL: https://issues.apache.org/jira/browse/KAFKA-15606
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>
> As part of the review for [pull request 
> #14406|https://github.com/apache/kafka/pull/14406], [~junrao] made these 
> comments on the {{FetcherTest.testCompletedFetchRemoval()}} test:
> {quote}Hmm, why don't we return records from other partitions since 
> maxRecords is maxInt?
> {quote}
> and:
> {quote}Is [the check for the records size of 3] redundant given the test [two 
> lines above]?
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15606) Verify & refactor correctness of FetcherTest.testCompletedFetchRemoval()

2023-10-13 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15606:
--
Description: 
As part of the review for [pull request 
#14406|https://github.com/apache/kafka/pull/14406], [~junrao] made these 
comments on the {{FetcherTest.testCompletedFetchRemoval()}} test:

{quote}
Hmm, why don't we return records from other partitions since maxRecords is 
maxInt?
{quote}

and:

{quote}
Is [the check for the \{{fetchedRecord}} size of 3] redundant given the test 
[two lines above]?
{quote}

  was:
As part of the review for [pull request 
#14406|[https://github.com/apache/kafka/pull/14406]], [~junrao] made these 
comments on the {{FetcherTest.testCompletedFetchRemoval()}} test:
{quote}Hmm, why don't we return records from other partitions since maxRecords 
is maxInt?
{quote}
and:
{quote}Is [the check for the {{fetchedRecord}} size of 3] redundant given the 
test [two lines above]?
{quote}


> Verify & refactor correctness of FetcherTest.testCompletedFetchRemoval()
> 
>
> Key: KAFKA-15606
> URL: https://issues.apache.org/jira/browse/KAFKA-15606
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>
> As part of the review for [pull request 
> #14406|https://github.com/apache/kafka/pull/14406], [~junrao] made these 
> comments on the {{FetcherTest.testCompletedFetchRemoval()}} test:
> {quote}
> Hmm, why don't we return records from other partitions since maxRecords is 
> maxInt?
> {quote}
> and:
> {quote}
> Is [the check for the \{{fetchedRecord}} size of 3] redundant given the test 
> [two lines above]?
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15606) Verify & refactor correctness of FetcherTest.testCompletedFetchRemoval()

2023-10-13 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15606:
--
Description: 
As part of the review for [pull request 
#14406|https://github.com/apache/kafka/pull/14406], [~junrao] made these 
comments on the {{FetcherTest.testCompletedFetchRemoval()}} test:
{quote}Hmm, why don't we return records from other partitions since maxRecords 
is maxInt?
{quote}
and:
{quote}Is [the check for the {{fetchedRecord}} size of 3] redundant given the 
test [two lines above]?
{quote}

  was:
As part of the review for [pull request 
#14406|https://github.com/apache/kafka/pull/14406], [~junrao] made these 
comments on the {{FetcherTest.testCompletedFetchRemoval()}} test:

{quote}
Hmm, why don't we return records from other partitions since maxRecords is 
maxInt?
{quote}

and:

{quote}
Is [the check for the \{{fetchedRecord}} size of 3] redundant given the test 
[two lines above]?
{quote}


> Verify & refactor correctness of FetcherTest.testCompletedFetchRemoval()
> 
>
> Key: KAFKA-15606
> URL: https://issues.apache.org/jira/browse/KAFKA-15606
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>
> As part of the review for [pull request 
> #14406|https://github.com/apache/kafka/pull/14406], [~junrao] made these 
> comments on the {{FetcherTest.testCompletedFetchRemoval()}} test:
> {quote}Hmm, why don't we return records from other partitions since 
> maxRecords is maxInt?
> {quote}
> and:
> {quote}Is [the check for the {{fetchedRecord}} size of 3] redundant given the 
> test [two lines above]?
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15606) Verify & refactor correctness of FetcherTest.testCompletedFetchRemoval()

2023-10-13 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15606:
--
Description: 
As part of the review for [pull request 
#14406|[https://github.com/apache/kafka/pull/14406]], [~junrao] made these 
comments on the {{FetcherTest.testCompletedFetchRemoval()}} test:
{quote}Hmm, why don't we return records from other partitions since maxRecords 
is maxInt?
{quote}
and:
{quote}Is [the check for the {{fetchedRecord}} size of 3] redundant given the 
test [two lines above]?
{quote}

  was:
As part of the review for #14406, [~junrao] made these comments on the 
FetcherTest.testCompletedFetchRemoval() test:
{quote}Hmm, why don't we return records from other partitions since maxRecords 
is maxInt?
{quote}
and:
{quote}Is [the check for the `fetchedRecord` size of 3] redundant given the 
test [two lines above]?
{quote}


> Verify & refactor correctness of FetcherTest.testCompletedFetchRemoval()
> 
>
> Key: KAFKA-15606
> URL: https://issues.apache.org/jira/browse/KAFKA-15606
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>
> As part of the review for [pull request 
> #14406|[https://github.com/apache/kafka/pull/14406]], [~junrao] made these 
> comments on the {{FetcherTest.testCompletedFetchRemoval()}} test:
> {quote}Hmm, why don't we return records from other partitions since 
> maxRecords is maxInt?
> {quote}
> and:
> {quote}Is [the check for the {{fetchedRecord}} size of 3] redundant given the 
> test [two lines above]?
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Zk to KRaft migration is now production ready [kafka]

2023-10-13 Thread via GitHub


ocadaruma commented on code in PR #14546:
URL: https://github.com/apache/kafka/pull/14546#discussion_r1358951019


##
docs/ops.html:
##
@@ -3736,7 +3736,7 @@ ZooKeeper to KRaft 
Migration
 
   
-ZooKeeper to KRaft migration is considered an Early Access feature and 
is not recommended for production clusters.
+Limitations:
   
 
   The following features are not yet supported for ZK to KRaft 
migrations:

Review Comment:
   I also wonder whether `Downgrading to ZooKeeper` is still unsupported or 
already supported.
   
   Assuming all features of 
[KIP-866](https://cwiki.apache.org/confluence/display/KAFKA/KIP-866+ZooKeeper+to+KRaft+Migration)
 are covered, rollback is possible until we exit migration mode, right?
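   
   For concreteness, "migration mode" here is the KIP-866 dual-write state 
enabled via broker config; a minimal sketch follows (hostnames, ports, and 
node IDs are placeholders, and this is not a complete migration setup):
   
   ```
   # Broker config while in ZK-to-KRaft migration mode (illustrative sketch).
   zookeeper.metadata.migration.enable=true
   zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka
   controller.quorum.voters=3000@controller1:9093
   controller.listener.names=CONTROLLER
   # Rollback to ZooKeeper is the open question above: per KIP-866 it is
   # possible until the migration is finalized.
   ```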



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15606) Verify & refactor correctness of FetcherTest.testCompletedFetchRemoval()

2023-10-13 Thread Kirk True (Jira)
Kirk True created KAFKA-15606:
-

 Summary: Verify & refactor correctness of 
FetcherTest.testCompletedFetchRemoval()
 Key: KAFKA-15606
 URL: https://issues.apache.org/jira/browse/KAFKA-15606
 Project: Kafka
  Issue Type: Improvement
  Components: clients, consumer
Reporter: Kirk True
Assignee: Kirk True


As part of the review for #14406, [~junrao] made these comments on the 
FetcherTest.testCompletedFetchRemoval() test:
{quote}Hmm, why don't we return records from other partitions since maxRecords 
is maxInt?
{quote}
and:
{quote}Is [the check for the `fetchedRecord` size of 3] redundant given the 
test [two lines above]?
{quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] MINOR: Zk to KRaft migration is now production ready [kafka]

2023-10-13 Thread via GitHub


ocadaruma opened a new pull request, #14546:
URL: https://github.com/apache/kafka/pull/14546

   - As of 3.6.0, ZK to KRaft migration is considered production ready, so 
this PR updates the doc accordingly
 * https://kafka.apache.org/blog#apache_kafka_360_release_announcement
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-13 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1358949767


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##
@@ -0,0 +1,3573 @@
[~90 lines of quoted diff context (license header and import statements) elided; the review comment is truncated in the archive]

[jira] [Commented] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer

2023-10-13 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775087#comment-17775087
 ] 

Philip Nee commented on KAFKA-15602:


Hi Luke, I think we need to do two things here. One is probably to revert the 
change and patch it with better documentation and tests. The second is 
optional: if you want to introduce a separate serializer to handle offsets, I 
think we would need to write a KIP; this is a long-term solution. Are you 
interested in helping out to resolve this issue? I can help review the code 
and the KIP.

> Breaking change in 3.4.0 ByteBufferSerializer
> -
>
> Key: KAFKA-15602
> URL: https://issues.apache.org/jira/browse/KAFKA-15602
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>Reporter: Luke Kirby
>Priority: Critical
>
> [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have 
> solved the situation described by KAFKA-4852, namely, to have 
> ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 
> offsets (or, put another way, to honor the buffer's position() as the start 
> point to consume bytes from). Unfortunately, it failed to actually do this, 
> and instead changed the expectations for how an input ByteBuffer's limit and 
> position should be set before being provided to send() on a producer 
> configured with ByteBufferSerializer. Code that worked with pre-3.4.0 
> releases now produces 0-length messages instead of the intended messages, 
> effectively introducing a breaking change for existing users of the 
> serializer in the wild.
> Here are a few different inputs and serialized outputs under pre-3.4.0 and 
> 3.4.0+ to summarize the breaking change:
> ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
> |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 
> val=test|len=4 val=test|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 
> val=test|len=0 val=|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 
> val=test<0><0><0><0>|len=4 val=test|
> |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8));
> buff.limit(buff.position());|len=4 
> val=test|len=4 val=test|
> |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|
> Notably, plain-wrappers of byte arrays continue to work under both versions 
> due to the special case in the serializer for them. I suspect that this is 
> the dominant use-case, which is why this has apparently gone un-reported to 
> this point. The wrapped-with-offset case fails under both versions for 
> different reasons (the expected value would be "est"). As demonstrated here, 
> you can ensure that a manually assembled ByteBuffer will work under both 
> versions by ensuring that your buffers have position == limit == 
> message-length 
> (and an actual desired start position of 0). Clearly, though, behavior has 
> changed dramatically for the second and third case there, with the 3.3.2 
> behavior, in my experience, aligning better with naive expectations.
> [Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java],
>  the serializer would just rewind() the buffer and respect the limit as the 
> indicator as to how much data was in the buffer. So, essentially, the 
> prevailing contract was that the data from position 0 (always!) up to the 
> limit on the buffer would be serialized; so it was really just the limit that 
> was honored. So if, per the original issue, you have a byte[] array wrapped 
> with, say, ByteBuffer.wrap(bytes, 3, 5) then that will yield a ByteBuffer() 
> with position = 3 indicating the desired start point to read from, but 
> effectively ignored by the serializer due to the rewind().
> So while the serializer didn't work when presenting a ByteBuffer view onto a 
> sub-view of a backing array, it did however follow expected behavior when 
> employing standard patterns to populate ByteBuffers backed by 
> larger-than-necessary arrays and using limit() to identify the end of actual 
> data, consistent with conventional usage of flip() to switch from writing to 
> a buffer to setting it up to be read from (e.g., to be passed into a 
> producer.send() call). E.g.,
> {code:java}
> ByteBuffer bb = ByteBuffer.allocate(TOO_MUCH);
> ... // some sequence of 
> bb.put(...); // populate buffer with some number of bytes less than TOO_MUCH 
> ... 
> bb.flip(); /* logically, this says "I am done writing, let's set this up for 
> reading"; pragmatically, it sets the limit to the current position so that 
> whoever reads the buffer knows when to stop reading, and sets the position to 
> zero so it knows where to start reading */
> {code}
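
The difference summarized in the table above can be reproduced with a small 
self-contained sketch approximating the two behaviors (rewind-based pre-3.4.0 
vs. flip-based 3.4.0+); this is an illustration, not the actual Kafka 
ByteBufferSerializer source:

{code:java}
import java.nio.ByteBuffer;
import static java.nio.charset.StandardCharsets.UTF_8;

public class ByteBufferSerializerDemo {
    // Approximates pre-3.4.0: rewind() and serialize bytes [0, limit).
    static byte[] legacy(ByteBuffer b) {
        ByteBuffer d = b.duplicate();
        d.rewind();
        byte[] out = new byte[d.remaining()];
        d.get(out);
        return out;
    }

    // Approximates 3.4.0+: flip() and serialize bytes [0, old position).
    static byte[] current(ByteBuffer b) {
        ByteBuffer d = b.duplicate();
        d.flip();
        byte[] out = new byte[d.remaining()];
        d.get(out);
        return out;
    }

    public static void main(String[] args) {
        ByteBuffer flipped = (ByteBuffer) ByteBuffer.allocate(8)
                .put("test".getBytes(UTF_8)).flip();                  // pos=0, limit=4
        ByteBuffer unflipped = ByteBuffer.allocate(8)
                .put("test".getBytes(UTF_8));                         // pos=4, limit=8
        ByteBuffer offsetWrap = ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3); // pos=1, limit=4

        System.out.println(legacy(flipped).length + " vs " + current(flipped).length);       // 4 vs 0
        System.out.println(legacy(unflipped).length + " vs " + current(unflipped).length);   // 8 vs 4
        System.out.println(legacy(offsetWrap).length + " vs " + current(offsetWrap).length); // 4 vs 1
    }
}
{code}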

Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-13 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1358918492


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##
@@ -0,0 +1,3573 @@
[~90 lines of quoted diff context (license header and import statements) elided; the review comment is truncated in the archive]

Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-13 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1358914547


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##
@@ -0,0 +1,3573 @@
[~90 lines of quoted diff context (license header and import statements) elided; the review comment is truncated in the archive]

Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-13 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1358914146


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##
@@ -0,0 +1,3573 @@
[~90 lines of quoted diff context (license header and import statements) elided; the review comment is truncated in the archive]

[jira] [Commented] (KAFKA-15208) Upgrade Jackson dependencies to version 2.15.3

2023-10-13 Thread Said BOUDJELDA (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775070#comment-17775070
 ] 

Said BOUDJELDA commented on KAFKA-15208:


This JIRA is updated to mention an upgrade to version 2.15.3, which brings 
more bug fixes and enhancements.

Please refer to the latest release notes 
https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.15.3 for more 
information.


> Upgrade Jackson dependencies to version 2.15.3
> --
>
> Key: KAFKA-15208
> URL: https://issues.apache.org/jira/browse/KAFKA-15208
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect, kraft
>Reporter: Said BOUDJELDA
>Assignee: Said BOUDJELDA
>Priority: Major
>  Labels: dependencies
> Fix For: 3.7.0
>
>
> Upgrading the version of Jackson dependencies to the latest stable version 
> 2.15.2 can bring much bug fixing security issues solving and performance 
> improvement 
>  
> Check release notes back to the current version 
> [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.15.3]
> [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.14]
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15208) Upgrade Jackson dependencies to version 2.15.3

2023-10-13 Thread Said BOUDJELDA (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Said BOUDJELDA updated KAFKA-15208:
---
Description: 
Upgrading the version of Jackson dependencies to the latest stable version 
2.15.2 can bring much bug fixing security issues solving and performance 
improvement 

 

Check release notes back to the current version 

[https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.15.3]

[https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.14]

 

 

  was:
Upgrading the version of Jackson dependencies to the latest stable version 
2.15.2 can bring many bug fixes, security fixes, and performance 
improvements.

Check the release notes going back to the current version:

[https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.15.2]

[https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.14]

 

 


> Upgrade Jackson dependencies to version 2.15.3
> --
>
> Key: KAFKA-15208
> URL: https://issues.apache.org/jira/browse/KAFKA-15208
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect, kraft
>Reporter: Said BOUDJELDA
>Assignee: Said BOUDJELDA
>Priority: Major
>  Labels: dependencies
> Fix For: 3.7.0
>
>
> Upgrading the version of Jackson dependencies to the latest stable version 
> 2.15.3 can bring many bug fixes, security fixes, and performance 
> improvements.
>  
> Check the release notes going back to the current version:
> [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.15.3]
> [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.14]
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15208) Upgrade Jackson dependencies to version 2.15.3

2023-10-13 Thread Said BOUDJELDA (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Said BOUDJELDA updated KAFKA-15208:
---
Summary: Upgrade Jackson dependencies to version 2.15.3  (was: Upgrade 
Jackson dependencies to version 2.15.2)

> Upgrade Jackson dependencies to version 2.15.3
> --
>
> Key: KAFKA-15208
> URL: https://issues.apache.org/jira/browse/KAFKA-15208
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect, kraft
>Reporter: Said BOUDJELDA
>Assignee: Said BOUDJELDA
>Priority: Major
>  Labels: dependencies
> Fix For: 3.7.0
>
>
> Upgrading the version of Jackson dependencies to the latest stable version 
> 2.15.2 can bring many bug fixes, security fixes, and performance 
> improvements.
>  
> Check the release notes going back to the current version:
> [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.15.2]
> [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.14]
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15208: Upgrade Jackson dependencies to version 2.15.3 [kafka]

2023-10-13 Thread via GitHub


bmscomp commented on PR #13662:
URL: https://github.com/apache/kafka/pull/13662#issuecomment-1762253746

   The pull request is updated to take advantage of version 2.15.3 of 
`jackson-databind`, which fixes more issues and brings further enhancements, 
for example:
   
   [#3968](https://github.com/FasterXML/jackson-databind/issues/3968): Records 
with additional constructors failed to deserialize
   [#4121](https://github.com/FasterXML/jackson-databind/issues/4121): Preserve 
the original component type in merging to an array
   [#426](https://github.com/FasterXML/jackson-dataformats-text/issues/426): 
Update to SnakeYAML 2.1
   
   Nothing has changed regarding the limitations this version places on JSON 
field serialisation to address security issues.
   
   To learn more about this version, please refer to the release notes: 
https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.15.3
   
   @divijvaidya Could you please take a look at this update?
   
   Thanks 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer

2023-10-13 Thread Luke Kirby (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775069#comment-17775069
 ] 

Luke Kirby commented on KAFKA-15602:


{quote}The original intent of the PR was to make the serializer compatible with 
user offsets; I don't see any test that covers this, am I missing anything?
{quote}
Yep, no test that covers it. I wrote up the kind of test you'd use to assess it 
above and it fails. I think the actual intent of the PR writer was to handle 
the case that they noticed (incorrectly, I contend) and the offsets question 
just kind of got accidentally ignored. That said, I'm more concerned about the 
breaking behavior change than the specifics of handling offsets, which 
neither version has ever handled properly.

 
{quote}We are talking about creating a separate serializer that handles the 
user offset, aren't we? It seems to be the only way to handle this case.
{quote}
 
Yeah, I think so. Again, though, I think something has to be done to advertise 
to users that the contract here has changed and that upgrades to 3.4 will more 
likely than not result in previously functional producers emitting 0-length 
messages if anything other than a ByteBuffer.wrap(byte[]) message is provided 
to send(), and that removing the existing class may be the only way to really 
achieve that safely – i.e., break loudly. Totally not my call though!
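
For illustration, here is a minimal repro sketch (my own, not from the PR; it 
assumes only kafka-clients on the classpath) exercising the "flip" and 
"position == limit" rows of the table quoted below:
{code:java}
// Minimal repro sketch (assumes kafka-clients on the classpath); the printed
// lengths depend on the client version, per the table in the issue description.
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.serialization.ByteBufferSerializer;

public class ByteBufferSerializerRepro {
    public static void main(String[] args) {
        try (ByteBufferSerializer serializer = new ByteBufferSerializer()) {
            // Conventional "write then flip" usage: 4 bytes on <= 3.3.2, 0 bytes on 3.4.0+.
            ByteBuffer flipped = ByteBuffer.allocate(8);
            flipped.put("test".getBytes(StandardCharsets.UTF_8));
            flipped.flip();
            System.out.println(serializer.serialize("topic", flipped).length);

            // Workaround that serializes "test" under both versions: do not flip,
            // leave position == limit == message length before calling send().
            ByteBuffer unflipped = ByteBuffer.allocate(8);
            unflipped.put("test".getBytes(StandardCharsets.UTF_8));
            unflipped.limit(unflipped.position());
            System.out.println(serializer.serialize("topic", unflipped).length);
        }
    }
}
{code}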

> Breaking change in 3.4.0 ByteBufferSerializer
> -
>
> Key: KAFKA-15602
> URL: https://issues.apache.org/jira/browse/KAFKA-15602
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>Reporter: Luke Kirby
>Priority: Critical
>
> [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have 
> solved the situation described by KAFKA-4852, namely, to have 
> ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 
> offsets (or, put another way, to honor the buffer's position() as the start 
> point to consume bytes from). Unfortunately, it failed to actually do this, 
> and instead changed the expectations for how an input ByteBuffer's limit and 
> position should be set before being provided to send() on a producer 
> configured with ByteBufferSerializer. Code that worked with pre-3.4.0 
> releases now produce 0-length messages instead of the intended messages, 
> effectively introducing a breaking change for existing users of the 
> serializer in the wild.
> Here are a few different inputs and serialized outputs under pre-3.4.0 and 
> 3.4.0+ to summarize the breaking change:
> ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
> |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 val=test|len=4 val=test|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 val=test|len=0 val=|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 val=test<0><0><0><0>|len=4 val=test|
> |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8)); buff.limit(buff.position());|len=4 val=test|len=4 val=test|
> |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|
> Notably, plain-wrappers of byte arrays continue to work under both versions 
> due to the special case in the serializer for them. I suspect that this is 
> the dominant use-case, which is why this has apparently gone un-reported to 
> this point. The wrapped-with-offset case fails under both versions, for 
> different reasons (the expected value would be "est"). As demonstrated here, 
> you can ensure that a manually assembled ByteBuffer will work under both 
> versions by ensuring that your buffers have position == limit == message-length 
> (and an actual desired start position of 0). Clearly, though, behavior has 
> changed dramatically for the second and third case there, with the 3.3.2 
> behavior, in my experience, aligning better with naive expectations.
> [Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java],
>  the serializer would just rewind() the buffer and respect the limit as the 
> indicator as to how much data was in the buffer. So, essentially, the 
> prevailing contract was that the data from position 0 (always!) up to the 
> limit on the buffer would be serialized; so it was really just the limit that 
> was honored. So if, per the original issue, you have a byte[] array wrapped 
> with, say, ByteBuffer.wrap(bytes, 3, 5) then that will yield a ByteBuffer() 
> with position = 3 indicating the desired start point to read from, but 
> effectively ignored by the serializer due to the rewind().
> So while the serializer didn't work when presenting a ByteBuffer view onto a 
> sub-view of a backing 

[jira] [Commented] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer

2023-10-13 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775065#comment-17775065
 ] 

Philip Nee commented on KAFKA-15602:


The original intent of the PR was to make the serializer compatible with user 
offsets; I don't see any test that covers this, am I missing anything?

 

replace the currently named ByteBufferSerializer with a new one that respects 
the standard interface unambiguously

- We are talking about creating a separate serializer that handles the user 
offset, aren't we? It seems to be the only way to handle this case.

> Breaking change in 3.4.0 ByteBufferSerializer
> -
>
> Key: KAFKA-15602
> URL: https://issues.apache.org/jira/browse/KAFKA-15602
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>Reporter: Luke Kirby
>Priority: Critical
>
> [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have 
> solved the situation described by KAFKA-4852, namely, to have 
> ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 
> offsets (or, put another way, to honor the buffer's position() as the start 
> point to consume bytes from). Unfortunately, it failed to actually do this, 
> and instead changed the expectations for how an input ByteBuffer's limit and 
> position should be set before being provided to send() on a producer 
> configured with ByteBufferSerializer. Code that worked with pre-3.4.0 
> releases now produce 0-length messages instead of the intended messages, 
> effectively introducing a breaking change for existing users of the 
> serializer in the wild.
> Here are a few different inputs and serialized outputs under pre-3.4.0 and 
> 3.4.0+ to summarize the breaking change:
> ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
> |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 val=test|len=4 val=test|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 val=test|len=0 val=|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 val=test<0><0><0><0>|len=4 val=test|
> |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8)); buff.limit(buff.position());|len=4 val=test|len=4 val=test|
> |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|
> Notably, plain-wrappers of byte arrays continue to work under both versions 
> due to the special case in the serializer for them. I suspect that this is 
> the dominant use-case, which is why this has apparently gone un-reported to 
> this point. The wrapped-with-offset case fails under both versions, for 
> different reasons (the expected value would be "est"). As demonstrated here, 
> you can ensure that a manually assembled ByteBuffer will work under both 
> versions by ensuring that your buffers have position == limit == message-length 
> (and an actual desired start position of 0). Clearly, though, behavior has 
> changed dramatically for the second and third case there, with the 3.3.2 
> behavior, in my experience, aligning better with naive expectations.
> [Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java],
>  the serializer would just rewind() the buffer and respect the limit as the 
> indicator as to how much data was in the buffer. So, essentially, the 
> prevailing contract was that the data from position 0 (always!) up to the 
> limit on the buffer would be serialized; so it was really just the limit that 
> was honored. So if, per the original issue, you have a byte[] array wrapped 
> with, say, ByteBuffer.wrap(bytes, 3, 5) then that will yield a ByteBuffer() 
> with position = 3 indicating the desired start point to read from, but 
> effectively ignored by the serializer due to the rewind().
> So while the serializer didn't work when presenting a ByteBuffer view onto a 
> sub-view of a backing array, it did however follow expected behavior when 
> employing standard patterns to populate ByteBuffers backed by 
> larger-than-necessary arrays and using limit() to identify the end of actual 
> data, consistent with conventional usage of flip() to switch from writing to 
> a buffer to setting it up to be read from (e.g., to be passed into a 
> producer.send() call). E.g.,
> {code:java}
> ByteBuffer bb = ByteBuffer.allocate(TOO_MUCH);
> ... // some sequence of 
> bb.put(...); // populate buffer with some number of bytes less than TOO_MUCH 
> ... 
> bb.flip(); /* logically, this says "I am done writing, let's set this up for 
> reading"; pragmatically, it sets the limit to the current position so that 
> whoever reads the buffer knows when to stop reading, and sets the position to 
> zero so it 

Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-13 Thread via GitHub


splett2 commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1358837845


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -620,6 +642,21 @@ class KafkaApis(val requestChannel: RequestChannel,
 topicPartition,
 status.error.exceptionName))
 }
+
+if (request.header.apiVersion >= 10) {
+  status.currentLeader = {
+status.error match {
+  case Errors.NOT_LEADER_OR_FOLLOWER | Errors.FENCED_LEADER_EPOCH 
=>

Review Comment:
   produce requests do not include a leader epoch => they can never get fenced 
leader epoch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-13 Thread via GitHub


splett2 commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1358837039


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -559,6 +560,26 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+val (leaderId, leaderEpoch) = partitionInfoOrError match {
+  case Right(x) =>
+(x.leaderReplicaIdOpt.getOrElse(-1), x.getLeaderEpoch)
+  case Left(x) =>
+debug(s"Unable to retrieve local leaderId and Epoch with error $x, 
falling back to metadata cache")
+metadataCache.getPartitionInfo(tp.topic, tp.partition) match {
+  case Some(pinfo) => (pinfo.leader(), pinfo.leaderEpoch())
+  case None => (-1, -1)
+}
+}
+val leaderNode: Node = metadataCache.getAliveBrokerNode(leaderId, 
config.interBrokerListenerName).getOrElse({

Review Comment:
   my point in the previous comment is that this will never be the case with KRaft.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-13 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1358826931


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -620,6 +642,21 @@ class KafkaApis(val requestChannel: RequestChannel,
 topicPartition,
 status.error.exceptionName))
 }
+
+if (request.header.apiVersion >= 10) {
+  status.currentLeader = {
+status.error match {
+  case Errors.NOT_LEADER_OR_FOLLOWER | Errors.FENCED_LEADER_EPOCH 
=>

Review Comment:
   ok, removed FENCED_LEADER_EPOCH from the produce path



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-13 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1358796082


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -620,6 +642,21 @@ class KafkaApis(val requestChannel: RequestChannel,
 topicPartition,
 status.error.exceptionName))
 }
+
+if (request.header.apiVersion >= 10) {
+  status.currentLeader = {
+status.error match {
+  case Errors.NOT_LEADER_OR_FOLLOWER | Errors.FENCED_LEADER_EPOCH 
=>

Review Comment:
   I think the error is only returned on fetch requests



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15582: Identify clean shutdown broker [kafka]

2023-10-13 Thread via GitHub


junrao commented on code in PR #14465:
URL: https://github.com/apache/kafka/pull/14465#discussion_r1358649732


##
core/src/main/scala/kafka/log/CleanShutdownFileHandler.scala:
##
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import kafka.utils.Logging
+
+import java.io.File
+import java.io.FileOutputStream
+import java.io.BufferedWriter
+import java.io.IOException
+import java.io.OutputStreamWriter
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+import java.util.regex.Pattern
+
+/**
+ * Clean shutdown file that indicates the broker was cleanly shut down in 0.8 and higher.
+ * This is used to avoid unnecessary recovery after a clean shutdown. In theory this could be
+ * avoided by passing in the recovery point, however finding the correct position to do this
+ * requires accessing the offset index, which may not be safe in an unclean shutdown.
+ * For more information see the discussion in PR#2104.
+ *
+ * The clean shutdown file can also store the broker epoch, which can be used in the broker
+ * registration to demonstrate that the last reboot was a clean shutdown. (KIP-966)
+ */
+
+object CleanShutdownFileHandler {
+  val CleanShutdownFileName = ".kafka_cleanshutdown"
+}
+
+class CleanShutdownFileHandler(dirPath: String) extends Logging {
+  val cleanShutdownFile = new File(dirPath, 
CleanShutdownFileHandler.CleanShutdownFileName)
+  val currentVersion = 0
+
+  @throws[Exception]
+  def write(brokerEpoch: Long): Unit = {
+write(brokerEpoch, currentVersion)
+  }
+
+  // visible to test.
+  @throws[Exception]
+  def write(brokerEpoch: Long, version: Int): Unit = {
+val os = new FileOutputStream(cleanShutdownFile)
+val bw = new BufferedWriter(new OutputStreamWriter(os, 
StandardCharsets.UTF_8))
+try {
+  bw.write("version: " + version + '\n')
+  bw.write("brokerEpoch: " + brokerEpoch)
+  bw.flush()
+  os.getFD.sync()
+} finally bw.close()
+  }
+
+  @throws[Exception]
+  def read: Long = {
+val br = Files.newBufferedReader(cleanShutdownFile.toPath, 
StandardCharsets.UTF_8)
+val whiteSpacesPattern = Pattern.compile(":\\s+")
+var brokerEpoch = -1L
+try {
+  val versionString = br.readLine
+  val versionArray = whiteSpacesPattern.split(versionString)
+  if (versionArray.length != 2) {
+throwIOException("can't parse version from \"" + versionString + "\"")
+  }
+  if (versionArray(1).toInt != currentVersion) {

Review Comment:
   In the future, we may evolve the file format by adding new fields and 
bumping up the version. To make downgrading possible, it might be useful to 
relax this kind of check on version.
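
   As a hedged sketch of that suggestion (a hypothetical helper, not this PR's 
code), a reader could parse the fields it knows and tolerate unknown ones, so 
a file written by a newer version stays readable after a downgrade:

```java
// Hedged sketch (hypothetical class, not this PR's code): parse key/value
// lines, tolerate unknown keys, and read only the fields we understand.
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

public class LenientCleanShutdownFileReader {
    public static long readBrokerEpoch(Path cleanShutdownFile) throws IOException {
        Map<String, String> fields = new HashMap<>();
        for (String line : Files.readAllLines(cleanShutdownFile, StandardCharsets.UTF_8)) {
            String[] parts = line.split(":\\s+", 2);
            if (parts.length == 2)
                fields.put(parts[0], parts[1]); // keep keys from newer versions too
        }
        // No hard failure on an unexpected "version"; just read what we understand.
        return Long.parseLong(fields.getOrDefault("brokerEpoch", "-1"));
    }
}
```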



##
core/src/test/scala/unit/kafka/log/CleanShutdownFileHandlerTest.scala:
##
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package unit.kafka.log

Review Comment:
   no need for unit.



##
core/src/main/scala/kafka/log/CleanShutdownFileHandler.scala:
##
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy 

Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-13 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1358791361


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##
@@ -0,0 +1,3573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.LegacyRecord;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.FetchMetadata;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
+import org.apache.kafka.common.requests.RequestTestUtils;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import 

Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]

2023-10-13 Thread via GitHub


ashmeet13 commented on PR #12988:
URL: https://github.com/apache/kafka/pull/12988#issuecomment-1762139883

   Got it! I'll make this change. For now I have gone through the code and the 
following two references and compiled a list of configs that are somehow 
"controlled" by Kafka Streams. I'm sharing the producer configs here, and will 
follow up with the consumer configs soon.
   
   **Producer Configs with EoS Disabled**
   ```
   1. [Editable] [CustomDefault] linger.ms = 100
   2. [Fixed] partitioner.class = StreamsPartitioner
   ```
   
   **Producer Configs with EoS Enabled**
   ```
   1. [Editable] [CustomDefault] linger.ms = 100
   2. [Fixed] partitioner.class = StreamsPartitioner
   3. [Fixed] enable.idempotence = true
   4. [Validate] max.in.flight.requests.per.connection <= 5
   5. [Fixed] [NoDefault] transactional.id = -
   6. [Editable] [CustomDefault] delivery.timeout.ms = Integer.MAX
   7. [Editable] [CustomDefault] transaction.timeout.ms = 1
   ```
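
   For context, a minimal sketch of the warning logic under discussion might 
look like this (class and method names are illustrative, not the PR's actual 
code):

```java
// Illustrative sketch only: warn when a user-supplied config will be
// overridden by Kafka Streams.
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ControlledConfigChecker {
    private static final Logger log = LoggerFactory.getLogger(ControlledConfigChecker.class);

    public static void warnOnControlledConfigs(Map<String, Object> userConfigs,
                                               Map<String, Object> controlledConfigs) {
        for (Map.Entry<String, Object> controlled : controlledConfigs.entrySet()) {
            Object userValue = userConfigs.get(controlled.getKey());
            if (userValue != null && !userValue.equals(controlled.getValue())) {
                log.warn("Config '{}' = '{}' will be overridden by Kafka Streams to '{}'.",
                        controlled.getKey(), userValue, controlled.getValue());
            }
        }
    }
}
```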


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-13 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1358773873


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -620,6 +642,21 @@ class KafkaApis(val requestChannel: RequestChannel,
 topicPartition,
 status.error.exceptionName))
 }
+
+if (request.header.apiVersion >= 10) {
+  status.currentLeader = {
+status.error match {
+  case Errors.NOT_LEADER_OR_FOLLOWER | Errors.FENCED_LEADER_EPOCH 
=>

Review Comment:
   Will move this into the if block.
   
   Why can't produce requests receive FENCED_LEADER_EPOCH?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-13 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1358773301


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -559,6 +560,26 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+val (leaderId, leaderEpoch) = partitionInfoOrError match {
+  case Right(x) =>
+(x.leaderReplicaIdOpt.getOrElse(-1), x.getLeaderEpoch)
+  case Left(x) =>
+debug(s"Unable to retrieve local leaderId and Epoch with error $x, 
falling back to metadata cache")
+metadataCache.getPartitionInfo(tp.topic, tp.partition) match {
+  case Some(pinfo) => (pinfo.leader(), pinfo.leaderEpoch())
+  case None => (-1, -1)
+}
+}
+val leaderNode: Node = metadataCache.getAliveBrokerNode(leaderId, 
config.interBrokerListenerName).getOrElse({

Review Comment:
   will update the listener name.
   
   I think the motivation for checking the replica manager first is that it may 
be faster than the metadata cache.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15452: Access SslPrincipalMapper and kerberosShortNamer in Custom KafkaPrincipalBuilder(KIP-982) [kafka]

2023-10-13 Thread via GitHub


rbaddam commented on PR #14491:
URL: https://github.com/apache/kafka/pull/14491#issuecomment-1762132178

   Thank you @omkreddy for the review and feedback.
   
   I have updated the 
[KIP-982](https://cwiki.apache.org/confluence/display/KAFKA/KIP-982%3A+Access+SslPrincipalMapper+and+kerberosShortNamer+in+Custom+KafkaPrincipalBuilder)
 with the latest changes from the pull request. 
   
   Could you please review the changes when you have a moment? Your feedback is 
highly appreciated.
   
   Thanks,
   Raghu Baddam


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-13 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1358764452


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##
@@ -0,0 +1,3573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.LegacyRecord;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.FetchMetadata;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
+import org.apache.kafka.common.requests.RequestTestUtils;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import 

Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-13 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1358761112


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##
@@ -0,0 +1,3573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.LegacyRecord;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.FetchMetadata;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
+import org.apache.kafka.common.requests.RequestTestUtils;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import 

Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-13 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1358758478


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##
@@ -1107,7 +1107,7 @@ public void testFetchMaxPollRecords() {
 subscriptions.seek(tp0, 1);
 
 client.prepareResponse(matchesOffset(tidp0, 1), 
fullFetchResponse(tidp0, this.records, Errors.NONE, 100L, 0));
-client.prepareResponse(matchesOffset(tidp0, 4), 
fullFetchResponse(tidp0, this.nextRecords, Errors.NONE, 100L, 0));
+client.prepareResponse(matchesOffset(tidp0, 4), 
fullFetchResponse(tidp0, nextRecords, Errors.NONE, 100L, 0));

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15605 Handle pending topic deletions during migration [kafka]

2023-10-13 Thread via GitHub


mumrah opened a new pull request, #14545:
URL: https://github.com/apache/kafka/pull/14545

   After migrating metadata to KRaft, we need to finalize the pending topic 
deletions. Since the brokers need the StopReplica request in order to clean up 
their log directories, we can't simply exclude the pending topics from the 
migration. This patch continues to migrate the pending topics, but adds logic 
to delete the topic from KRaft shortly after the migration. This lets us reuse 
the existing dual-write logic for topic deletions. 
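
   As a hedged illustration of that flow (hypothetical types and names, not the 
actual controller code):

```java
// Hedged illustration of the flow described above: migrate pending-deletion
// topics as-is, then delete them through the normal KRaft path so the
// dual-write logic mirrors the deletion back to ZK and brokers get StopReplica.
import java.util.Set;

interface TopicDeleter {
    void deleteTopic(String name); // stands in for the KRaft controller's deletion path
}

class PendingDeletionFinalizer {
    private final TopicDeleter deleter;

    PendingDeletionFinalizer(TopicDeleter deleter) {
        this.deleter = deleter;
    }

    // Called shortly after the ZK-to-KRaft metadata migration completes.
    void finalizePendingDeletions(Set<String> topicsPendingDeletionInZk) {
        for (String topic : topicsPendingDeletionInZk) {
            deleter.deleteTopic(topic);
        }
    }
}
```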


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer

2023-10-13 Thread Luke Kirby (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775037#comment-17775037
 ] 

Luke Kirby commented on KAFKA-15602:


{quote}The original documentation mentioned the user "{_}Do not need to 
flip{_}", I assume that means the serializer will automatically rewind the 
position to zero for the user as flip explicitly stated ".{_}..limit is set to 
the current position and then the position is set to zero{_}".  However, the 
serializer API didn't make any assumptions to the current position of the 
buffer, so it is hard to say if the original design intended to handle a 
user-given offset... From reading the unit test, it doesn't seem so.
{quote}
Ah, no, there was no documentation on the serializer before; the "Do not need to 
flip" comment was introduced with the breaking change in 3.4. It's pretty plain 
to see that previous users of the serializer likely _were_ flipping – or, at 
least, writing buffers (with content starting at position 0), flipping them, and 
sending them – and that _worked_ but now doesn't.

Looking at the history for the file, it does look like prior to the 3.4 change 
it always did a rewind() so yeah, position() on the buffer has never really 
been respected. Now, it kind of _does_ respect position; but not in the 
expected way, as it essentially identifies the end of the buffer rather than 
the start.

I don't know why the original version used a rewind() rather than expecting the 
buffer to already have a position() reflecting the start point to read from, 
since in my experience this is clearly the convention for passing ByteBuffer 
inputs for reading in any library I'm familiar with, and I expect the 
expectation of most users; perhaps I'm ignorant of some other pattern! I don't 
know if there's some valid contextual reason for it using explicit rewinds() 
before, but it seems unusual. My suggestion would be to replace the currently 
named ByteBufferSerializer with a new one that respects the standard interface 
unambiguously – i.e., it serializes data from buffer.pos() to buffer.limit(), 
continuing to use appropriate shortcuts for the simple wrap(byte[]) cases.
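
As a rough sketch of the kind of serializer I mean (hypothetical name, and only 
one possible shape for it):
{code:java}
// Hypothetical sketch only: serialize exactly the bytes from position() to
// limit(), leaving the caller's buffer untouched.
import java.nio.ByteBuffer;

import org.apache.kafka.common.serialization.Serializer;

public class PositionRespectingByteBufferSerializer implements Serializer<ByteBuffer> {
    @Override
    public byte[] serialize(String topic, ByteBuffer data) {
        if (data == null)
            return null;
        // Shortcut for the common ByteBuffer.wrap(byte[]) case: the whole array is the payload.
        if (data.hasArray() && data.arrayOffset() == 0 && data.position() == 0
                && data.limit() == data.array().length) {
            return data.array();
        }
        byte[] out = new byte[data.remaining()];
        data.duplicate().get(out); // duplicate() so the original position is not mutated
        return out;
    }
}
{code}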
{quote}I wonder if the sensible thing to do for now is to make the 
contract and expectations more explicit in the Javadoc, i.e., the serialize 
method doesn't support a user-provided offset, and the serializer will rewind the 
current offset to zero for the user.
{quote}
Explicit documentation would surely be an improvement, and is probably at the 
core of how this situation was arrived at, but it doesn't do anything about 
folks being surprised by the completely different behavior and broken output: 
code that worked from 0.10 - 3.3.2 breaks when upgrading to 3.4.0... though 
hopefully they notice it. 

 

 

> Breaking change in 3.4.0 ByteBufferSerializer
> -
>
> Key: KAFKA-15602
> URL: https://issues.apache.org/jira/browse/KAFKA-15602
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>Reporter: Luke Kirby
>Priority: Critical
>
> [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have 
> solved the situation described by KAFKA-4852, namely, to have 
> ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 
> offsets (or, put another way, to honor the buffer's position() as the start 
> point to consume bytes from). Unfortunately, it failed to actually do this, 
> and instead changed the expectations for how an input ByteBuffer's limit and 
> position should be set before being provided to send() on a producer 
> configured with ByteBufferSerializer. Code that worked with pre-3.4.0 
> releases now produce 0-length messages instead of the intended messages, 
> effectively introducing a breaking change for existing users of the 
> serializer in the wild.
> Here are a few different inputs and serialized outputs under pre-3.4.0 and 
> 3.4.0+ to summarize the breaking change:
> ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
> |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 val=test|len=4 val=test|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 val=test|len=0 val=|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 val=test<0><0><0><0>|len=4 val=test|
> |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8)); buff.limit(buff.position());|len=4 val=test|len=4 val=test|
> |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|
> Notably, plain-wrappers of byte arrays continue to work under both versions 
> due to the special case in the serializer for them. I suspect that this is 
> the dominant use-case, which is why this has apparently gone un-reported to 
> this point. The wrapped-with-offset case fails 

[jira] [Updated] (KAFKA-15605) Topics marked for deletion in ZK are incorrectly migrated to KRaft

2023-10-13 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-15605:
-
Summary: Topics marked for deletion in ZK are incorrectly migrated to KRaft 
 (was: Topic marked for deletion are incorrectly migrated to KRaft)

> Topics marked for deletion in ZK are incorrectly migrated to KRaft
> --
>
> Key: KAFKA-15605
> URL: https://issues.apache.org/jira/browse/KAFKA-15605
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, kraft
>Affects Versions: 3.6.0
>Reporter: David Arthur
>Priority: Major
> Fix For: 3.6.1
>
>
> When migrating topics from ZooKeeper, the KRaft controller reads all the 
> topic and partition metadata from ZK directly. This includes topics which 
> have been marked for deletion by the ZK controller. 
> Since the client request to delete these topics has already been returned as 
> successful, it would be confusing to the client that the topic still existed. 
> An operator or application would need to issue another topic deletion to 
> remove these topics once the controller had moved to KRaft. If they tried to 
> create a new topic with the same name, they would receive a 
> TOPIC_ALREADY_EXISTS error.
> The migration logic should carry over pending topic deletions and resolve 
> them either as part of the migration or shortly after.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15605) Topics marked for deletion in ZK are incorrectly migrated to KRaft

2023-10-13 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-15605:
-
Description: 
When migrating topics from ZooKeeper, the KRaft controller reads all the topic 
and partition metadata from ZK directly. This includes topics which have been 
marked for deletion by the ZK controller. After being migrated to KRaft, the 
pending topic deletions are never completed, so it is as if the delete topic 
request never happened.

Since the client request to delete these topics has already been returned as 
successful, it would be confusing to the client that the topic still existed. 
An operator or application would need to issue another topic deletion to remove 
these topics once the controller had moved to KRaft. If they tried to create a 
new topic with the same name, they would receive a TOPIC_ALREADY_EXISTS error.

The migration logic should carry over pending topic deletions and resolve them 
either as part of the migration or shortly after.

  was:
When migrating topics from ZooKeeper, the KRaft controller reads all the topic 
and partition metadata from ZK directly. This includes topics which have been 
marked for deletion by the ZK controller. 

Since the client request to delete these topics has already been returned as 
successful, it would be confusing to the client that the topic still existed. 
An operator or application would need to issue another topic deletion to remove 
these topics once the controller had moved to KRaft. If they tried to create a 
new topic with the same name, they would receive a TOPIC_ALREADY_EXISTS error.

The migration logic should carry over pending topic deletions and resolve them 
either as part of the migration or shortly after.


> Topics marked for deletion in ZK are incorrectly migrated to KRaft
> --
>
> Key: KAFKA-15605
> URL: https://issues.apache.org/jira/browse/KAFKA-15605
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, kraft
>Affects Versions: 3.6.0
>Reporter: David Arthur
>Priority: Major
> Fix For: 3.6.1
>
>
> When migrating topics from ZooKeeper, the KRaft controller reads all the 
> topic and partition metadata from ZK directly. This includes topics which 
> have been marked for deletion by the ZK controller. After being migrated to 
> KRaft, the pending topic deletions are never completed, so it is as if the 
> delete topic request never happened.
> Since the client request to delete these topics has already been returned as 
> successful, it would be confusing to the client that the topic still existed. 
> An operator or application would need to issue another topic deletion to 
> remove these topics once the controller had moved to KRaft. If they tried to 
> create a new topic with the same name, they would receive a 
> TOPIC_ALREADY_EXISTS error.
> The migration logic should carry over pending topic deletions and resolve 
> them either as part of the migration or shortly after.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15605) Topics marked for deletion in ZK are incorrectly migrated to KRaft

2023-10-13 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-15605:
-
Description: 
When migrating topics from ZooKeeper, the KRaft controller reads all the topic 
and partition metadata from ZK directly. This includes topics which have been 
marked for deletion by the ZK controller. 

Since the client request to delete these topics has already been returned as 
successful, it would be confusing to the client that the topic still existed. 
An operator or application would need to issue another topic deletion to remove 
these topics once the controller had moved to KRaft. If they tried to create a 
new topic with the same name, they would receive a TOPIC_ALREADY_EXISTS error.

The migration logic should carry over pending topic deletions and resolve them 
either as part of the migration or shortly after.

  was:
When migrating topics from ZooKeeper, the KRaft controller reads all the topic 
and partition metadata from ZK directly. This incorrectly includes topics which 
have been marked for deletion by the ZK controller. 

Since the client request to delete these topics has already been returned as 
successful, it would be confusing to the client that the topic still existed. 
An operator or application would need to issue another topic deletion to remove 
these topics once the controller had moved to KRaft. If they tried to create a 
new topic with the same name, they would receive a TOPIC_ALREADY_EXISTS error.

The migration logic should carry over pending topic deletions and resolve them 
either as part of the migration or shortly after.


> Topics marked for deletion in ZK are incorrectly migrated to KRaft
> --
>
> Key: KAFKA-15605
> URL: https://issues.apache.org/jira/browse/KAFKA-15605
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, kraft
>Affects Versions: 3.6.0
>Reporter: David Arthur
>Priority: Major
> Fix For: 3.6.1
>
>
> When migrating topics from ZooKeeper, the KRaft controller reads all the 
> topic and partition metadata from ZK directly. This includes topics which 
> have been marked for deletion by the ZK controller. 
> Since the client request to delete these topics has already been returned as 
> successful, it would be confusing to the client that the topic still existed. 
> An operator or application would need to issue another topic deletion to 
> remove these topics once the controller had moved to KRaft. If they tried to 
> create a new topic with the same name, they would receive a 
> TOPIC_ALREADY_EXISTS error.
> The migration logic should carry over pending topic deletions and resolve 
> them either as part of the migration or shortly after.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15605) Topics marked for deletion in ZK are incorrectly migrated to KRaft

2023-10-13 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-15605:
-
Description: 
When migrating topics from ZooKeeper, the KRaft controller reads all the topic 
and partition metadata from ZK directly. This incorrectly includes topics which 
have been marked for deletion by the ZK controller. 

Since the client request to delete these topics has already been returned as 
successful, it would be confusing to the client that the topic still existed. 
An operator or application would need to issue another topic deletion to remove 
these topics once the controller had moved to KRaft. If they tried to create a 
new topic with the same name, they would receive a TOPIC_ALREADY_EXISTS error.

The migration logic should carry over pending topic deletions and resolve them 
either as part of the migration or shortly after.

  was:
When migrating topics from ZooKeeper, the KRaft controller reads all the topic 
and partition metadata from ZK directly. This includes topics which have been 
marked for deletion by the ZK controller. 

Since the client request to delete these topics has already been returned as 
successful, it would be confusing to the client that the topic still existed. 
An operator or application would need to issue another topic deletion to remove 
these topics once the controller had moved to KRaft. If they tried to create a 
new topic with the same name, they would receive a TOPIC_ALREADY_EXISTS error.

The migration logic should carry over pending topic deletions and resolve them 
either as part of the migration or shortly after.


> Topics marked for deletion in ZK are incorrectly migrated to KRaft
> --
>
> Key: KAFKA-15605
> URL: https://issues.apache.org/jira/browse/KAFKA-15605
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, kraft
>Affects Versions: 3.6.0
>Reporter: David Arthur
>Priority: Major
> Fix For: 3.6.1
>
>
> When migrating topics from ZooKeeper, the KRaft controller reads all the 
> topic and partition metadata from ZK directly. This incorrectly includes 
> topics which have been marked for deletion by the ZK controller. 
> Since the client request to delete these topics has already been returned as 
> successful, it would be confusing to the client that the topic still existed. 
> An operator or application would need to issue another topic deletion to 
> remove these topics once the controller had moved to KRaft. If they tried to 
> create a new topic with the same name, they would receive a 
> TOPIC_ALREADY_EXISTS error.
> The migration logic should carry over pending topic deletions and resolve 
> them either as part of the migration or shortly after.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15605) Topics marked for deletion are incorrectly migrated to KRaft

2023-10-13 Thread David Arthur (Jira)
David Arthur created KAFKA-15605:


 Summary: Topics marked for deletion are incorrectly migrated to KRaft
 Key: KAFKA-15605
 URL: https://issues.apache.org/jira/browse/KAFKA-15605
 Project: Kafka
  Issue Type: Bug
  Components: controller, kraft
Affects Versions: 3.6.0
Reporter: David Arthur
 Fix For: 3.6.1


When migrating topics from ZooKeeper, the KRaft controller reads all the topic 
and partition metadata from ZK directly. This includes topics which have been 
marked for deletion by the ZK controller. 

Since the client request to delete these topics has already been returned as 
successful, it would be confusing to the client that the topic still existed. 
An operator or application would need to issue another topic deletion to remove 
these topics once the controller had moved to KRaft. If they tried to create a 
new topic with the same name, they would receive a TOPIC_ALREADY_EXISTS error.

The migration logic should carry over pending topic deletions and resolve them 
either as part of the migration or shortly after.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15552) Duplicate Producer ID blocks during ZK migration

2023-10-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775034#comment-17775034
 ] 

ASF GitHub Bot commented on KAFKA-15552:


ijuma commented on code in PR #560:
URL: https://github.com/apache/kafka-site/pull/560#discussion_r1358724223


##
36/upgrade.html:
##
@@ -84,7 +84,9 @@ Upgrading KRaft-based cl
 
 Notable 
changes in 3.6.0
 
-Apache Kafka now supports having both an IPv4 and an IPv6 listener 
on the same port. This change only applies to
+ZooKeeper to KRaft migrations are now recommended for production 
usage. One significant issue was found in the
+ 3.6.0 release which affects transactional producers 
https://issues.apache.org/jira/browse/KAFKA-15552.

Review Comment:
   And can we be clearer about what we're recommending? Are we saying that they 
should wait for 3.6.1 if they use idempotent producers (the default since 3.0) 
or transactions?





> Duplicate Producer ID blocks during ZK migration
> 
>
> Key: KAFKA-15552
> URL: https://issues.apache.org/jira/browse/KAFKA-15552
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Critical
> Fix For: 3.4.2, 3.5.2, 3.6.1
>
>
> When migrating producer ID blocks from ZK to KRaft, we are taking the current 
> producer ID block from ZK and writing its "firstProducerId" into the 
> producer IDs KRaft record. However, in KRaft we store the _next_ producer ID 
> block in the log rather than storing the current block like ZK does. The end 
> result is that the first block given to a caller of AllocateProducerIds is a 
> duplicate of the last block allocated in ZK mode.
>  
> This can result in duplicate producer IDs being given to transactional or 
> idempotent producers. In the case of transactional producers, this can cause 
> long term problems since the producer IDs are persisted and reused for a long 
> time.
> This bug is possible during the window between the last producer ID block 
> being allocated by the ZK controller and all of the brokers being restarted 
> following the metadata migration.
>  
> Symptoms of this bug will include ReplicaManager OutOfOrderSequenceException 
> and possibly some producer epoch validation errors. To see if a cluster is 
> affected by this bug, search for the offending producer ID and see if it is 
> being used by more than one producer.
>  
> For example, the following error was observed
> {code}
> Out of order sequence number for producer 376000 at offset 381338 in 
> partition REDACTED: 0 (incoming seq. number), 21 (current end sequence 
> number) 
> {code}
> Then, searching the org.apache.kafka.clients.producer.internals.TransactionManager 
> logs for "376000", two brokers both show the same producer ID being provisioned:
> {code}
> Broker 0 [Producer clientId=REDACTED-0] ProducerId set to 376000 with epoch 1
> Broker 5 [Producer clientId=REDACTED-1] ProducerId set to 376000 with epoch 1
> {code}
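
To see the off-by-one-block error with concrete numbers: ZK stores the
*current* block, while KRaft stores the *next* block start. A minimal
illustration, assuming the standard producer ID block size of 1000 (the class
and variable names below are ours, not the migration code's):

{code}
public class ProducerIdBlockExample {
    public static void main(String[] args) {
        long zkCurrentBlockStart = 376000L; // block the ZK controller already handed out
        long blockSize = 1000L;             // standard producer ID block size

        // Buggy migration: writes the current block start as KRaft's next block,
        // so the first AllocateProducerIds caller re-issues 376000..376999.
        long buggyNextBlockStart = zkCurrentBlockStart;             // 376000 (duplicates)

        // Correct migration: advance past the block ZK already allocated.
        long fixedNextBlockStart = zkCurrentBlockStart + blockSize; // 377000 (safe)

        System.out.println("buggy next block start: " + buggyNextBlockStart);
        System.out.println("fixed next block start: " + fixedNextBlockStart);
    }
}
{code}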



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-13 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1358717315


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##
@@ -0,0 +1,3573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.LegacyRecord;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.FetchMetadata;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
+import org.apache.kafka.common.requests.RequestTestUtils;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import 

Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-13 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1358713529


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##

Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-13 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1358712167


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##

[jira] [Commented] (KAFKA-15552) Duplicate Producer ID blocks during ZK migration

2023-10-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775032#comment-17775032
 ] 

ASF GitHub Bot commented on KAFKA-15552:


jolshan commented on code in PR #560:
URL: https://github.com/apache/kafka-site/pull/560#discussion_r1358710418


##
36/upgrade.html:
##
@@ -84,7 +84,9 @@ Upgrading KRaft-based cl
 
 Notable 
changes in 3.6.0
 
-Apache Kafka now supports having both an IPv4 and an IPv6 listener 
on the same port. This change only applies to
+ZooKeeper to KRaft migrations are now recommended for production 
usage. One significant issue was found in the
+ 3.6.0 release which affects transactional producers 
https://issues.apache.org/jira/browse/KAFKA-15552.

Review Comment:
   We should clarify that this affects idempotent producers as well.





> Duplicate Producer ID blocks during ZK migration
> 
>
> Key: KAFKA-15552
> URL: https://issues.apache.org/jira/browse/KAFKA-15552
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Critical
> Fix For: 3.4.2, 3.5.2, 3.6.1
>
>
> When migrating producer ID blocks from ZK to KRaft, we are taking the current 
> producer ID block from ZK and writing its "firstProducerId" into the 
> producer IDs KRaft record. However, in KRaft we store the _next_ producer ID 
> block in the log rather than storing the current block like ZK does. The end 
> result is that the first block given to a caller of AllocateProducerIds is a 
> duplicate of the last block allocated in ZK mode.
>  
> This can result in duplicate producer IDs being given to transactional or 
> idempotent producers. In the case of transactional producers, this can cause 
> long term problems since the producer IDs are persisted and reused for a long 
> time.
> This bug is possible during the window between the last producer ID block 
> being allocated by the ZK controller and all of the brokers being restarted 
> following the metadata migration.
>  
> Symptoms of this bug will include ReplicaManager OutOfOrderSequenceException 
> and possibly some producer epoch validation errors. To see if a cluster is 
> affected by this bug, search for the offending producer ID and see if it is 
> being used by more than one producer.
>  
> For example, the following error was observed
> {code}
> Out of order sequence number for producer 376000 at offset 381338 in 
> partition REDACTED: 0 (incoming seq. number), 21 (current end sequence 
> number) 
> {code}
> Then, searching the org.apache.kafka.clients.producer.internals.TransactionManager 
> logs for "376000", two brokers both show the same producer ID being provisioned:
> {code}
> Broker 0 [Producer clientId=REDACTED-0] ProducerId set to 376000 with epoch 1
> Broker 5 [Producer clientId=REDACTED-1] ProducerId set to 376000 with epoch 1
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15552) Duplicate Producer ID blocks during ZK migration

2023-10-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775029#comment-17775029
 ] 

ASF GitHub Bot commented on KAFKA-15552:


mumrah opened a new pull request, #560:
URL: https://github.com/apache/kafka-site/pull/560

   (no comment)




> Duplicate Producer ID blocks during ZK migration
> 
>
> Key: KAFKA-15552
> URL: https://issues.apache.org/jira/browse/KAFKA-15552
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Critical
> Fix For: 3.4.2, 3.5.2, 3.6.1
>
>
> When migrating producer ID blocks from ZK to KRaft, we are taking the current 
> producer ID block from ZK and writing its "firstProducerId" into the 
> producer IDs KRaft record. However, in KRaft we store the _next_ producer ID 
> block in the log rather than storing the current block like ZK does. The end 
> result is that the first block given to a caller of AllocateProducerIds is a 
> duplicate of the last block allocated in ZK mode.
>  
> This can result in duplicate producer IDs being given to transactional or 
> idempotent producers. In the case of transactional producers, this can cause 
> long term problems since the producer IDs are persisted and reused for a long 
> time.
> This bug is possible during the window between the last producer ID block 
> being allocated by the ZK controller and all of the brokers being restarted 
> following the metadata migration.
>  
> Symptoms of this bug will include ReplicaManager OutOfOrderSequenceException 
> and possibly some producer epoch validation errors. To see if a cluster is 
> affected by this bug, search for the offending producer ID and see if it is 
> being used by more than one producer.
>  
> For example, the following error was observed
> {code}
> Out of order sequence number for producer 376000 at offset 381338 in 
> partition REDACTED: 0 (incoming seq. number), 21 (current end sequence 
> number) 
> {code}
> Then, searching the org.apache.kafka.clients.producer.internals.TransactionManager 
> logs for "376000", two brokers both show the same producer ID being provisioned:
> {code}
> Broker 0 [Producer clientId=REDACTED-0] ProducerId set to 376000 with epoch 1
> Broker 5 [Producer clientId=REDACTED-1] ProducerId set to 376000 with epoch 1
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-13 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1358701620


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##

Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-13 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1358693714


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##

Re: [PR] KAFKA-15452: Access SslPrincipalMapper and kerberosShortNamer in Custom KafkaPrincipalBuilder(KIP-982) [kafka]

2023-10-13 Thread via GitHub


omkreddy commented on PR #14491:
URL: https://github.com/apache/kafka/pull/14491#issuecomment-1762024387

   @rbaddam Thanks for the PR. Can you please update the KIP with the newly 
added interfaces as well? I will review the PR once the KIP passes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14481: Move LogSegment/LogSegments to storage module [kafka]

2023-10-13 Thread via GitHub


ijuma commented on PR #14529:
URL: https://github.com/apache/kafka/pull/14529#issuecomment-1762008506

   > @ijuma Thanks for the PR. I will review it tomorrow.
   
   Thanks @satishd. I won't merge until Tuesday to give you a chance to review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-13 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1358682245


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##

Re: [PR] KAFKA-14481: Move LogSegment/LogSegments to storage module [kafka]

2023-10-13 Thread via GitHub


ijuma commented on code in PR #14529:
URL: https://github.com/apache/kafka/pull/14529#discussion_r135839


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -0,0 +1,873 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.attribute.FileTime;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.Timer;
+import org.apache.kafka.common.InvalidRecordException;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import 
org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.FileRecords.LogOffsetPosition;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+import static java.util.Arrays.asList;
+
+/**
+ * A segment of the log. Each segment has two components: a log and an index. 
The log is a FileRecords containing
+ * the actual messages. The index is an OffsetIndex that maps from logical 
offsets to physical file positions. Each
+ * segment has a base offset which is an offset <= the least offset of any 
message in this segment and > any offset in
+ * any previous segment.
+ *
+ * A segment with a base offset of [base_offset] would be stored in two files, 
a [base_offset].index and a [base_offset].log file.
+ *
+ * This class is not thread-safe.
+ */
+public class LogSegment {
+private static final Logger LOGGER = 
LoggerFactory.getLogger(LogSegment.class);
+private static final Timer LOG_FLUSH_TIMER;
+
+static {
+KafkaMetricsGroup logFlushStatsMetricsGroup = new 
KafkaMetricsGroup(LogSegment.class) {
+@Override
+public MetricName metricName(String name, Map<String, String> tags) {
+// Override the group and type names for compatibility
+return KafkaMetricsGroup.explicitMetricName("kafka.log", 
"LogFlushStats", name, tags);
+}
+};
+LOG_FLUSH_TIMER = 
logFlushStatsMetricsGroup.newTimer("LogFlushRateAndTimeMs", 
TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
+}
+
+private final FileRecords log;
+private final LazyIndex<OffsetIndex> lazyOffsetIndex;
+private final LazyIndex<TimeIndex> lazyTimeIndex;
+private final TransactionIndex txnIndex;
+private final long baseOffset;
+private final int indexIntervalBytes;
+private final long rollJitterMs;
+private final Time time;
+
+// The timestamp we used for time based log rolling and for ensuring max 
compaction delay
+// volatile for LogCleaner to see the update
+private volatile OptionalLong rollingBasedTimestamp = OptionalLong.empty();
+
+/* The maximum timestamp and offset we see so far */
+private volatile TimestampOffset maxTimestampAndOffsetSoFar = 
TimestampOffset.UNKNOWN;
+
+private long created;
+
+/* the number of bytes since we last added an entry in the offset index */
+private int bytesSinceLastIndexEntry = 0;
+
+/**
+ * Create a LogSegment with the provided parameters.
+ *
+ * @param log The file records containing log entries
+ * @param lazyOffsetIndex The offset index
+ * @param lazyTimeIndex The timestamp index
+ * @param txnIndex The transaction index
+ * @param baseOffset A lower bound on the offsets in this segment
+ * @param indexIntervalBytes The approximate number of bytes between 
entries in the index
+ * 
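
As the Javadoc above notes, a segment's files are named after its zero-padded
base offset. A small illustrative snippet, assuming Kafka's 20-digit padding
convention for log file names:

```java
// Illustrative: the files backing a segment whose base offset is 381338.
public class SegmentNamingExample {
    public static void main(String[] args) {
        long baseOffset = 381338L;
        String prefix = String.format("%020d", baseOffset); // "00000000000000381338"
        System.out.println(prefix + ".log");       // the FileRecords log
        System.out.println(prefix + ".index");     // the offset index
        System.out.println(prefix + ".timeindex"); // the time index
    }
}
```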

Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-13 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1358665833


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##

Re: [PR] KAFKA-14481: Move LogSegment/LogSegments to storage module [kafka]

2023-10-13 Thread via GitHub


ijuma commented on PR #14529:
URL: https://github.com/apache/kafka/pull/14529#issuecomment-1761985192

   > Thank you for patiently answering my comments
   
   Thanks for the review and for paying close attention to code quality 
(modularity, readability, etc.) - it's an important area!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-13 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1358658187


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##
@@ -0,0 +1,3573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.LegacyRecord;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.FetchMetadata;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
+import org.apache.kafka.common.requests.RequestTestUtils;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import 

Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-13 Thread via GitHub


philipnee commented on PR #14532:
URL: https://github.com/apache/kafka/pull/14532#issuecomment-1761979623

   Hi @cadonna - Thank you for putting time into this PR, very much 
appreciated. I responded to some of your questions, let me know if there is 
still any ambiguity left.





Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-13 Thread via GitHub


philipnee commented on code in PR #14532:
URL: https://github.com/apache/kafka/pull/14532#discussion_r1358656536


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -184,13 +184,11 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
 NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
 new ConsumerGroupHeartbeatRequest.Builder(data),
 coordinatorRequestManager.coordinator());
-request.future().whenComplete((response, exception) -> {
+request.handler().whenComplete((response, exception) -> {
 if (response != null) {
 onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), response.receivedTimeMs());
 } else {
-// TODO: Currently, we lack a good way to propage the response time from the network client to the
-//  request handler. We will need to store the response time in the handler to make it accessible.
-onFailure(exception, time.milliseconds());
+onFailure(exception, request.handler().completionTimeMs());

Review Comment:
   I think the only time we invoke onComplete() is when a response is available (see below). If the response is `null`, the request has not been sent out, so in the `NetworkClientDelegate` we need to actively fail the request on timeout.
   
   onComplete() is invoked when a response is available:
   ```
   private void completeResponses(List<ClientResponse> responses) {
       for (ClientResponse response : responses) {
           try {
               response.onComplete();
           } catch (Exception e) {
               log.error("Uncaught error in request completion:", e);
           }
       }
   }
   ```
   
   We actively expire the unsent requests and fail them with a TimeoutException:
   ```
   if (unsent.timer.isExpired()) {
       iterator.remove();
       unsent.handler.onFailure(currentTimeMs, new TimeoutException(
           "Failed to send request after " + unsent.timer.timeoutMs() + " ms."));
       continue;
   }
   ```
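   To make the two paths concrete, here is a minimal, self-contained sketch (plain Java with illustrative names, not the actual Kafka classes) of the invariant described above: a request that never leaves the unsent queue is failed actively with a TimeoutException, so its future never completes with a null response and no exception:
   ```
   import java.util.ArrayDeque;
   import java.util.Iterator;
   import java.util.Queue;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.TimeoutException;
   
   public class UnsentRequestExpiry {
       // Illustrative stand-in for an unsent request with a deadline and a future
       static final class Unsent {
           final long deadlineMs;
           final CompletableFuture<String> future = new CompletableFuture<>();
           Unsent(long deadlineMs) { this.deadlineMs = deadlineMs; }
       }
   
       public static void main(String[] args) {
           Queue<Unsent> unsentRequests = new ArrayDeque<>();
           unsentRequests.add(new Unsent(System.currentTimeMillis() - 1)); // already expired
   
           long currentTimeMs = System.currentTimeMillis();
           for (Iterator<Unsent> it = unsentRequests.iterator(); it.hasNext(); ) {
               Unsent unsent = it.next();
               if (unsent.deadlineMs <= currentTimeMs) {
                   it.remove();
                   // Mirrors NetworkClientDelegate actively failing expired requests
                   unsent.future.completeExceptionally(new TimeoutException("request expired"));
               }
           }
       }
   }
   ```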






Re: [PR] KAFKA-14767: Fix missing commitId build error after git gc [kafka]

2023-10-13 Thread via GitHub


ijuma commented on PR #13315:
URL: https://github.com/apache/kafka/pull/13315#issuecomment-1761974609

   Well, Jenkins failed - we need to fix that first.





Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-13 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1358651225


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##
@@ -0,0 +1,3573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.LegacyRecord;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.FetchMetadata;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
+import org.apache.kafka.common.requests.RequestTestUtils;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import 

Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-13 Thread via GitHub


philipnee commented on code in PR #14532:
URL: https://github.com/apache/kafka/pull/14532#discussion_r1358640330


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -254,28 +250,38 @@ public String toString() {
 }
 }
 
-public static class FutureCompletionHandler implements RequestCompletionHandler {
+public static class FutureCompletionHandler extends CompletableFuture<ClientResponse> implements RequestCompletionHandler {
 
-private final CompletableFuture<ClientResponse> future;
+/**
+ * The time when the response is completed. This is used when the response is completed exceptionally because
+ * ClientResponse already contains received time which is injected by the network client.
+ */

Review Comment:
   Thanks. I was trying to say that responseCompletionTimeMs is mainly used when the future gets completed exceptionally.  For successful requests, we could use either the ClientResponse's received time or this time.  I'll remove this comment to avoid the ambiguity.
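   As a rough sketch (field and accessor names here are illustrative, not necessarily the PR's exact code), the handler can stamp its own completion time on both paths, so callers such as `HeartbeatRequestManager` have a timestamp even when no `ClientResponse` exists:
   ```
   import java.util.concurrent.CompletableFuture;
   import org.apache.kafka.clients.ClientResponse;
   import org.apache.kafka.clients.RequestCompletionHandler;
   
   public class TimedCompletionHandler extends CompletableFuture<ClientResponse>
           implements RequestCompletionHandler {
   
       private long completionTimeMs; // set once, on success or failure
   
       @Override
       public void onComplete(ClientResponse response) {
           // Successful path: the network client already stamped the response,
           // so either timestamp may be used downstream.
           completionTimeMs = response.receivedTimeMs();
           complete(response);
       }
   
       public void onFailure(long currentTimeMs, RuntimeException e) {
           // Exceptional path: no ClientResponse exists, so the injected time
           // is the only record of when the request was failed.
           completionTimeMs = currentTimeMs;
           completeExceptionally(e);
       }
   
       public long completionTimeMs() {
           return completionTimeMs;
       }
   }
   ```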






Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-13 Thread via GitHub


philipnee commented on code in PR #14532:
URL: https://github.com/apache/kafka/pull/14532#discussion_r1358637014


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -290,7 +290,6 @@ private void onFailure(final long currentTimeMs,
 
 private void retry(final long currentTimeMs) {
 onFailedAttempt(currentTimeMs);
-onSendAttempt(currentTimeMs);

Review Comment:
   This is duplicated because the manager only sends out the request when it is being polled via the `poll()` method, which `drain()`s the unsent requests from the queue.  In `drain()` it already invokes `onSendAttempt()`.
   
   See:
   ```
   for (OffsetFetchRequestState request : partitionedBySendability.get(true)) {
       request.onSendAttempt(currentTimeMs);
       unsentRequests.add(request.toUnsentRequest());
       inflightOffsetFetches.add(request);
   }
   ```
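   A toy model (hypothetical `RequestState`, not Kafka's actual class) of why recording the attempt in `retry()` would double-count: the send attempt only really happens once `poll()` drains the queue, so that is the one place to record it:
   ```
   import java.util.ArrayDeque;
   import java.util.Queue;
   
   public class SendAttemptFlow {
       static class RequestState {
           long lastSentMs = -1;
           int failures = 0;
           void onSendAttempt(long nowMs) { lastSentMs = nowMs; }
           void onFailedAttempt(long nowMs) { failures++; }
       }
   
       public static void main(String[] args) {
           RequestState state = new RequestState();
           Queue<RequestState> unsentRequests = new ArrayDeque<>();
   
           // retry() marks only the failure; the request goes back to the queue.
           state.onFailedAttempt(0L);
           unsentRequests.add(state);
   
           // poll()/drain() records the single, authoritative send attempt, so
           // backoff is measured from when the request actually goes out.
           long pollTimeMs = 100L;
           for (RequestState request : unsentRequests) {
               request.onSendAttempt(pollTimeMs);
           }
           System.out.println(state.lastSentMs); // 100
       }
   }
   ```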






Re: [PR] KAFKA-14767: Fix missing commitId build error after git gc [kafka]

2023-10-13 Thread via GitHub


gharris1727 commented on PR #13315:
URL: https://github.com/apache/kafka/pull/13315#issuecomment-1761944410

   Hey @ijuma @divijvaidya @C0urante or @yashmayya Could you review this build 
improvement?
   
   I'm still getting this `git gc` failure regularly when I try to build 
branches that have been idle for some time.
   This also includes an improvement (the `isDirectory` guard) which allows us 
to use `git worktree` to check out and build multiple branches at one time.
   
   I'd like to backport this to 3.4, 3.5, and 3.6.





[jira] [Comment Edited] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer

2023-10-13 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17774807#comment-17774807
 ] 

Philip Nee edited comment on KAFKA-15602 at 10/13/23 5:43 PM:
--

Hi [~luke.kirby] ,

Thanks for reporting this.  I briefly went over the code and I can now 
understand your concern.

The original documentation said the user "{_}Do not need to flip{_}"; I assume 
that means the serializer will automatically rewind the position to zero for 
the user, as flip explicitly states "{_}...limit is set to the current position 
and then the position is set to zero{_}".  However, the serializer API didn't 
make any assumptions about the current position of the buffer, so it is hard to 
say whether the original design intended to handle a user-given offset... From 
reading the unit test, it doesn't seem so.

I can see that the problem comes from the serializer not knowing whether the 
position is an offset or just the next byte to be written.  These are two 
different definitions of the position, so it doesn't really make sense to 
handle both cases in a single API call.

I wonder if the sensible thing to do for now is to make the contract and 
expectations more explicit in the Javadoc, i.e., the serialize method doesn't 
support a user-provided offset, and the serializer will rewind the current 
position to zero for the user.

[~guozhang] - Would you kindly provide some input on this issue, as you 
reviewed KAFKA-4852?

Thanks!

P
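
For illustration, a minimal sketch (against a 3.4.0+ client on the classpath) 
reproducing the flipped-buffer row of the table in the issue description below; 
the length-0 result matches that table:

{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.ByteBufferSerializer;

public class FlipRepro {
    public static void main(String[] args) {
        try (ByteBufferSerializer serializer = new ByteBufferSerializer()) {
            ByteBuffer buffer = ByteBuffer.allocate(8);
            buffer.put("test".getBytes(StandardCharsets.UTF_8));
            buffer.flip(); // position=0, limit=4, as pre-3.4.0 callers did

            // 3.3.2 emits the 4 bytes "test"; per the table below, 3.4.0+
            // emits an empty payload for an already-flipped buffer.
            byte[] out = serializer.serialize("topic", buffer);
            System.out.println(out.length); // 0 on 3.4.0+, 4 on 3.3.2
        }
    }
}
{code}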


was (Author: JIRAUSER283568):
Hi [~luke.kirby] ,

Thanks for reporting this.  I briefly went over the code a bit and I can now 
understand your concern.

The original documentation mentioned the user "{_}Do not need to flip{_}", I 
assume that means the serializer will automatically rewind the position to zero 
for the user as flip explicitly stated ".{_}..limit is set to the current 
position and then the position is set to zero{_}".  However, the serializer API 
didn't make any assumptions to the current position of the buffer, so it is 
hard to say if the original design intended to handle a user-given offset... 
From reading the unit test, it doesn't seem so.

I can see the problem comes from the serializer doesn't know if the position is 
an offset or just the next byte to be written.  These are two different 
definitions of the position so it doesn't really make sense to handle both 
cases in a single API call.

I wonder if we could do the following:

If the user doesn't want to provide an offset, which is the most common use 
case, the user may continue using the existing API.

If the user wraps the buffer with an offset, we might want to provide another 
API to specify the buffer, so that we can rewind the position to the correct 
position.

[~guozhang] - Would you kindly provide some input on this issue? As you 
reviewed the KAFKA-4852. 

Thanks!

P

> Breaking change in 3.4.0 ByteBufferSerializer
> -
>
> Key: KAFKA-15602
> URL: https://issues.apache.org/jira/browse/KAFKA-15602
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>Reporter: Luke Kirby
>Priority: Critical
>
> [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have 
> solved the situation described by KAFKA-4852, namely, to have 
> ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 
> offsets (or, put another way, to honor the buffer's position() as the start 
> point to consume bytes from). Unfortunately, it failed to actually do this, 
> and instead changed the expectations for how an input ByteBuffer's limit and 
> position should be set before being provided to send() on a producer 
> configured with ByteBufferSerializer. Code that worked with pre-3.4.0 
> releases now produce 0-length messages instead of the intended messages, 
> effectively introducing a breaking change for existing users of the 
> serializer in the wild.
> Here are a few different inputs and serialized outputs under pre-3.4.0 and 
> 3.4.0+ to summarize the breaking change:
> ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
> |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 val=test|len=4 val=test|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 val=test|len=0 val=|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 val=test<0><0><0><0>|len=4 val=test|
> |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8)); buff.limit(buff.position());|len=4 val=test|len=4 val=test|
> |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|
> Notably, plain-wrappers of byte arrays continue to work under both versions 
> due to the special case in the serializer for them. I suspect that this is 
> the dominant use-case, 

Re: [PR] MINOR: Debug EmbeddedConnect/KafkaCluster failures [kafka]

2023-10-13 Thread via GitHub


gharris1727 closed pull request #13580: MINOR: Debug 
EmbeddedConnect/KafkaCluster failures
URL: https://github.com/apache/kafka/pull/13580





Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]

2023-10-13 Thread via GitHub


gharris1727 commented on code in PR #14538:
URL: https://github.com/apache/kafka/pull/14538#discussion_r1358597074


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java:
##
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+/**
+ * Manages logging levels on a single worker. Supports dynamic adjustment and 
querying
+ * of logging levels.
+ * 
+ * This class is thread-safe; concurrent calls to all of its public methods 
from any number
+ * of threads are permitted.
+ */
+public class Loggers {
+
+private static final Logger log = LoggerFactory.getLogger(Loggers.class);
+
+/**
+ * Log4j uses "root" (case-insensitive) as name of the root logger.
+ */
+private static final String ROOT_LOGGER_NAME = "root";
+
+private final Time time;
+private final Map<String, Long> lastModifiedTimes;
+
+public Loggers(Time time) {
+this.time = time;
+this.lastModifiedTimes = new HashMap<>();
+}
+
+/**
+ * Retrieve the current level for a single logger.
+ * @param logger the name of the logger to retrieve the level for; may not 
be null
+ * @return the current level (falling back on the effective level if 
necessary) of the logger,
+ * or null if no logger with the specified name exists
+ */
+public synchronized LoggerLevel level(String logger) {

Review Comment:
   @yangy Writes must have an exclusive lock for thread safety, meaning 
that no concurrent reads are allowed while a write is happening. The 
synchronized lock here makes sure that no writes are ongoing. It's a bit 
stronger than necessary: multiple concurrent readers could be allowed, but 
multiple readers sharing a lock is a little more complex, and probably not 
worth the performance difference. These methods are not called on any hot path.
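   For reference, a sketch of the finer-grained alternative mentioned above, using a `ReentrantReadWriteLock` so readers can proceed concurrently while writers stay exclusive (illustrative state and method names, not the PR's code):
   ```
   import java.util.concurrent.locks.ReentrantReadWriteLock;
   
   public class RwLockedLevels {
       private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
       private String rootLevel = "INFO"; // illustrative state
   
       public String level() {
           lock.readLock().lock(); // many readers may hold this at once
           try {
               return rootLevel;
           } finally {
               lock.readLock().unlock();
           }
       }
   
       public void setLevel(String level) {
           lock.writeLock().lock(); // exclusive: blocks readers and writers
           try {
               rootLevel = level;
           } finally {
               lock.writeLock().unlock();
           }
       }
   }
   ```
   As noted, plain `synchronized` is simpler and adequate here since these methods are off the hot path.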






Re: [PR] KAFKA-14481: Move LogSegment/LogSegments to storage module [kafka]

2023-10-13 Thread via GitHub


ijuma commented on code in PR #14529:
URL: https://github.com/apache/kafka/pull/14529#discussion_r1358592259


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -0,0 +1,873 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.attribute.FileTime;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.Timer;
+import org.apache.kafka.common.InvalidRecordException;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.FileRecords.LogOffsetPosition;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+import static java.util.Arrays.asList;
+
+/**
+ * A segment of the log. Each segment has two components: a log and an index. 
The log is a FileRecords containing
+ * the actual messages. The index is an OffsetIndex that maps from logical 
offsets to physical file positions. Each
+ * segment has a base offset which is an offset <= the least offset of any 
message in this segment and > any offset in
+ * any previous segment.
+ *
+ * A segment with a base offset of [base_offset] would be stored in two files, 
a [base_offset].index and a [base_offset].log file.
+ *
+ * This class is not thread-safe.
+ */
+public class LogSegment {
+private static final Logger LOGGER = LoggerFactory.getLogger(LogSegment.class);
+private static final Timer LOG_FLUSH_TIMER;
+
+static {
+KafkaMetricsGroup logFlushStatsMetricsGroup = new KafkaMetricsGroup(LogSegment.class) {
+@Override
+public MetricName metricName(String name, Map<String, String> tags) {
+// Override the group and type names for compatibility
+return KafkaMetricsGroup.explicitMetricName("kafka.log", "LogFlushStats", name, tags);
+}
+};
+LOG_FLUSH_TIMER = logFlushStatsMetricsGroup.newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
+}
+
+private final FileRecords log;
+private final LazyIndex<OffsetIndex> lazyOffsetIndex;
+private final LazyIndex<TimeIndex> lazyTimeIndex;
+private final TransactionIndex txnIndex;
+private final long baseOffset;
+private final int indexIntervalBytes;
+private final long rollJitterMs;
+private final Time time;
+
+// The timestamp we used for time based log rolling and for ensuring max 
compaction delay
+// volatile for LogCleaner to see the update
+private volatile OptionalLong rollingBasedTimestamp = OptionalLong.empty();
+
+/* The maximum timestamp and offset we see so far */
+private volatile TimestampOffset maxTimestampAndOffsetSoFar = TimestampOffset.UNKNOWN;
+
+private long created;
+
+/* the number of bytes since we last added an entry in the offset index */
+private int bytesSinceLastIndexEntry = 0;
+
+/**
+ * Create a LogSegment with the provided parameters.
+ *
+ * @param log The file records containing log entries
+ * @param lazyOffsetIndex The offset index
+ * @param lazyTimeIndex The timestamp index
+ * @param txnIndex The transaction index
+ * @param baseOffset A lower bound on the offsets in this segment
+ * @param indexIntervalBytes The approximate number of bytes between 
entries in the index
+ * 

Re: [PR] KAFKA-14481: Move LogSegment/LogSegments to storage module [kafka]

2023-10-13 Thread via GitHub


divijvaidya commented on code in PR #14529:
URL: https://github.com/apache/kafka/pull/14529#discussion_r1358569570


##
clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java:
##
@@ -44,6 +45,17 @@ public RecordBatch firstBatch() {
 return iterator.next();
 }
 
+@Override
+public Optional<RecordBatch> lastBatch() {
+Iterator<? extends RecordBatch> iterator = batches().iterator();
+
+RecordBatch batch = null;
+while (iterator.hasNext())
+batch = iterator.next();
+
+return Optional.ofNullable(batch);

Review Comment:
   Never mind. My misunderstanding.






Re: [PR] KAFKA-14481: Move LogSegment/LogSegments to storage module [kafka]

2023-10-13 Thread via GitHub


divijvaidya commented on code in PR #14529:
URL: https://github.com/apache/kafka/pull/14529#discussion_r1358566629


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -0,0 +1,873 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.attribute.FileTime;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.Timer;
+import org.apache.kafka.common.InvalidRecordException;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.FileRecords.LogOffsetPosition;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+import static java.util.Arrays.asList;
+
+/**
+ * A segment of the log. Each segment has two components: a log and an index. 
The log is a FileRecords containing
+ * the actual messages. The index is an OffsetIndex that maps from logical 
offsets to physical file positions. Each
+ * segment has a base offset which is an offset <= the least offset of any 
message in this segment and > any offset in
+ * any previous segment.
+ *
+ * A segment with a base offset of [base_offset] would be stored in two files, 
a [base_offset].index and a [base_offset].log file.
+ *
+ * This class is not thread-safe.
+ */
+public class LogSegment {
+private static final Logger LOGGER = LoggerFactory.getLogger(LogSegment.class);
+private static final Timer LOG_FLUSH_TIMER;
+
+static {
+KafkaMetricsGroup logFlushStatsMetricsGroup = new KafkaMetricsGroup(LogSegment.class) {
+@Override
+public MetricName metricName(String name, Map<String, String> tags) {
+// Override the group and type names for compatibility
+return KafkaMetricsGroup.explicitMetricName("kafka.log", "LogFlushStats", name, tags);
+}
+};
+LOG_FLUSH_TIMER = logFlushStatsMetricsGroup.newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
+}
+
+private final FileRecords log;
+private final LazyIndex<OffsetIndex> lazyOffsetIndex;
+private final LazyIndex<TimeIndex> lazyTimeIndex;
+private final TransactionIndex txnIndex;
+private final long baseOffset;
+private final int indexIntervalBytes;
+private final long rollJitterMs;
+private final Time time;
+
+// The timestamp we used for time based log rolling and for ensuring max 
compaction delay
+// volatile for LogCleaner to see the update
+private volatile OptionalLong rollingBasedTimestamp = OptionalLong.empty();
+
+/* The maximum timestamp and offset we see so far */
+private volatile TimestampOffset maxTimestampAndOffsetSoFar = TimestampOffset.UNKNOWN;
+
+private long created;
+
+/* the number of bytes since we last added an entry in the offset index */
+private int bytesSinceLastIndexEntry = 0;
+
+/**
+ * Create a LogSegment with the provided parameters.
+ *
+ * @param log The file records containing log entries
+ * @param lazyOffsetIndex The offset index
+ * @param lazyTimeIndex The timestamp index
+ * @param txnIndex The transaction index
+ * @param baseOffset A lower bound on the offsets in this segment
+ * @param indexIntervalBytes The approximate number of bytes between 
entries in the index
+ * 

Re: [PR] KAFKA-15452: Access SslPrincipalMapper and kerberosShortNamer in Custom KafkaPrincipalBuilder(KIP-982) [kafka]

2023-10-13 Thread via GitHub


rbaddam commented on PR #14491:
URL: https://github.com/apache/kafka/pull/14491#issuecomment-1761825493

   Also, we have implemented the SPIFFE integration in our ecosystem, and 
[these changes](https://github.com/apache/kafka/pull/14491) will help make 
the SPIFFE integration smooth.
   
   There is an open KIP on SPIFFE integration: 
[KIP-880](https://cwiki.apache.org/confluence/display/KAFKA/KIP-880%3A+X509+SAN+based+SPIFFE+URI+ACL+within+mTLS+Client+Certificates)
   
   If needed, I am willing to contribute to KIP-880 as well.
   
   cc: @omkreddy / @satishd / @mimaison 
   





[jira] [Assigned] (KAFKA-15604) Add Telemetry RPCs Definitions

2023-10-13 Thread Apoorv Mittal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apoorv Mittal reassigned KAFKA-15604:
-

Assignee: Apoorv Mittal

> Add Telemetry RPCs Definitions
> --
>
> Key: KAFKA-15604
> URL: https://issues.apache.org/jira/browse/KAFKA-15604
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
>
> The goal of this task is to define the Telemetry RPCs as described 
> [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-KafkaProtocolChanges];
>  
>  
> This requires the following steps:
>  # The request/response schemas must be defined (json schema)
>  # Request/response classes must be defined





[jira] [Assigned] (KAFKA-15601) Client metrics and observability

2023-10-13 Thread Apoorv Mittal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apoorv Mittal reassigned KAFKA-15601:
-

Assignee: Apoorv Mittal

> Client metrics and observability
> 
>
> Key: KAFKA-15601
> URL: https://issues.apache.org/jira/browse/KAFKA-15601
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, clients, consumer, core, producer , streams
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
>  Labels: kip
>
> This Jira tracks the development of KIP-714: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability]
>  





[jira] [Commented] (KAFKA-14482) Move LogLoader to storage module

2023-10-13 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17774988#comment-17774988
 ] 

Ismael Juma commented on KAFKA-14482:
-

No worries, thanks for the quick response. :)

> Move LogLoader to storage module
> 
>
> Key: KAFKA-14482
> URL: https://issues.apache.org/jira/browse/KAFKA-14482
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 3.7.0
>
>






[jira] [Assigned] (KAFKA-14482) Move LogLoader to storage module

2023-10-13 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma reassigned KAFKA-14482:
---

Assignee: Ismael Juma

> Move LogLoader to storage module
> 
>
> Key: KAFKA-14482
> URL: https://issues.apache.org/jira/browse/KAFKA-14482
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
>






[jira] [Updated] (KAFKA-14482) Move LogLoader to storage module

2023-10-13 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-14482:

Fix Version/s: 3.7.0

> Move LogLoader to storage module
> 
>
> Key: KAFKA-14482
> URL: https://issues.apache.org/jira/browse/KAFKA-14482
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 3.7.0
>
>






Re: [PR] KAFKA-15338: The metric group documentation for metrics added in KAFK… [kafka]

2023-10-13 Thread via GitHub


mjsax commented on PR #14221:
URL: https://github.com/apache/kafka/pull/14221#issuecomment-1761809518

   No problem :) 





Re: [PR] KAFKA-14481: Move LogSegment/LogSegments to storage module [kafka]

2023-10-13 Thread via GitHub


ijuma commented on code in PR #14529:
URL: https://github.com/apache/kafka/pull/14529#discussion_r1358306908


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -0,0 +1,887 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.attribute.FileTime;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.Timer;
+import org.apache.kafka.common.InvalidRecordException;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.FileRecords.LogOffsetPosition;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+import static java.util.Arrays.asList;
+
+/**
+ * A segment of the log. Each segment has two components: a log and an index. 
The log is a FileRecords containing
+ * the actual messages. The index is an OffsetIndex that maps from logical 
offsets to physical file positions. Each
+ * segment has a base offset which is an offset <= the least offset of any 
message in this segment and > any offset in
+ * any previous segment.
+ *
+ * A segment with a base offset of [base_offset] would be stored in two files, 
a [base_offset].index and a [base_offset].log file.
+ *
+ * This class is not thread-safe.
+ */
+public class LogSegment {
+private static final Logger LOGGER = LoggerFactory.getLogger(LogSegment.class);
+private static final Timer LOG_FLUSH_TIMER;
+
+static {
+KafkaMetricsGroup logFlushStatsMetricsGroup = new KafkaMetricsGroup(LogSegment.class) {
+@Override
+public MetricName metricName(String name, Map<String, String> tags) {
+// Override the group and type names for compatibility - this metrics group was previously defined within
+// a Scala object named `kafka.log.LogFlushStats`
+return KafkaMetricsGroup.explicitMetricName("kafka.log", "LogFlushStats", name, tags);
+}
+};
+LOG_FLUSH_TIMER = logFlushStatsMetricsGroup.newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS);

Review Comment:
   I'm not sure what to add as a comment; static fields basically last for the 
life of the JVM. I would personally prefer to have this metric scoped to the 
life of a Kafka server, but that would be a behavior change and is best handled 
via a separate JIRA/PR. I simply preserved the existing behavior here.
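   For what it's worth, a hedged sketch of the server-scoped alternative (names like `ServerScopedMetrics` are illustrative, and the `removeMetric(String)` call is an assumption about `KafkaMetricsGroup`'s API):
   ```
   import java.util.concurrent.TimeUnit;
   import com.yammer.metrics.core.Timer;
   import org.apache.kafka.server.metrics.KafkaMetricsGroup;
   
   public class ServerScopedMetrics implements AutoCloseable {
       private final KafkaMetricsGroup group = new KafkaMetricsGroup(ServerScopedMetrics.class);
       // Created per server instance instead of once per JVM in a static initializer
       private final Timer logFlushTimer =
           group.newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
   
       public Timer logFlushTimer() {
           return logFlushTimer;
       }
   
       @Override
       public void close() {
           // Assumed unregistration hook: drop the metric when the server shuts down
           group.removeMetric("LogFlushRateAndTimeMs");
       }
   }
   ```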






[jira] [Created] (KAFKA-15604) Add Telemetry RPCs Definitions

2023-10-13 Thread Apoorv Mittal (Jira)
Apoorv Mittal created KAFKA-15604:
-

 Summary: Add Telemetry RPCs Definitions
 Key: KAFKA-15604
 URL: https://issues.apache.org/jira/browse/KAFKA-15604
 Project: Kafka
  Issue Type: Sub-task
Reporter: Apoorv Mittal


The goal of this task is to define the Telemetry RPCs as described 
[here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-KafkaProtocolChanges];
 

 

This requires the following steps:
 # The request/response schemas must be defined (json schema)
 # Request/response classes must be defined





Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-13 Thread via GitHub


splett2 commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1358490055


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -559,6 +560,26 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+val (leaderId, leaderEpoch) = partitionInfoOrError match {
+  case Right(x) =>
+(x.leaderReplicaIdOpt.getOrElse(-1), x.getLeaderEpoch)
+  case Left(x) =>
+debug(s"Unable to retrieve local leaderId and Epoch with error $x, 
falling back to metadata cache")
+metadataCache.getPartitionInfo(tp.topic, tp.partition) match {
+  case Some(pinfo) => (pinfo.leader(), pinfo.leaderEpoch())
+  case None => (-1, -1)
+}
+}
+val leaderNode: Node = metadataCache.getAliveBrokerNode(leaderId, config.interBrokerListenerName).getOrElse({

Review Comment:
   we shouldn't be passing through the inter-broker listener name; we should be 
using the listener of the original request, to be consistent with the 
metadata request.
   
   Is it simpler if we just consult the metadata cache? In KRaft mode, the 
metadata cache is the source of truth for partition leadership and is updated 
before the partition state gets updated.



##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -620,6 +642,21 @@ class KafkaApis(val requestChannel: RequestChannel,
 topicPartition,
 status.error.exceptionName))
 }
+
+if (request.header.apiVersion >= 10) {
+  status.currentLeader = {
+status.error match {
+  case Errors.NOT_LEADER_OR_FOLLOWER | Errors.FENCED_LEADER_EPOCH =>

Review Comment:
   produce requests should never receive FENCED_LEADER_EPOCH.
   
   also, shouldn't this go in the above `if` block?





