[GitHub] [kafka] philipnee commented on pull request #13664: KAFKA-14961: harden DefaultBackgroundThreadTest.testStartupAndTearDown test

2023-05-08 Thread via GitHub


philipnee commented on PR #13664:
URL: https://github.com/apache/kafka/pull/13664#issuecomment-1539356020

   Thank you @vvcephei - https://issues.apache.org/jira/browse/KAFKA-14977 is filed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14977) testDescribeStateOfExistingGroupWithRoundRobinAssignor is flaky

2023-05-08 Thread Philip Nee (Jira)
Philip Nee created KAFKA-14977:
--

 Summary: testDescribeStateOfExistingGroupWithRoundRobinAssignor is 
flaky
 Key: KAFKA-14977
 URL: https://issues.apache.org/jira/browse/KAFKA-14977
 Project: Kafka
  Issue Type: Bug
  Components: unit tests
Reporter: Philip Nee
 Attachments: failed_test.log

Relevant tickets: 
- KAFKA-8110 Flaky Test 
DescribeConsumerGroupTest#testDescribeMembersWithConsumersWithoutAssignedPartitions
- KAFKA-7969 Flaky Test 
DescribeConsumerGroupTest#testDescribeOffsetsOfExistingGroupWithNoMembers
- KAFKA-8068 Flaky Test 
DescribeConsumerGroupTest#testDescribeMembersOfExistingGroup
- KAFKA-8706 Kafka 2.3.0 Transient Unit Test Failures on Oracle Linux - See 
attached for details
 
See the attachment for the failure.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-8170) To add kafka data at rest encryption

2023-05-08 Thread kaiyangzhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17720759#comment-17720759
 ] 

kaiyangzhang edited comment on KAFKA-8170 at 5/9/23 3:28 AM:
-

[~sliebau] Hello, do you have any new progress on [KIP-317], or do you have 
any information for reference? We have data encryption requirements here. Do 
you have any recommended solutions?


was (Author: kaiyangzhang):
[~sliebau] Hello, do you have any new progress on [KIP-317], or do you have 
any information for reference?

> To add kafka data at rest encryption
> 
>
> Key: KAFKA-8170
> URL: https://issues.apache.org/jira/browse/KAFKA-8170
> Project: Kafka
>  Issue Type: New Feature
>  Components: log
>Reporter: Akash
>Priority: Minor
>  Labels: features, security
>
> Kafka has a mechanism for wire encryption of data.
> But the Kafka data at rest, which exists in /-, is still unencrypted.
> These directories contain log files with the actual messages and embedded 
> metadata, and an unauthorised user can still recover messages from these files.
> Adding encryption for this data would be valuable for protecting messages 
> against disk theft and unauthorised user access on the servers.
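
Until Kafka supports at-rest encryption natively, one commonly discussed interim 
approach is client-side payload encryption, for example via a wrapping 
serializer. Below is a minimal, hedged Java sketch (key and IV management 
deliberately oversimplified, not production-grade, and not the KIP-317 design 
itself):

```java
import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import org.apache.kafka.common.serialization.Serializer;

// Hedged sketch: encrypt each record value before it reaches the broker, so
// the on-disk log segments never contain plaintext. A real deployment needs
// proper key rotation, error handling, and a matching decrypting deserializer.
public class EncryptingSerializer implements Serializer<byte[]> {
    private final SecretKeySpec key;

    public EncryptingSerializer(byte[] rawKey) {
        this.key = new SecretKeySpec(rawKey, "AES"); // e.g. a 16- or 32-byte key
    }

    @Override
    public byte[] serialize(String topic, byte[] plaintext) {
        try {
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, key); // a random IV is generated
            byte[] iv = cipher.getIV();
            byte[] ciphertext = cipher.doFinal(plaintext);
            // Prepend the IV so a consumer-side deserializer can decrypt.
            byte[] out = new byte[iv.length + ciphertext.length];
            System.arraycopy(iv, 0, out, 0, iv.length);
            System.arraycopy(ciphertext, 0, out, iv.length, ciphertext.length);
            return out;
        } catch (Exception e) {
            throw new RuntimeException("Encryption failed", e);
        }
    }
}
```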





[jira] [Commented] (KAFKA-8170) To add kafka data at rest encryption

2023-05-08 Thread kaiyangzhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17720759#comment-17720759
 ] 

kaiyangzhang commented on KAFKA-8170:
-

[~sliebau] Hello, do you have any new progress on [KIP-317], or do you have 
any information for reference?

> To add kafka data at rest encryption
> 
>
> Key: KAFKA-8170
> URL: https://issues.apache.org/jira/browse/KAFKA-8170
> Project: Kafka
>  Issue Type: New Feature
>  Components: log
>Reporter: Akash
>Priority: Minor
>  Labels: features, security
>
> Kafka has a mechanism for wire encryption of data.
> But the Kafka data at rest, which exists in /-, is still unencrypted.
> These directories contain log files with the actual messages and embedded 
> metadata, and an unauthorised user can still recover messages from these files.
> Adding encryption for this data would be valuable for protecting messages 
> against disk theft and unauthorised user access on the servers.





[GitHub] [kafka] showuon merged pull request #13684: MINOR: fix compilation failure

2023-05-08 Thread via GitHub


showuon merged PR #13684:
URL: https://github.com/apache/kafka/pull/13684





[GitHub] [kafka] showuon commented on a diff in pull request #13584: MINOR: Add log segment unit tests, If the maximum offset beyond index, appen…

2023-05-08 Thread via GitHub


showuon commented on code in PR #13584:
URL: https://github.com/apache/kafka/pull/13584#discussion_r1188050289


##
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala:
##
@@ -65,6 +68,29 @@ class LogSegmentTest {
 Utils.delete(logDir)
   }
 
+  /**
+   * If the maximum offset beyond index, appended to the log section, it throws LogSegmentOffsetOverflowException

Review Comment:
   This comment is not clear enough (e.g., what does "maximum offset" mean in 
this test? what does "index" mean here?). Maybe:
   ```
   LogSegmentOffsetOverflowException should be thrown while appending the logs if:
   1. largestOffset - baseOffset < 0
   2. largestOffset - baseOffset > Integer.MAX_VALUE
   ```
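
For reference, the guard the suggested comment describes can be sketched in 
plain Java (a hedged sketch with illustrative names; the real check lives in 
Kafka's LogSegment and offset-index code):

```java
// Hedged sketch of the relative-offset overflow guard described above.
// Offset-index entries store offsets relative to the segment's base offset
// as a 32-bit integer, so the delta must fit in [0, Integer.MAX_VALUE].
public class OffsetRangeCheck {
    static void ensureOffsetInRange(long baseOffset, long largestOffset) {
        long delta = largestOffset - baseOffset;
        if (delta < 0 || delta > Integer.MAX_VALUE) {
            // Kafka throws LogSegmentOffsetOverflowException here; a plain
            // runtime exception stands in for it in this sketch.
            throw new IllegalStateException("largestOffset " + largestOffset
                + " cannot be expressed relative to baseOffset " + baseOffset);
        }
    }
}
```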









[GitHub] [kafka] yashmayya commented on pull request #13688: KAFKA-14974: Restore backward compatibility in KafkaBasedLog

2023-05-08 Thread via GitHub


yashmayya commented on PR #13688:
URL: https://github.com/apache/kafka/pull/13688#issuecomment-1539275733

   @rhauch looks like there are 4 test failures - none of them are new failures 
or related to this change.





[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-05-08 Thread via GitHub


satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1187999169


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1083,48 +1095,100 @@ class ReplicaManager(val config: KafkaConfig,
          fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
        })
      }
-      val delayedFetch = new DelayedFetch(
-        params = params,
-        fetchPartitionStatus = fetchPartitionStatus,
-        replicaManager = this,
-        quota = quota,
-        responseCallback = responseCallback
-      )
-
-      // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
-      val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }
-
-      // try to complete the request immediately, otherwise put it into the purgatory;
-      // this is because while the delayed fetch operation is being created, new requests
-      // may arrive and hence make this operation completable.
-      delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
+
+      if (remoteFetchInfo.isPresent) {

Review Comment:
   Sure, that check was missed while pulling the changes. Good catch. Updated 
it with the latest commit.
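
As background for readers, the "try to complete the request immediately, 
otherwise put it into the purgatory" comment in the removed code describes a 
try-complete-else-watch pattern. A minimal, hedged Java sketch of that shape 
(illustrative names only, not Kafka's actual DelayedOperation/purgatory API):

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hedged sketch of try-complete-else-watch; names are illustrative.
interface DelayedOperation {
    boolean tryComplete(); // true if the operation can finish right now
}

class Purgatory {
    private final Map<String, List<DelayedOperation>> watchers = new ConcurrentHashMap<>();

    boolean tryCompleteElseWatch(DelayedOperation op, List<String> keys) {
        if (op.tryComplete()) {
            return true;
        }
        // Register the operation under each key so later events can retry it.
        for (String key : keys) {
            watchers.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(op);
        }
        // Re-check once more: a completing event may have arrived while the
        // watchers were being registered.
        return op.tryComplete();
    }
}
```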









[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-08 Thread via GitHub


jeffkbkim commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1187976451


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of methods:
+ * 1) The request handlers which handle the requests and generate a

[GitHub] [kafka] mjsax commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2023-05-08 Thread via GitHub


mjsax commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1180626928


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+final Collection newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+// If there are any transactions in flight and there are newly created active tasks, commit the tasks
+// to avoid potential long restoration times.
+if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+    log.info("New active tasks were added and there is an inflight transaction. Attempting to commit tasks.");
+    final int numCommitted = commitTasksAndMaybeUpdateCommittableOffsets(newActiveTasks, new HashMap<>());
+    if (numCommitted == -1) {

Review Comment:
   Seems we should not mess with the control flow here. We might end up with 
spaghetti code. I would advocate moving the commit logic into the restore code 
path if possible, as mentioned further above -- this way, the existing control 
flow is not changed and we avoid all these complications.






[GitHub] [kafka] mjsax commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2023-05-08 Thread via GitHub


mjsax commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1188018045


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+final Collection newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+// If there are any transactions in flight and there are newly created active tasks, commit the tasks
+// to avoid potential long restoration times.
+if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+    log.info("New active tasks were added and there is an inflight transaction. Attempting to commit tasks.");

Review Comment:
   Ok. Not sure how important it is for users to see by default, but I agree it 
should not be too frequent as it should only happen during a rebalance.






[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-08 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1188009762


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##
@@ -0,0 +1,574 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RangeAssignorTest {
+private final RangeAssignor assignor = new RangeAssignor();
+private final Uuid topic1Uuid = Uuid.randomUuid();
+private final Uuid topic2Uuid = Uuid.randomUuid();
+private final Uuid topic3Uuid = Uuid.randomUuid();
+private final String consumerA = "A";
+private final String consumerB = "B";
+private final String consumerC = "C";
+
+@Test
+public void testOneConsumerNoTopic() {
+Map<Uuid, AssignmentTopicMetadata> topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3));
+Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
+consumerA,
+new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Collections.emptyList(),
+Collections.emptyMap())
+);
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.members().isEmpty());
+}
+
+@Test
+public void testOneConsumerNonExistentTopic() {
+Map<Uuid, AssignmentTopicMetadata> topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3));
+Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
+consumerA,
+new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Collections.singletonList(topic2Uuid),
+Collections.emptyMap())
+);
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.members().isEmpty());
+}
+
+@Test
+public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() {
+Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+topics.put(topic3Uuid, new AssignmentTopicMetadata(2));
+
+Map<String, AssignmentMemberSpec> members = new HashMap<>();
+// Initial Subscriptions are: A -> T1, T3 | B -> T1, T3
+
+members.put(consumerA, new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Arrays.asList(topic1Uuid, topic3Uuid),
+Collections.emptyMap())
+);
+
+members.put(consumerB, new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Arrays.asList(topic1Uuid, topic3Uuid),
+Collections.emptyMap())
+);
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment computedAssignment = assignor.assign(assignmentSpec);
+
+Map>> expectedAssignment = new HashMap<>();
+// Topic 1 Partitions Assignment
+mkAssignment(expectedAssignment, topic1Uuid, Arrays.asList(0, 1));
+mkAssignment(expectedAssignment, topic1Uuid, Collections.singleton(2));
+// Topic 3 Partitions Assignment
+mkAssignment(expectedAssignment, topic3Uuid, Collections.singleton(0));
+mkAssignment(expectedAssignment, topic3Uuid, Collections.singleton(1));
+
+assertAssignment(expectedAssignment, computedAssignment);
+}
+
+@Test
+public void testFirstAssignmentThreeConsumersThreeTopicsDifferentSubscriptions() {
+Map<Uuid, AssignmentTopicMetadata> topics = new 
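
As a reading aid for the expected assignments quoted above, the range 
assignor's per-topic arithmetic works out as follows (a hedged summary of what 
the test expects, not the assignor source):

```
topic1: 3 partitions / 2 consumers -> quota 1, remainder 1
        consumer A gets partitions [0, 1]  (quota plus one from the remainder)
        consumer B gets partition  [2]
topic3: 2 partitions / 2 consumers -> quota 1, remainder 0
        consumer A gets partition  [0]
        consumer B gets partition  [1]
```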

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-08 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1188009097


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##
@@ -0,0 +1,574 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RangeAssignorTest {
+private final RangeAssignor assignor = new RangeAssignor();
+private final Uuid topic1Uuid = Uuid.randomUuid();
+private final Uuid topic2Uuid = Uuid.randomUuid();
+private final Uuid topic3Uuid = Uuid.randomUuid();
+private final String consumerA = "A";
+private final String consumerB = "B";
+private final String consumerC = "C";
+
+@Test
+public void testOneConsumerNoTopic() {
+Map<Uuid, AssignmentTopicMetadata> topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3));
+Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
+consumerA,
+new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Collections.emptyList(),
+Collections.emptyMap())
+);
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.members().isEmpty());
+}
+
+@Test
+public void testOneConsumerNonExistentTopic() {
+Map<Uuid, AssignmentTopicMetadata> topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3));
+Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
+consumerA,
+new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Collections.singletonList(topic2Uuid),
+Collections.emptyMap())
+);

Review Comment:
   done






[GitHub] [kafka] mehbey commented on pull request #13681: KAFKA-14133: Migrate ActiveTaskCreator mock in TaskManagerTest to Mockito

2023-05-08 Thread via GitHub


mehbey commented on PR #13681:
URL: https://github.com/apache/kafka/pull/13681#issuecomment-1539231000

   Verified that the failing tests are not related to this change:
   ```
   [2023-05-08T22:22:09.340Z] 1: Task failed with an exception.
   
   [2023-05-08T22:22:09.340Z] ---
   
   [2023-05-08T22:22:09.340Z] * What went wrong:
   
   [2023-05-08T22:22:09.340Z] Execution failed for task 
':streams:upgrade-system-tests-0102:integrationTest'.
   
   [2023-05-08T22:22:09.340Z] > Process 'Gradle Test Executor 151' finished 
with non-zero exit value 1
   
   [2023-05-08T22:22:09.340Z]   This problem might be caused by incorrect test 
process configuration.
   
   [2023-05-08T22:22:09.340Z]   Please refer to the test execution section in 
the User Manual at 
https://docs.gradle.org/8.1.1/userguide/java_testing.html#sec:test_execution
   
   [2023-05-08T22:22:09.340Z] 
   
   [2023-05-08T22:22:09.340Z] * Try:
   
   [2023-05-08T22:22:09.340Z] > Run with --stacktrace option to get the stack 
trace.
   
   [2023-05-08T22:22:09.340Z] > Run with --info or --debug option to get more 
log output.
   
   [2023-05-08T22:22:09.340Z] > Run with --scan to get full insights.
   
   [2023-05-08T22:22:09.340Z] 
==
   
   [2023-05-08T22:22:09.340Z] 
   
   [2023-05-08T22:22:09.340Z] 2: Task failed with an exception.
   
   [2023-05-08T22:22:09.340Z] ---
   
   [2023-05-08T22:22:09.340Z] * What went wrong:
   
   [2023-05-08T22:22:09.340Z] Execution failed for task 
':connect:mirror:integrationTest'.
   
   [2023-05-08T22:22:09.340Z] > Process 'Gradle Test Executor 130' finished 
with non-zero exit value 137
   
   [2023-05-08T22:22:09.340Z]   This problem might be caused by incorrect test 
process configuration.
   
   [2023-05-08T22:22:09.340Z]   Please refer to the test execution section in 
the User Manual at 
https://docs.gradle.org/8.1.1/userguide/java_testing.html#sec:test_execution
   
   [2023-05-08T22:22:09.340Z] 
   
   [2023-05-08T22:22:09.340Z] * Try:
   
   [2023-05-08T22:22:09.340Z] > Run with --stacktrace option to get the stack 
trace.
   
   [2023-05-08T22:22:09.340Z] > Run with --info or --debug option to get more 
log output.
   
   [2023-05-08T22:22:09.340Z] > Run with --scan to get full insights.
   ```





[jira] [Comment Edited] (KAFKA-14561) Improve transactions experience for older clients by ensuring ongoing transaction

2023-05-08 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17720717#comment-17720717
 ] 

Jun Rao edited comment on KAFKA-14561 at 5/9/23 12:24 AM:
--

The PR was reverted in the 3.5 branch. Updated the fix version.


was (Author: junrao):
The PR was reverted in 3.5 branch. Updated the fix version.

> Improve transactions experience for older clients by ensuring ongoing 
> transaction
> -
>
> Key: KAFKA-14561
> URL: https://issues.apache.org/jira/browse/KAFKA-14561
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
> Fix For: 3.6.0
>
>
> This is part 3 of KIP-890:
> 3. *To cover older clients, we will ensure a transaction is ongoing before we 
> write to a transaction. We can do this by querying the transaction 
> coordinator and caching the result.*
> See KIP-890 for more details: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense
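
For illustration only, the "ensure a transaction is ongoing ... by querying the 
transaction coordinator and caching the result" step described above can be 
sketched as follows (all types and method names here are hypothetical 
stand-ins, not the actual KIP-890 broker code):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hedged sketch of KIP-890 part 3: before appending transactional data from an
// older client, verify with the transaction coordinator that a transaction is
// ongoing, and cache the verified state. Names are hypothetical stand-ins.
final class TransactionVerifier {
    interface TxnCoordinatorClient { // hypothetical coordinator query API
        boolean isTransactionOngoing(long producerId);
    }

    private final Set<Long> verifiedProducers = ConcurrentHashMap.newKeySet();
    private final TxnCoordinatorClient coordinator;

    TransactionVerifier(TxnCoordinatorClient coordinator) {
        this.coordinator = coordinator;
    }

    boolean mayAppend(long producerId) {
        if (verifiedProducers.contains(producerId)) {
            return true; // cached: already verified as ongoing
        }
        if (coordinator.isTransactionOngoing(producerId)) {
            verifiedProducers.add(producerId);
            return true;
        }
        return false; // reject the write: no ongoing transaction
    }
}
```

A real implementation would also have to invalidate the cached state once the 
transaction commits or aborts; that bookkeeping is omitted here.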





[jira] [Updated] (KAFKA-14561) Improve transactions experience for older clients by ensuring ongoing transaction

2023-05-08 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-14561:

Fix Version/s: 3.6.0
   (was: 3.5.0)

The PR was reverted in the 3.5 branch. Updated the fix version.

> Improve transactions experience for older clients by ensuring ongoing 
> transaction
> -
>
> Key: KAFKA-14561
> URL: https://issues.apache.org/jira/browse/KAFKA-14561
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
> Fix For: 3.6.0
>
>
> This is part 3 of KIP-890:
> 3. *To cover older clients, we will ensure a transaction is ongoing before we 
> write to a transaction. We can do this by querying the transaction 
> coordinator and caching the result.*
> See KIP-890 for more details: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense





[GitHub] [kafka] junrao commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-05-08 Thread via GitHub


junrao commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1187994355


##
core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala:
##
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.cluster.Partition
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests.FetchRequest
+import org.apache.kafka.common.{TopicIdPartition, Uuid}
+import org.apache.kafka.storage.internals.log._
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+import org.mockito.Mockito.{mock, when}
+
+import java.util.Optional
+import java.util.concurrent.CompletableFuture
+
+import scala.collection._
+
+class DelayedRemoteFetchTest {
+  private val maxBytes = 1024
+  private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+  private val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+  private val fetchOffset = 500L
+  private val logStartOffset = 0L
+  private val currentLeaderEpoch = Optional.of[Integer](10)
+  private val replicaId = 1
+
+  private val fetchStatus = FetchPartitionStatus(
+    startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+    fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+  private val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
+
+  @Test
+  def testFetchWithFencedEpoch(): Unit = {

Review Comment:
   Hmm, where is logic to simulate a fenced epoch in this test?
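
For context, one common way to simulate a fenced epoch in such a test is to 
stub the partition lookup so it throws FencedLeaderEpochException. A hedged, 
self-contained JUnit/Mockito sketch (hypothetical stand-in types, not 
necessarily what this PR ends up doing):

```java
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class FencedEpochSimulationTest {
    // Hypothetical stand-in for Kafka's FencedLeaderEpochException.
    static class FencedLeaderEpochException extends RuntimeException {
        FencedLeaderEpochException(String msg) { super(msg); }
    }

    // Hypothetical stand-in for the broker-side partition lookup.
    interface PartitionLookup {
        Object partitionOrException(String topic);
    }

    @Test
    void fetchWithFencedEpochSurfacesTheError() {
        PartitionLookup lookup = mock(PartitionLookup.class);
        // The simulation: the lookup throws as if the leader epoch were fenced.
        when(lookup.partitionOrException("topic"))
            .thenThrow(new FencedLeaderEpochException("fenced leader epoch"));

        assertThrows(FencedLeaderEpochException.class,
            () -> lookup.partitionOrException("topic"));
    }
}
```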



##
core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
+import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
+import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class RemoteLogReaderTest {
+RemoteLogManager mockRLM = mock(RemoteLogManager.class);
+LogOffsetMetadata logOffsetMetadata = new LogOffsetMetadata(100);
+Records records = mock(Records.class);
+
+

Review Comment:
   extra new line.



##
core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala:
##
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the 

[GitHub] [kafka] satishd commented on pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-05-08 Thread via GitHub


satishd commented on PR #13535:
URL: https://github.com/apache/kafka/pull/13535#issuecomment-1539215636

   Thanks @junrao, updated the [comment](https://github.com/apache/kafka/pull/13535#discussion_r1184274133) with the latest commit.








[GitHub] [kafka] vvcephei commented on pull request #13664: KAFKA-14961: harden DefaultBackgroundThreadTest.testStartupAndTearDown test

2023-05-08 Thread via GitHub


vvcephei commented on PR #13664:
URL: https://github.com/apache/kafka/pull/13664#issuecomment-1539151873

   Thanks, @philipnee !
   
   I agree it's probably flaky, but I don't see a ticket filed for it. We 
haven't been very strict recently about this, but I'm reluctant to just forge 
ahead at this point, given all the mailing list discussions about recent merges 
breaking the build because they didn't block on failing tests.
   
   Do you mind filing a ticket for this test, and then we can merge it?
   
   (see 
https://issues.apache.org/jira/browse/KAFKA-8706?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22)%20AND%20text%20~%20%22DescribeConsumerGroupTest%22
 )





[GitHub] [kafka] mjsax commented on pull request #13654: HOTFIX: fix broken Streams upgrade system test

2023-05-08 Thread via GitHub


mjsax commented on PR #13654:
URL: https://github.com/apache/kafka/pull/13654#issuecomment-1539079411

   Merged to `trunk` and cherry-picked to `3.5` branch. \cc @mimaison 
   
   Seems some other streams system tests are still flaky (well, I hope it's not 
a real issue). I hope to find time to dig into it this week.





[GitHub] [kafka] mjsax merged pull request #13654: HOTFIX: fix broken Streams upgrade system test

2023-05-08 Thread via GitHub


mjsax merged PR #13654:
URL: https://github.com/apache/kafka/pull/13654





[GitHub] [kafka] junrao commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-05-08 Thread via GitHub


junrao commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1187906662


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1083,48 +1095,100 @@ class ReplicaManager(val config: KafkaConfig,
          fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
        })
      }
-      val delayedFetch = new DelayedFetch(
-        params = params,
-        fetchPartitionStatus = fetchPartitionStatus,
-        replicaManager = this,
-        quota = quota,
-        responseCallback = responseCallback
-      )
-
-      // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
-      val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }
-
-      // try to complete the request immediately, otherwise put it into the purgatory;
-      // this is because while the delayed fetch operation is being created, new requests
-      // may arrive and hence make this operation completable.
-      delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
+
+      if (remoteFetchInfo.isPresent) {

Review Comment:
   Yes






[GitHub] [kafka] cmccabe commented on a diff in pull request #13686: MINOR: Create the MetadataNode classes to introspect MetadataImage

2023-05-08 Thread via GitHub


cmccabe commented on code in PR #13686:
URL: https://github.com/apache/kafka/pull/13686#discussion_r1187901148


##
shell/src/main/java/org/apache/kafka/shell/glob/GlobVisitor.java:
##
@@ -93,32 +96,33 @@ public String toString() {
 }
 
 @Override
-public void accept(MetadataNodeManager.Data data) {
+public void accept(MetadataShellState state) {
 String fullGlob = glob.startsWith("/") ? glob :
-data.workingDirectory() + "/" + glob;
+state.workingDirectory() + "/" + glob;
  List globComponents = CommandUtils.stripDotPathComponents(CommandUtils.splitPath(fullGlob));
-if (!accept(globComponents, 0, data.root(), new String[0])) {
+if (!accept(globComponents, 0, state.root(), new String[0])) {
 handler.accept(Optional.empty());
 }
 }
 
-private boolean accept(List globComponents,
-   int componentIndex,
-   MetadataNode node,
-   String[] path) {
+private boolean accept(
+List globComponents,
+int componentIndex,
+MetadataNode node,
+String[] path
+) {

Review Comment:
   Like above, I'll add a check and helpful exception.






[GitHub] [kafka] cmccabe commented on a diff in pull request #13686: MINOR: Create the MetadataNode classes to introspect MetadataImage

2023-05-08 Thread via GitHub


cmccabe commented on code in PR #13686:
URL: https://github.com/apache/kafka/pull/13686#discussion_r1187899780


##
shell/src/main/java/org/apache/kafka/shell/command/FindCommandHandler.java:
##
@@ -80,28 +84,28 @@ public FindCommandHandler(List paths) {
 }
 
 @Override
-public void run(Optional shell,
-PrintWriter writer,
-MetadataNodeManager manager) throws Exception {
+public void run(
+Optional shell,
+PrintWriter writer,
+MetadataShellState state
+) throws Exception {
 for (String path : CommandUtils.getEffectivePaths(paths)) {
-manager.visit(new GlobVisitor(path, entryOption -> {
+new GlobVisitor(path, entryOption -> {
 if (entryOption.isPresent()) {
 find(writer, path, entryOption.get().node());
 } else {
 writer.println("find: " + path + ": no such file or 
directory.");
 }
-}));
+}).accept(state);
 }
 }
 
  private void find(PrintWriter writer, String path, MetadataNode node) {
  writer.println(path);
-if (node instanceof DirectoryNode) {
-DirectoryNode directory = (DirectoryNode) node;
-for (Entry<String, MetadataNode> entry : directory.children().entrySet()) {
-String nextPath = path.equals("/") ? path + entry.getKey() : path + "/" + entry.getKey();
-find(writer, nextPath, entry.getValue());
+if (node.isDirectory()) {

Review Comment:
   That would be a logic error. I will add a check and RuntimeException to make 
it clearer.
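
A hedged sketch of the kind of guard being described (illustrative only, not 
the actual commit; MetadataNode is the interface from this PR):

```java
// Illustrative guard: fail fast with a clear message when a non-directory
// node reaches a code path that assumes a directory, instead of letting the
// logic error surface later in a more confusing form.
static void ensureDirectory(MetadataNode node, String path) {
    if (!node.isDirectory()) {
        throw new RuntimeException("Node at " + path +
            " is not a directory; this indicates a logic error in the shell.");
    }
}
```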






[GitHub] [kafka] cmccabe commented on a diff in pull request #13686: MINOR: Create the MetadataNode classes to introspect MetadataImage

2023-05-08 Thread via GitHub


cmccabe commented on code in PR #13686:
URL: https://github.com/apache/kafka/pull/13686#discussion_r1187898816


##
shell/src/main/java/org/apache/kafka/shell/state/MetadataShellState.java:
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell.state;
+
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.node.MetadataNode;
+import org.apache.kafka.shell.node.RootShellNode;
+
+import java.util.function.Consumer;
+
+/**
+ * The Kafka metadata shell.

Review Comment:
   good point, fixed






[GitHub] [kafka] cmccabe commented on a diff in pull request #13686: MINOR: Create the MetadataNode classes to introspect MetadataImage

2023-05-08 Thread via GitHub


cmccabe commented on code in PR #13686:
URL: https://github.com/apache/kafka/pull/13686#discussion_r1187898447


##
metadata/src/main/java/org/apache/kafka/image/node/printer/MetadataNodeRedactionCriteria.java:
##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.node.printer;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.metadata.KafkaConfigSchema;
+
+
+public interface MetadataNodeRedactionCriteria {
+/**
+ * Returns true if SCRAM data should be redacted.
+ */
+boolean shouldRedactScram();
+
+/**
+ * Returns true if a configuration should be redacted.
+ *
+ * @param type  The configuration type.
+ * @param key   The configuration key.
+ *
+ * @return  True if the configuration should be redacted.
+ */
+boolean shouldRedactConfig(ConfigResource.Type type, String key);
+
+class Strict implements MetadataNodeRedactionCriteria {
+public static final Strict INSTANCE = new Strict();
+
+@Override
+public boolean shouldRedactScram() {
+return true;
+}
+
+@Override
+public boolean shouldRedactConfig(ConfigResource.Type type, String key) {
+return true;
+}
+}
+
+class Normal implements MetadataNodeRedactionCriteria {

Review Comment:
   right, this one redacts only what needs to be redacted. Annoyingly, we can't 
use it for toString() since that isn't called in a context that has access to 
it. So we'll just have to be strict with toString().






[GitHub] [kafka] cmccabe commented on a diff in pull request #13686: MINOR: Create the MetadataNode classes to introspect MetadataImage

2023-05-08 Thread via GitHub


cmccabe commented on code in PR #13686:
URL: https://github.com/apache/kafka/pull/13686#discussion_r1187897908


##
shell/src/main/java/org/apache/kafka/shell/node/printer/ShellNodePrinter.java:
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell.node.printer;
+
+import org.apache.kafka.image.node.printer.MetadataNodePrinter;
+import org.apache.kafka.image.node.printer.MetadataNodeRedactionCriteria;
+
+import java.io.PrintWriter;
+
+
+/**
+ * The Kafka metadata shell.
+ */
+public class ShellNodePrinter implements MetadataNodePrinter {
+private final PrintWriter writer;
+private int indentationLevel;
+
+public ShellNodePrinter(PrintWriter writer) {
+this.writer = writer;
+}
+
+String indentationString() {
+StringBuilder bld = new StringBuilder();
+for (int i = 0; i < indentationLevel; i++) {
+for (int j = 0; j < 2; j++) {
+bld.append(" ");
+}
+}
+return bld.toString();
+}
+
+@Override
+public MetadataNodeRedactionCriteria redactionCriteria() {
+return MetadataNodeRedactionCriteria.Disabled.INSTANCE;

Review Comment:
   yes, I think that makes sense for the shell since it is targeted at 
operators after all.
   
   the main concern in other cases is the controller / broker daemon sending 
stuff to log4j that shouldn't be there. very different than the shell use case.






[GitHub] [kafka] mjsax commented on a diff in pull request #13682: MINOR: improved exception/warn logging for stream-stream join store settings

2023-05-08 Thread via GitHub


mjsax commented on code in PR #13682:
URL: https://github.com/apache/kafka/pull/13682#discussion_r1187897510


##
streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueToBytesStoreAdapter.java:
##
@@ -115,7 +115,7 @@ public boolean persistent() {
 
 @Override
 public boolean isOpen() {
-return inner.persistent();
+return inner.isOpen();

Review Comment:
   Can we extract this and cherry-pick to `3.5` before the release goes out?






[jira] [Commented] (KAFKA-14918) KRaft controller sending ZK controller RPCs to KRaft brokers

2023-05-08 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17720672#comment-17720672
 ] 

Colin McCabe commented on KAFKA-14918:
--

This is fixed, sorry for the delay in resolving.

> KRaft controller sending ZK controller RPCs to KRaft brokers
> 
>
> Key: KAFKA-14918
> URL: https://issues.apache.org/jira/browse/KAFKA-14918
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Critical
> Fix For: 3.5.0
>
>
> During the migration, when upgrading a ZK broker to KRaft, the controller is 
> incorrectly sending UpdateMetadata requests to the KRaft controller. 





[jira] [Resolved] (KAFKA-14918) KRaft controller sending ZK controller RPCs to KRaft brokers

2023-05-08 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-14918.
--
Resolution: Fixed

> KRaft controller sending ZK controller RPCs to KRaft brokers
> 
>
> Key: KAFKA-14918
> URL: https://issues.apache.org/jira/browse/KAFKA-14918
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Critical
> Fix For: 3.5.0
>
>
> During the migration, when upgrading a ZK broker to KRaft, the controller is 
> incorrectly sending UpdateMetadata requests to the KRaft controller. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14698) Received request api key LEADER_AND_ISR which is not enabled

2023-05-08 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-14698.
--
Fix Version/s: (was: 3.4.1)
   Resolution: Duplicate

> Received request api key LEADER_AND_ISR which is not enabled
> 
>
> Key: KAFKA-14698
> URL: https://issues.apache.org/jira/browse/KAFKA-14698
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.4.0
>Reporter: Mickael Maison
>Assignee: Akhilesh Chaganti
>Priority: Major
> Fix For: 3.5.0
>
> Attachments: broker0.log, controller.log, test_online_migration.tar.gz
>
>
> I started from a Kafka cluster (with ZooKeeper) with 2 brokers. There's a 
> single topic "test" with 2 partitions and 2 replicas and the internal 
> __consumer_offsets topics.
> While following the ZooKeeper to KRaft migration steps from 
> [https://kafka.apache.org/documentation/#kraft_zk_migration], I'm hitting 
> issues at the Migrating brokers to KRaft step.
> When I restart a broker as KRaft, it repetitively prints the following error:
> {code:java}
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key LEADER_AND_ISR which is not enabled
> [2023-02-09 16:14:30,334] ERROR Closing socket for 
> 192.168.1.11:9092-192.168.1.11:63737-371 because of error 
> (kafka.network.Processor)
> {code}
> The controller repetitively prints the following error:
> {code:java}
> [2023-02-09 16:12:27,456] WARN [Controller id=1000, targetBrokerId=0] 
> Connection to node 0 (mmaison-mac.home/192.168.1.11:9092) could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2023-02-09 16:12:27,456] INFO [Controller id=1000, targetBrokerId=0] Client 
> requested connection close from node 0 
> (org.apache.kafka.clients.NetworkClient)
> [2023-02-09 16:12:27,560] INFO [Controller id=1000, targetBrokerId=0] Node 0 
> disconnected. (org.apache.kafka.clients.NetworkClient)
> {code}
> Attached the controller logs and logs from broker-0
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mumrah commented on a diff in pull request #13686: MINOR: Create the MetadataNode classes to introspect MetadataImage

2023-05-08 Thread via GitHub


mumrah commented on code in PR #13686:
URL: https://github.com/apache/kafka/pull/13686#discussion_r1187879810


##
shell/src/main/java/org/apache/kafka/shell/node/printer/ShellNodePrinter.java:
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell.node.printer;
+
+import org.apache.kafka.image.node.printer.MetadataNodePrinter;
+import org.apache.kafka.image.node.printer.MetadataNodeRedactionCriteria;
+
+import java.io.PrintWriter;
+
+
+/**
+ * The Kafka metadata shell.
+ */
+public class ShellNodePrinter implements MetadataNodePrinter {
+private final PrintWriter writer;
+private int indentationLevel;
+
+public ShellNodePrinter(PrintWriter writer) {
+this.writer = writer;
+}
+
+String indentationString() {
+StringBuilder bld = new StringBuilder();
+for (int i = 0; i < indentationLevel; i++) {
+for (int j = 0; j < 2; j++) {
+bld.append(" ");
+}
+}
+return bld.toString();
+}
+
+@Override
+public MetadataNodeRedactionCriteria redactionCriteria() {
+return MetadataNodeRedactionCriteria.Disabled.INSTANCE;

Review Comment:
   Ok, so while using the shell, we won't redact anything? Is this true for 
both interactive and non-interactive usages?



##
metadata/src/main/java/org/apache/kafka/image/node/printer/MetadataNodeRedactionCriteria.java:
##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.node.printer;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.metadata.KafkaConfigSchema;
+
+
+public interface MetadataNodeRedactionCriteria {
+/**
+ * Returns true if SCRAM data should be redacted.
+ */
+boolean shouldRedactScram();
+
+/**
+ * Returns true if a configuration should be redacted.
+ *
+ * @param type  The configuration type.
+ * @param key   The configuration key.
+ *
+ * @return  True if the configuration should be redacted.
+ */
+boolean shouldRedactConfig(ConfigResource.Type type, String key);
+
+class Strict implements MetadataNodeRedactionCriteria {
+public static final Strict INSTANCE = new Strict();
+
+@Override
+public boolean shouldRedactScram() {
+return true;
+}
+
+@Override
+public boolean shouldRedactConfig(ConfigResource.Type type, String 
key) {
+return true;
+}
+}
+
+class Normal implements MetadataNodeRedactionCriteria {

Review Comment:
   Since this one requires KafkaConfigSchema, I'm guessing it's for use by 
QuorumController?



##
shell/src/main/java/org/apache/kafka/shell/command/FindCommandHandler.java:
##
@@ -80,28 +84,28 @@ public FindCommandHandler(List<String> paths) {
 }
 
 @Override
-public void run(Optional<InteractiveShell> shell,
-PrintWriter writer,
-MetadataNodeManager manager) throws Exception {
+public void run(
+Optional<InteractiveShell> shell,
+PrintWriter writer,
+MetadataShellState state
+) throws Exception {
 for (String path : CommandUtils.getEffectivePaths(paths)) {
-manager.visit(new GlobVisitor(path, entryOption -> {
+new GlobVisitor(path, entryOption -> {
 if (entryOption.isPresent()) {
 

[jira] [Created] (KAFKA-14976) Left/outer stream-stream joins create KV stores that aren't customizable

2023-05-08 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-14976:
--

 Summary: Left/outer stream-stream joins create KV stores that 
aren't customizable
 Key: KAFKA-14976
 URL: https://issues.apache.org/jira/browse/KAFKA-14976
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: A. Sophie Blee-Goldman


It appears that we only give the illusion of full customizability when it comes 
to the state stores of a windowed join. This arose due to an 
[optimization|https://github.com/apache/kafka/pull/11252] for the performance 
of the spurious results fix, and means that these joins now come with one 
additional, and possibly unexpected, state store:

 
{code:java}
final StoreBuilder<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> builder =
            new ListValueStoreBuilder<>(
         |--[   persistent ? 
this-->  |         Stores.persistentKeyValueStore(storeName) : 
         |--[      Stores.inMemoryKeyValueStore(storeName),
                timestampedKeyAndJoinSideSerde,
                leftOrRightValueSerde,
                Time.SYSTEM
            ); {code}
 

where persistent is defined above that as
{code:java}
final boolean persistent = streamJoinedInternal.thisStoreSupplier() == null || 
streamJoinedInternal.thisStoreSupplier().get().persistent(); {code}
 
This means that regardless of whether a custom state store implementation was passed 
in to the join, we will still insert one of our RocksDB or in-memory state 
stores, which might be very surprising since the API makes it seem like the 
underlying stores are fully configurable.

I'm adding a warning line for this in PR 
[#13682|https://github.com/apache/kafka/pull/13682/files#diff-9ce43046fdef1233ab762e728abd1d3d44d7c270b28dcf6b63aa31a93a30af07R334-R336], 
but we should really make this hidden state store fully configurable, like the 
window stores currently are (this will require a KIP).
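
A minimal sketch of the surprising behaviour (topic and store names are illustrative): both window stores are explicitly in-memory, yet the join still creates a third, internal list-value store whose persistence merely follows {{thisStoreSupplier}} and whose implementation cannot be replaced.

{code:java}
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.state.Stores;

public class HiddenStoreDemo {
    public static void main(String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> left = builder.stream("left");
        final KStream<String, String> right = builder.stream("right");

        // JoinWindows.size() = 2 * timeDifference = 10 minutes, grace = 0,
        // so the window stores must use windowSize = retention = 10 minutes.
        final Duration windowSize = Duration.ofMinutes(10);
        final StreamJoined<String, String, String> joined = StreamJoined
            .with(Serdes.String(), Serdes.String(), Serdes.String())
            .withThisStoreSupplier(Stores.inMemoryWindowStore("left-store", windowSize, windowSize, true))
            .withOtherStoreSupplier(Stores.inMemoryWindowStore("right-store", windowSize, windowSize, true));

        left.leftJoin(right, (l, r) -> l + "," + r,
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)), joined);

        // The printed topology shows a third, internal shared join store
        // (exact name may vary) that no StreamJoined option can replace.
        System.out.println(builder.build().describe());
    }
}
{code}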

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14976) Left/outer stream-stream joins create KV stores that aren't customizable

2023-05-08 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-14976:
---
Labels: needs-kip  (was: )

> Left/outer stream-stream joins create KV stores that aren't customizable
> 
>
> Key: KAFKA-14976
> URL: https://issues.apache.org/jira/browse/KAFKA-14976
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip
>
> It appears that we only give the illusion of full customizability when it 
> comes to the state stores of a windowed join. This arose due to an 
> [optimization|https://github.com/apache/kafka/pull/11252] for the performance 
> of the spurious results fix, and means that these joins now come with one 
> additional, and possibly unexpected, state store:
>  
> {code:java}
> final StoreBuilder<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> builder =
>             new ListValueStoreBuilder<>(
>          |--[   persistent ? 
> this-->  |         Stores.persistentKeyValueStore(storeName) : 
>          |--[      Stores.inMemoryKeyValueStore(storeName),
>                 timestampedKeyAndJoinSideSerde,
>                 leftOrRightValueSerde,
>                 Time.SYSTEM
>             ); {code}
>  
> where persistent is defined above that as
> {code:java}
> final boolean persistent = streamJoinedInternal.thisStoreSupplier() == null 
> || streamJoinedInternal.thisStoreSupplier().get().persistent(); {code}
>  
> This means that regardless of whether a custom state store implementation was 
> passed in to the join, we will still insert one of our RocksDB or in-memory 
> state stores, which might be very surprising since the API makes it seem like 
> the underlying stores are fully configurable.
> I'm adding a warning line for this in PR 
> [#13682|https://github.com/apache/kafka/pull/13682/files#diff-9ce43046fdef1233ab762e728abd1d3d44d7c270b28dcf6b63aa31a93a30af07R334-R336], 
> but we should really make this hidden state store fully configurable, like 
> the window stores currently are (this will require a KIP).
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mehbey commented on pull request #13681: KAFKA-14133: Migrate ActiveTaskCreator mock in TaskManagerTest to Mockito

2023-05-08 Thread via GitHub


mehbey commented on PR #13681:
URL: https://github.com/apache/kafka/pull/13681#issuecomment-1538991259

   Addressed Divij's comment and rebased onto the latest changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vepo commented on pull request #12809: KAFKA-14324: Upgrade RocksDB to 7.1.2

2023-05-08 Thread via GitHub


vepo commented on PR #12809:
URL: https://github.com/apache/kafka/pull/12809#issuecomment-1538926727

   Is there any fix for this memory leak? How can we reproduce this memory leak 
for testing?
   
   Some of our environments are facing a memory leak, but we cannot reproduce it. 
Since we use **_org.rocksdb:rocksdbjni_** 7.1.2, I suspect this is the root cause.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac merged pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-08 Thread via GitHub


dajac merged PR #13638:
URL: https://github.com/apache/kafka/pull/13638


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Scanteianu commented on pull request #13455: KAFKA-14841 Handle callbacks to ConsumerRebalanceListener in MockConsumer

2023-05-08 Thread via GitHub


Scanteianu commented on PR #13455:
URL: https://github.com/apache/kafka/pull/13455#issuecomment-1538836327

   > Thanks @vvcephei - I think this is the failure. It's ZK-related, so I think 
the PR is good to go.
   > 
   > `Build / JDK 17 and Scala 2.13 / [1] Type=ZK, Name=testDualWrite, 
MetadataVersion=3.4-IV0, Security=PLAINTEXT – 
kafka.zk.ZkMigrationIntegrationTest 1m 51s `
   
   Thanks @vvcephei @philipnee! The CI failures for the various builds have 
all seemed independent of this change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on a diff in pull request #13689: KAFKA-14975: Wait for TopicBasedRemoteLogMetadataManager.ensureInitializedAndNotClosed to initialize rather than throwing

2023-05-08 Thread via GitHub


mdedetrich commented on code in PR #13689:
URL: https://github.com/apache/kafka/pull/13689#discussion_r1187745268


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java:
##
@@ -76,6 +78,11 @@ public class TopicBasedRemoteLogMetadataManager implements 
RemoteLogMetadataManager
 private final boolean startConsumerThread;
 
 private Thread initializationThread;
+
+private static final long SHUTDOWN_TIMEOUT_SECONDS = 60L;
+
+private CountDownLatch initializeLatch;

Review Comment:
   Thanks for the tip! I can definitely do this; I was just mimicking the style 
in the rest of the Kafka codebase. We'll see what the reviewers say.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-08 Thread via GitHub


jolshan commented on PR #13608:
URL: https://github.com/apache/kafka/pull/13608#issuecomment-1538787228

   Thanks @hachikuji! Working on the tests as well, so I will push those when 
they are ready.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-08 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1187719600


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -579,9 +579,33 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 result
   }
 
-  def hasOngoingTransaction(producerId: Long): Boolean = lock synchronized {
-val entry = producerStateManager.activeProducers.get(producerId)
-entry != null && entry.currentTxnFirstOffset.isPresent
+  def transactionNeedsVerifying(producerId: Long, producerEpoch: Short, 
baseSequence: Int): Boolean = lock synchronized {
+val entry = producerStateManager.entryForVerification(producerId, 
producerEpoch, baseSequence)
+(!entry.currentTxnFirstOffset.isPresent) &&
+  (entry.compareAndSetVerificationState(producerEpoch, 
ProducerStateEntry.VerificationState.EMPTY, 
ProducerStateEntry.VerificationState.VERIFYING) ||
+entry.verificationState() == 
ProducerStateEntry.VerificationState.VERIFYING)
+  }
+  
+  def compareAndSetVerificationState(producerId: Long,
+ producerEpoch: Short,
+ baseSequence: Int,
+ expectedVerificationState: 
ProducerStateEntry.VerificationState,
+ newVerificationState: 
ProducerStateEntry.VerificationState): Unit = { lock synchronized {

Review Comment:
   Those are the method braces. Should I just add a new line so it is clearer?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue closed pull request #13640: KAFKA-14937: Refactoring for client code to reduce boilerplate

2023-05-08 Thread via GitHub


kirktrue closed pull request #13640: KAFKA-14937: Refactoring for client code 
to reduce boilerplate
URL: https://github.com/apache/kafka/pull/13640


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on pull request #13640: KAFKA-14937: Refactoring for client code to reduce boilerplate

2023-05-08 Thread via GitHub


kirktrue commented on PR #13640:
URL: https://github.com/apache/kafka/pull/13640#issuecomment-1538766507

   Closing pull request and moving to a fork where we're doing integration.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-08 Thread via GitHub


hachikuji commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1187662801


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##
@@ -103,28 +121,44 @@ public boolean maybeUpdateProducerEpoch(short 
producerEpoch) {
 return false;
 }
 }
+
+public void maybeUpdateTentaitiveSequence(int sequence) {

Review Comment:
   typo: Tentaitive



##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##
@@ -103,28 +121,44 @@ public boolean maybeUpdateProducerEpoch(short 
producerEpoch) {
 return false;
 }
 }
+
+public void maybeUpdateTentaitiveSequence(int sequence) {
+if (batchMetadata.isEmpty() && (!this.tentativeSequence.isPresent() || 
this.tentativeSequence.getAsInt() > sequence))
+this.tentativeSequence = OptionalInt.of(sequence);
+}
 
 private void addBatchMetadata(BatchMetadata batch) {
+// When appending a batch, we no longer need tentative sequence.
+this.tentativeSequence = OptionalInt.empty();
 if (batchMetadata.size() == ProducerStateEntry.NUM_BATCHES_TO_RETAIN) 
batchMetadata.removeFirst();
 batchMetadata.add(batch);
 }
+
+public boolean compareAndSetVerificationState(short expectedProducerEpoch, 
VerificationState expectedVerificationState, VerificationState 
newVerificationState) {
+if (expectedProducerEpoch == this.producerEpoch && verificationState 
== expectedVerificationState) {
+this.verificationState = newVerificationState;
+return true;
+}
+return false;
+}
 
 public void update(ProducerStateEntry nextEntry) {
-update(nextEntry.producerEpoch, nextEntry.coordinatorEpoch, 
nextEntry.lastTimestamp, nextEntry.batchMetadata, 
nextEntry.currentTxnFirstOffset);
+update(nextEntry.producerEpoch, nextEntry.coordinatorEpoch, 
nextEntry.lastTimestamp, nextEntry.batchMetadata, 
nextEntry.currentTxnFirstOffset, nextEntry.verificationState);
 }
 
 public void update(short producerEpoch, int coordinatorEpoch, long 
lastTimestamp) {
-update(producerEpoch, coordinatorEpoch, lastTimestamp, new 
ArrayDeque<>(0), OptionalLong.empty());
+update(producerEpoch, coordinatorEpoch, lastTimestamp, new 
ArrayDeque<>(0), OptionalLong.empty(), VerificationState.EMPTY);
 }
 
 private void update(short producerEpoch, int coordinatorEpoch, long 
lastTimestamp, Deque<BatchMetadata> batchMetadata,
-OptionalLong currentTxnFirstOffset) {
+OptionalLong currentTxnFirstOffset, VerificationState 
verificationState) {

Review Comment:
   nit: when arg lists go above 2 or 3, it's helpful to start putting each 
argument on a separate line:
   ```java
   private void update(
 short producerEpoch,
 int coordinatorEpoch,
 long lastTimestamp,
 ...
   ) {
   



##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##
@@ -41,18 +42,35 @@ public class ProducerStateEntry {
 private int coordinatorEpoch;
 private long lastTimestamp;
 private OptionalLong currentTxnFirstOffset;
+
+private VerificationState verificationState;
+
+// Before any batches are associated with the entry, the tentative 
sequence represents the lowest sequence seen.
+private OptionalInt tentativeSequence;
+
+public enum VerificationState {
+EMPTY,
+VERIFYING,
+VERIFIED
+}
 
 public static ProducerStateEntry empty(long producerId) {
-return new ProducerStateEntry(producerId, 
RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, 
OptionalLong.empty(), Optional.empty());
+return new ProducerStateEntry(producerId, 
RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, 
OptionalLong.empty(), Optional.empty(), VerificationState.EMPTY, 
OptionalInt.empty());
+}
+
+public static ProducerStateEntry forVerification(long producerId, short 
producerEpoch, long milliseconds) {

Review Comment:
   nit: `milliseconds` -> `lastTimestamp`?



##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##
@@ -183,6 +184,19 @@ private void clearProducerIds() {
 producers.clear();
 producerIdCount = 0;
 }
+
+public ProducerStateEntry entryForVerification(long producerId, short 
producerEpoch, int firstSequence) {
+ProducerStateEntry entry;
+if (producers.containsKey(producerId)) {

Review Comment:
   nit: usually we would call `get` and check for null (saves one hash lookup)
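   
   A sketch of the suggested pattern (the surrounding method body is illustrative):
   
   ```java
   ProducerStateEntry entry = producers.get(producerId);
   if (entry == null) {
       // not tracked yet: create a fresh entry for verification
       entry = ProducerStateEntry.forVerification(producerId, producerEpoch, time.milliseconds());
       producers.put(producerId, entry);
   }
   // one hash lookup instead of containsKey(...) followed by get(...)
   ```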



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -579,9 +579,33 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 result
   }
 
-  def hasOngoingTransaction(producerId: Long): Boolean = lock synchronized {

[GitHub] [kafka] jolshan commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-08 Thread via GitHub


jolshan commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1187640544


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine 
of the
+ * consumer group protocol. Given the current state of a member and a desired 
or target
+ * assignment state, the state machine takes the necessary steps to converge 
them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch:
+ *   The current epoch of the member.
+ *
+ * - Next Epoch:
+ *   The desired epoch of the member. It corresponds to the epoch of the 
target/desired assignment.
+ *   The member transitions to this epoch when it has revoked the partitions 
that it does not own
+ *   or if it does not have to revoke any.
+ *
+ * - Previous Epoch:
+ *   The epoch of the member when the state was last updated.
+ *
+ * - Assigned Partitions:
+ *   The set of partitions currently assigned to the member. This represents 
what the member should have.
+ *
+ * - Partitions Pending Revocation:
+ *   The set of partitions that the member should revoke before it can 
transition to the next state.
+ *
+ * - Partitions Pending Assignment:
+ *   The set of partitions that the member will eventually receive. The 
partitions in this set are
+ *   still owned by other members in the group.
+ *
+ * The state machine has four states:
+ * - NEW_TARGET_ASSIGNMENT:

Review Comment:
   Makes sense, thanks!
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on pull request #13664: KAFKA-14961: harden DefaultBackgroundThreadTest.testStartupAndTearDown test

2023-05-08 Thread via GitHub


philipnee commented on PR #13664:
URL: https://github.com/apache/kafka/pull/13664#issuecomment-1538666147

   Thanks @vvcephei - I think this is a flaky test?
   
   `Build / JDK 17 and Scala 2.13 / 
testDescribeStateOfExistingGroupWithRoundRobinAssignor() – 
kafka.admin.DescribeConsumerGroupTest
   2s
   Skipped - 123`
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on pull request #13455: KAFKA-14841 Handle callbacks to ConsumerRebalanceListener in MockConsumer

2023-05-08 Thread via GitHub


philipnee commented on PR #13455:
URL: https://github.com/apache/kafka/pull/13455#issuecomment-1538664591

   Thanks @vvcephei - I think this is the failure. It's ZK-related, so I think 
the PR is good to go.
   
   `Build / JDK 17 and Scala 2.13 / [1] Type=ZK, Name=testDualWrite, 
MetadataVersion=3.4-IV0, Security=PLAINTEXT – 
kafka.zk.ZkMigrationIntegrationTest
   1m 51s
   `


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-08 Thread via GitHub


dajac commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1187624902


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine 
of the
+ * consumer group protocol. Given the current state of a member and a desired 
or target
+ * assignment state, the state machine takes the necessary steps to converge 
them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch:
+ *   The current epoch of the member.
+ *
+ * - Next Epoch:
+ *   The desired epoch of the member. It corresponds to the epoch of the 
target/desired assignment.
+ *   The member transitions to this epoch when it has revoked the partitions 
that it does not own
+ *   or if it does not have to revoke any.
+ *
+ * - Previous Epoch:
+ *   The epoch of the member when the state was last updated.
+ *
+ * - Assigned Partitions:
+ *   The set of partitions currently assigned to the member. This represents 
what the member should have.
+ *
+ * - Partitions Pending Revocation:
+ *   The set of partitions that the member should revoke before it can 
transition to the next state.
+ *
+ * - Partitions Pending Assignment:
+ *   The set of partitions that the member will eventually receive. The 
partitions in this set are
+ *   still owned by other members in the group.
+ *
+ * The state machine has four states:
+ * - NEW_TARGET_ASSIGNMENT:

Review Comment:
   Updated the comment. Let me know what you think.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on pull request #13688: KAFKA-14974: Restore backward compatibility in KafkaBasedLog

2023-05-08 Thread via GitHub


yashmayya commented on PR #13688:
URL: https://github.com/apache/kafka/pull/13688#issuecomment-1538658594

   Thanks for taking a look, @rhauch - your understanding here is correct. We 
should try to backport this to `3.3`, `3.4`, and `3.5` as well (ideally before 
the `3.5.0` release).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mehbey commented on a diff in pull request #13681: KAFKA-14133: Migrate ActiveTaskCreator mock in TaskManagerTest to Mockito

2023-05-08 Thread via GitHub


mehbey commented on code in PR #13681:
URL: https://github.com/apache/kafka/pull/13681#discussion_r1187611572


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -2579,7 +2563,9 @@ public void shouldUpdateInputPartitionsAfterRebalance() {
 assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
 assertThat(task00.state(), is(Task.State.RUNNING));
 assertEquals(newPartitionsSet, task00.inputPartitions());
-verify(activeTaskCreator, consumer, changeLogReader);
+verify(consumer, changeLogReader);
+Mockito.verify(activeTaskCreator).createTasks(any(), 
Mockito.eq(taskId00Assignment));

Review Comment:
   Yeah, good recommendation - I will push a new version with the update.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14974) Restore backward compatibility in KafkaBasedLog

2023-05-08 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17720566#comment-17720566
 ] 

Yash Mayya commented on KAFKA-14974:


Thanks [~rhauch], your understanding here is correct. We should backport [this 
fix|https://github.com/apache/kafka/pull/13688] to {{3.3}}, {{3.4}} and 
{{3.5}} as well (ideally before the {{3.5.0}} release).

> Restore backward compatibility in KafkaBasedLog
> ---
>
> Key: KAFKA-14974
> URL: https://issues.apache.org/jira/browse/KAFKA-14974
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Affects Versions: 3.5.0, 3.4.1, 3.3.3
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
>
> {{KafkaBasedLog}} is a widely used utility class that provides a generic 
> implementation of a shared, compacted log of records in a Kafka topic. It 
> isn't in Connect's public API, but has been used outside of Connect and we 
> try to preserve backward compatibility whenever possible. 
> https://issues.apache.org/jira/browse/KAFKA-14455 modified the two overloaded 
> void {{KafkaBasedLog::send}} methods to return a {{{}Future{}}}. While this 
> change is source compatible, it isn't binary compatible. We can restore 
> backward compatibility simply by re-instating the older send methods, and 
> renaming the new Future returning send methods.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] yashmayya commented on pull request #13688: KAFKA-14974: Restore backward compatibility in KafkaBasedLog

2023-05-08 Thread via GitHub


yashmayya commented on PR #13688:
URL: https://github.com/apache/kafka/pull/13688#issuecomment-1538608185

   Yep, that's right. We should backport this to `3.3`, `3.4`, and `3.5` as well 
(ideally before the `3.5.0` release).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] reta commented on a diff in pull request #13689: KAFKA-14975: Wait for TopicBasedRemoteLogMetadataManager.ensureInitializedAndNotClosed to initialize rather than throwing

2023-05-08 Thread via GitHub


reta commented on code in PR #13689:
URL: https://github.com/apache/kafka/pull/13689#discussion_r1187596223


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java:
##
@@ -76,6 +78,11 @@ public class TopicBasedRemoteLogMetadataManager implements 
RemoteLogMetadataManager
 private final boolean startConsumerThread;
 
 private Thread initializationThread;
+
+private static final long SHUTDOWN_TIMEOUT_SECONDS = 60L;
+
+private CountDownLatch initializeLatch;

Review Comment:
   Just a suggestion, but I think you could simplify tracking the 
initialization flow by folding `initializeLatch` & `initialized` into a 
`CompletableFuture` (which could also handle the case where close is called 
while the initialization is still in progress).
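   
   A minimal sketch of that idea (names are illustrative, not the actual `TopicBasedRemoteLogMetadataManager` fields):
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.TimeUnit;
   
   class InitializationTracker {
       // Completes normally once initialization finishes, exceptionally if close() wins the race.
       private final CompletableFuture<Void> initialized = new CompletableFuture<>();
   
       void markInitialized() {
           initialized.complete(null);
       }
   
       void close() {
           // Fail any waiters instead of leaving them blocked until the timeout.
           initialized.completeExceptionally(new IllegalStateException("closed before initialization completed"));
       }
   
       void ensureInitialized(long timeoutMs) throws Exception {
           initialized.get(timeoutMs, TimeUnit.MILLISECONDS); // throws TimeoutException on timeout
       }
   }
   ```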



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14974) Restore backward compatibility in KafkaBasedLog

2023-05-08 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17720553#comment-17720553
 ] 

Randall Hauch commented on KAFKA-14974:
---

[~yash.mayya], just to clarify:
 # #12984 changed the signatures of the then-`send(...)` methods by adding a 
return value, which breaks binary compatibility for this utility class.
 # Those changes were made on `trunk` prior to the `3.5` branch ([it's in the 
`3.5` 
history](https://github.com/apache/kafka/commits/3.5?after=f9730c11b7b48a37f527a363e0c6dced53fdbc69+314=3.5_name=refs%2Fheads%2F3.5)),
 and backported to the `3.4` and `3.3` branches.
 # To restore backward compatibility, this PR renames the methods that return 
a `Future` to `sendWithReceipt(...)` and adds back the two `send(...)` methods 
with the same signatures as before.

Is this correct?

> Restore backward compatibility in KafkaBasedLog
> ---
>
> Key: KAFKA-14974
> URL: https://issues.apache.org/jira/browse/KAFKA-14974
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Affects Versions: 3.5.0, 3.4.1, 3.3.3
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
>
> {{KafkaBasedLog}} is a widely used utility class that provides a generic 
> implementation of a shared, compacted log of records in a Kafka topic. It 
> isn't in Connect's public API, but has been used outside of Connect and we 
> try to preserve backward compatibility whenever possible. 
> https://issues.apache.org/jira/browse/KAFKA-14455 modified the two overloaded 
> void {{KafkaBasedLog::send}} methods to return a {{{}Future{}}}. While this 
> change is source compatible, it isn't binary compatible. We can restore 
> backward compatibility simply by re-instating the older send methods, and 
> renaming the new Future returning send methods.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-08 Thread via GitHub


jolshan commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1187582387


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine 
of the
+ * consumer group protocol. Given the current state of a member and a desired 
or target
+ * assignment state, the state machine takes the necessary steps to converge 
them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch:
+ *   The current epoch of the member.
+ *
+ * - Next Epoch:
+ *   The desired epoch of the member. It corresponds to the epoch of the 
target/desired assignment.
+ *   The member transitions to this epoch when it has revoked the partitions 
that it does not own
+ *   or if it does not have to revoke any.
+ *
+ * - Previous Epoch:
+ *   The epoch of the member when the state was last updated.
+ *
+ * - Assigned Partitions:
+ *   The set of partitions currently assigned to the member. This represents 
what the member should have.
+ *
+ * - Partitions Pending Revocation:
+ *   The set of partitions that the member should revoke before it can 
transition to the next state.
+ *
+ * - Partitions Pending Assignment:
+ *   The set of partitions that the member will eventually receive. The 
partitions in this set are
+ *   still owned by other members in the group.
+ *
+ * The state machine has four states:
+ * - NEW_TARGET_ASSIGNMENT:

Review Comment:
   I see. I guess it is a little confusing that we have this as an all-caps state 
but don't list it in MemberState. Just wondering whether it would be better to 
include a comment about the transient state when we transition, or to leave it 
out altogether.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14455) Kafka Connect create and update REST APIs should surface failures while writing to the config topic

2023-05-08 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-14455:
---
Fix Version/s: 3.5.0
   3.4.1
   3.3.3

> Kafka Connect create and update REST APIs should surface failures while 
> writing to the config topic
> ---
>
> Key: KAFKA-14455
> URL: https://issues.apache.org/jira/browse/KAFKA-14455
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
> Fix For: 3.5.0, 3.4.1, 3.3.3
>
>
> Kafka Connect's `POST /connectors` and `PUT /connectors/\{connector}/config` 
> REST APIs internally simply write a message to the Connect cluster's internal 
> config topic (which is then processed asynchronously by the herder). However, 
> no callback is passed to the producer's send method and there is no error 
> handling in place for producer send failures (see 
> [here|https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L716]
>  / 
> [here|https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L726]).
> Consider one such case where the Connect worker's principal doesn't have a 
> WRITE ACL on the cluster's config topic. Now suppose the user submits a 
> connector's configs via one of the above two APIs. The producer send 
> [here|https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L716]
>  / 
> [here|https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L726]
>  won't succeed (due to a TopicAuthorizationException) but the API responses 
> will be `201 Created` success responses anyway. This is a very poor UX 
> because the connector will actually never be created but the API response 
> indicated success. Furthermore, this failure would only be detectable if 
> TRACE logs are enabled (via [this 
> log)|https://github.com/apache/kafka/blob/df29b17fc40f7c15460988d58bc652c3d66b60f8/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java]
>  making it near impossible for users to debug. Producer callbacks should be 
> used to surface write failures back to the user via the API response.
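
A minimal sketch of the suggested fix (class and method names are illustrative; the real change belongs in KafkaConfigBackingStore):

{code:java}
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

class ConfigTopicWriter {
    // Pass a callback to send() so failures such as TopicAuthorizationException
    // reach the REST layer instead of being silently dropped.
    static CompletableFuture<Void> sendConfig(final Producer<String, byte[]> producer,
                                              final ProducerRecord<String, byte[]> record) {
        final CompletableFuture<Void> result = new CompletableFuture<>();
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                result.completeExceptionally(exception);
            } else {
                result.complete(null);
            }
        });
        return result;
    }
}
{code}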



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14974) Restore backward compatibility in KafkaBasedLog

2023-05-08 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-14974:
---
Affects Version/s: 3.5.0
   3.4.1
   3.3.3

> Restore backward compatibility in KafkaBasedLog
> ---
>
> Key: KAFKA-14974
> URL: https://issues.apache.org/jira/browse/KAFKA-14974
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Affects Versions: 3.5.0, 3.4.1, 3.3.3
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
>
> {{KafkaBasedLog}} is a widely used utility class that provides a generic 
> implementation of a shared, compacted log of records in a Kafka topic. It 
> isn't in Connect's public API, but has been used outside of Connect and we 
> try to preserve backward compatibility whenever possible. 
> https://issues.apache.org/jira/browse/KAFKA-14455 modified the two overloaded 
> void {{KafkaBasedLog::send}} methods to return a {{{}Future{}}}. While this 
> change is source compatible, it isn't binary compatible. We can restore 
> backward compatibility simply by re-instating the older send methods, and 
> renaming the new Future returning send methods.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14974) Restore backward compatibility in KafkaBasedLog

2023-05-08 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-14974:
--
Component/s: KafkaConnect

> Restore backward compatibility in KafkaBasedLog
> ---
>
> Key: KAFKA-14974
> URL: https://issues.apache.org/jira/browse/KAFKA-14974
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
>
> {{KafkaBasedLog}} is a widely used utility class that provides a generic 
> implementation of a shared, compacted log of records in a Kafka topic. It 
> isn't in Connect's public API, but has been used outside of Connect and we 
> try to preserve backward compatibility whenever possible. 
> https://issues.apache.org/jira/browse/KAFKA-14455 modified the two overloaded 
> void {{KafkaBasedLog::send}} methods to return a {{{}Future{}}}. While this 
> change is source compatible, it isn't binary compatible. We can restore 
> backward compatibility simply by re-instating the older send methods, and 
> renaming the new Future returning send methods.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mdedetrich commented on a diff in pull request #13689: KAFKA-14975: Wait for TopicBasedRemoteLogMetadataManager.ensureInitializedAndNotClosed to initialize rather than throwing

2023-05-08 Thread via GitHub


mdedetrich commented on code in PR #13689:
URL: https://github.com/apache/kafka/pull/13689#discussion_r1187572164


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java:
##
@@ -104,27 +103,10 @@ public void 
onPartitionLeadershipChanges(Set<TopicIdPartition> leaderPartitions,
 log.debug("TopicBasedRemoteLogMetadataManager configs after adding 
overridden properties: {}", configs);
 
 topicBasedRemoteLogMetadataManager.configure(configs);
-try {
-waitUntilInitialized(60_000);
-} catch (TimeoutException e) {
-throw new KafkaException(e);
-}
-
 
topicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges(topicIdPartitions,
 Collections.emptySet());
 }
 
 // Visible for testing.
-public void waitUntilInitialized(long waitTimeMs) throws TimeoutException {

Review Comment:
   This was removed because the entire point of this PR is to make explicit 
waiting redundant. The fact that this waiting was only possible in a test 
(because the method was package-private) is the underlying reason behind this 
change: when running `TopicBasedRemoteLogMetadataManager` normally within a 
broker, it was too easy to call methods before the 
`TopicBasedRemoteLogMetadataManager` had finished initializing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on pull request #13689: KAFKA-14975: Wait for TopicBasedRemoteLogMetadataManager.ensureInitializedAndNotClosed to initialize rather than throwing

2023-05-08 Thread via GitHub


mdedetrich commented on PR #13689:
URL: https://github.com/apache/kafka/pull/13689#issuecomment-1538539482

   @satishd @junrao Pinging you because you have done previous PRs regarding 
Tiered Storage.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich opened a new pull request, #13689: KAFKA-14975: Wait for TopicBasedRemoteLogMetadataManager.ensureInitializedAndNotClosed to initialize rather than throwing

2023-05-08 Thread via GitHub


mdedetrich opened a new pull request, #13689:
URL: https://github.com/apache/kafka/pull/13689

   The current implementation of 
TopicBasedRemoteLogMetadataManager.ensureInitializedAndNotClosed immediately 
throws an exception if TopicBasedRemoteLogMetadataManager is not initialized. 
Instead, we should wait (with a timeout) for TopicBasedRemoteLogMetadataManager 
to initialize, as that is the behaviour users expect.
   
   The solution is to use a typical `CountDownLatch`, as is done elsewhere 
within Kafka.
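   
   A minimal sketch of that pattern (field and method names are illustrative, not the exact PR code):
   
   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   
   class InitializationGate {
       private final CountDownLatch initializedLatch = new CountDownLatch(1);
   
       // Called at the end of the initialization thread.
       void markInitialized() {
           initializedLatch.countDown();
       }
   
       // Called at the start of public methods: wait instead of throwing immediately.
       void ensureInitialized(long timeoutMs) throws InterruptedException, TimeoutException {
           if (!initializedLatch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
               throw new TimeoutException("Initialization did not complete within " + timeoutMs + " ms");
           }
       }
   }
   ```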
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14974) Restore backward compatibility in KafkaBasedLog

2023-05-08 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17720546#comment-17720546
 ] 

Randall Hauch commented on KAFKA-14974:
---

Thanks for catching this and providing a fix. Indeed, we have tried to maintain 
backward compatibility for this class since it is super useful.

> Restore backward compatibility in KafkaBasedLog
> ---
>
> Key: KAFKA-14974
> URL: https://issues.apache.org/jira/browse/KAFKA-14974
> Project: Kafka
>  Issue Type: Task
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
>
> {{KafkaBasedLog}} is a widely used utility class that provides a generic 
> implementation of a shared, compacted log of records in a Kafka topic. It 
> isn't in Connect's public API, but has been used outside of Connect and we 
> try to preserve backward compatibility whenever possible. 
> https://issues.apache.org/jira/browse/KAFKA-14455 modified the two overloaded 
> void {{KafkaBasedLog::send}} methods to return a {{{}Future{}}}. While this 
> change is source compatible, it isn't binary compatible. We can restore 
> backward compatibility simply by re-instating the older send methods, and 
> renaming the new Future returning send methods.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] yashmayya commented on pull request #13688: KAFKA-14974: Restore backward compatibility in KafkaBasedLog

2023-05-08 Thread via GitHub


yashmayya commented on PR #13688:
URL: https://github.com/apache/kafka/pull/13688#issuecomment-1538509105

   @rhauch could you please take a look whenever you get a chance?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya opened a new pull request, #13688: KAFKA-14974: Restore backward compatibility in KafkaBasedLog

2023-05-08 Thread via GitHub


yashmayya opened a new pull request, #13688:
URL: https://github.com/apache/kafka/pull/13688

   From https://issues.apache.org/jira/browse/KAFKA-14974:
   
   > `KafkaBasedLog` is a widely used utility class that provides a generic 
implementation of a shared, compacted log of records in a Kafka topic. It isn't 
in Connect's public API, but has been used outside of Connect and we try to 
preserve backward compatibility whenever possible. 
https://issues.apache.org/jira/browse/KAFKA-14455 modified the two overloaded 
void `KafkaBasedLog::send` methods to return a `Future`. While this change is 
source compatible, it isn't binary compatible. We can restore backward 
compatibility simply by re-instating the older send methods, and renaming the 
new Future returning send methods.
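   
   For illustration, a toy sketch of why the original change was source- but not binary-compatible, and of the restored shape (this is a stand-in class, not the real `KafkaBasedLog`):
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.Future;
   
   class Log<K, V> {
       // Restored overload: its descriptor ends in ...)V, exactly as before KAFKA-14455,
       // so class files compiled against the old API still link (no NoSuchMethodError).
       public void send(K key, V value) {
           sendWithReceipt(key, value);
       }
   
       // Renamed Future-returning variant introduced by KAFKA-14455. Changing send's
       // return type in place would have altered its descriptor, breaking old callers
       // at link time even though recompiled sources still work.
       public Future<Void> sendWithReceipt(K key, V value) {
           return CompletableFuture.completedFuture(null); // real code produces to Kafka
       }
   }
   ```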
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14975) Make TopicBasedRemoteLogMetadataManager methods wait for initialize to complete

2023-05-08 Thread Matthew de Detrich (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthew de Detrich updated KAFKA-14975:
---
Description: 
In the current implementation of TopicBasedRemoteLogMetadataManager, various 
methods internally call ensureInitializedAndNotClosed to ensure that the 
TopicBasedRemoteLogMetadataManager is initialized. If it is not initialized, 
an exception is thrown.

This is not ideal behaviour; rather than just throwing an exception, we should 
try to wait (with a timeout) until TopicBasedRemoteLogMetadataManager is 
initialised. This is the behaviour users expect, and it is also what other 
parts of Kafka that use plugin-based systems (e.g. Kafka Connect) do.

  was:In the current implementation of 


> Make TopicBasedRemoteLogMetadataManager methods wait for initialize to 
> complete
> ---
>
> Key: KAFKA-14975
> URL: https://issues.apache.org/jira/browse/KAFKA-14975
> Project: Kafka
>  Issue Type: Task
>Reporter: Matthew de Detrich
>Assignee: Matthew de Detrich
>Priority: Major
>
> In the current implementation of TopicBasedRemoteLogMetadataManager, various 
> methods internally call 
> ensureInitializedAndNotClosed to ensure that the 
> TopicBasedRemoteLogMetadataManager is initialized. If it is not initialized, 
> an exception is thrown.
> This is not ideal behaviour; rather than just throwing an exception, we 
> should instead wait (with a timeout) until TopicBasedRemoteLogMetadataManager 
> is initialised. This is the behaviour users expect, and it is also what other 
> parts of Kafka that use plugin-based systems (e.g. Kafka Connect) do.
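
A minimal sketch of the proposed waiting behaviour, assuming a hypothetical 
latch that initialize() counts down once it completes — the actual field and 
method names in TopicBasedRemoteLogMetadataManager may differ:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch only: block callers until initialization finishes or times out.
class WaitingMetadataManager {

    private final CountDownLatch initializationLatch = new CountDownLatch(1);

    // Called at the end of initialize().
    void markInitialized() {
        initializationLatch.countDown();
    }

    // Instead of failing fast, wait for initialization with a bounded timeout.
    void ensureInitialized(long timeoutMs) throws InterruptedException, TimeoutException {
        if (!initializationLatch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("Initialization did not complete within " + timeoutMs + " ms");
        }
    }
}
```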



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14975) Make TopicBasedRemoteLogMetadataManager methods wait for initialize to complete

2023-05-08 Thread Matthew de Detrich (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthew de Detrich reassigned KAFKA-14975:
--

Assignee: Matthew de Detrich

> Make TopicBasedRemoteLogMetadataManager methods wait for initialize to 
> complete
> ---
>
> Key: KAFKA-14975
> URL: https://issues.apache.org/jira/browse/KAFKA-14975
> Project: Kafka
>  Issue Type: Task
>Reporter: Matthew de Detrich
>Assignee: Matthew de Detrich
>Priority: Major
>
> In the current implementation of 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14975) Make TopicBasedRemoteLogMetadataManager methods wait for initialize to complete

2023-05-08 Thread Matthew de Detrich (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthew de Detrich updated KAFKA-14975:
---
Description: In the current implementation of 

> Make TopicBasedRemoteLogMetadataManager methods wait for initialize to 
> complete
> ---
>
> Key: KAFKA-14975
> URL: https://issues.apache.org/jira/browse/KAFKA-14975
> Project: Kafka
>  Issue Type: Task
>Reporter: Matthew de Detrich
>Priority: Major
>
> In the current implementation of 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14975) Make TopicBasedRemoteLogMetadataManager methods wait for initialize to complete

2023-05-08 Thread Matthew de Detrich (Jira)
Matthew de Detrich created KAFKA-14975:
--

 Summary: Make TopicBasedRemoteLogMetadataManager methods wait for 
initialize to complete
 Key: KAFKA-14975
 URL: https://issues.apache.org/jira/browse/KAFKA-14975
 Project: Kafka
  Issue Type: Task
Reporter: Matthew de Detrich






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] viktorsomogyi commented on pull request #13594: KAFKA-14913: Using ThreadUtils.shutdownExecutorServiceQuietly to close executors in Connect Runtime

2023-05-08 Thread via GitHub


viktorsomogyi commented on PR #13594:
URL: https://github.com/apache/kafka/pull/13594#issuecomment-1538474921

   Merged it, thank you @vamossagar12 for the contribution!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] viktorsomogyi merged pull request #13594: KAFKA-14913: Using ThreadUtils.shutdownExecutorServiceQuietly to close executors in Connect Runtime

2023-05-08 Thread via GitHub


viktorsomogyi merged PR #13594:
URL: https://github.com/apache/kafka/pull/13594


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14974) Restore backward compatibility in KafkaBasedLog

2023-05-08 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-14974:
--

 Summary: Restore backward compatibility in KafkaBasedLog
 Key: KAFKA-14974
 URL: https://issues.apache.org/jira/browse/KAFKA-14974
 Project: Kafka
  Issue Type: Task
Reporter: Yash Mayya
Assignee: Yash Mayya


{{KafkaBasedLog}} is a widely used utility class that provides a generic 
implementation of a shared, compacted log of records in a Kafka topic. It isn't 
in Connect's public API, but has been used outside of Connect and we try to 
preserve backward compatibility whenever possible. 
https://issues.apache.org/jira/browse/KAFKA-14455 modified the two overloaded 
void {{KafkaBasedLog::send}} methods to return a {{{}Future{}}}. While this 
change is source compatible, it isn't binary compatible. We can restore 
backward compatibility simply by reinstating the older send methods and 
renaming the new Future-returning send methods.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cadonna merged pull request #13621: KAFKA-14133: Migrate ChangeLogReader mock in TaskManagerTest to Mockito

2023-05-08 Thread via GitHub


cadonna merged PR #13621:
URL: https://github.com/apache/kafka/pull/13621


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #13594: KAFKA-14913: Using ThreadUtils.shutdownExecutorServiceQuietly to close executors in Connect Runtime

2023-05-08 Thread via GitHub


vamossagar12 commented on PR #13594:
URL: https://github.com/apache/kafka/pull/13594#issuecomment-1538450687

   Thanks @viktorsomogyi. These are the ones we could find so far. We can 
merge this and create follow-up tickets if needed for any other executors 
within Connect that need to be closed this way. WDYT?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java

2023-05-08 Thread via GitHub


dajac commented on code in PR #13644:
URL: https://github.com/apache/kafka/pull/13644#discussion_r1187462145


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates a generic group member's metadata.
+ *
+ * Member metadata contains the following:
+ *
+ * Heartbeat metadata:
+ * 1. negotiated heartbeat session timeout
+ * 2. timestamp of the latest heartbeat
+ *
+ * Protocol metadata:
+ * 1. the list of supported protocols (ordered by preference)
+ * 2. the metadata associated with each protocol
+ *
+ * In addition, it also contains the following state information:
+ *
+ * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance 
state,
+ * its rebalance callback will be kept in the 
metadata if the
+ * member has sent the join group request
+ * 2. Awaiting sync callback: when the group is in the awaiting-sync state, 
its sync callback
+ *is kept in metadata until the leader provides 
the group assignment
+ *and the group transitions to stable
+ */
+public class GenericGroupMember {
+
+private static class MemberSummary {
+private final String memberId;
+private final Optional<String> groupInstanceId;
+private final String clientId;
+private final String clientHost;
+private final byte[] metadata;
+private final byte[] assignment;
+
+public MemberSummary(String memberId,
+ Optional<String> groupInstanceId,
+ String clientId,
+ String clientHost,
+ byte[] metadata,
+ byte[] assignment) {
+
+this.memberId = memberId;
+this.groupInstanceId = groupInstanceId;
+this.clientId = clientId;
+this.clientHost = clientHost;
+this.metadata = metadata;
+this.assignment = assignment;
+}
+
+public String memberId() {
+return memberId;
+}
+
+public Optional<String> getGroupInstanceId() {
+return groupInstanceId;
+}
+
+public String clientId() {
+return clientId;
+}
+
+public String clientHost() {
+return clientHost;
+}
+
+public byte[] metadata() {
+return metadata;
+}
+
+public byte[] assignment() {
+return assignment;
+}
+
+}
+
+/**
+ * The member id.
+ */
+private final String memberId;
+
+/**
+ * The group instance id.
+ */
+private final Optional<String> groupInstanceId;
+
+/**
+ * The client id.
+ */
+private final String clientId;

Review Comment:
   It is better to refactor here. It is confusing otherwise.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-05-08 Thread via GitHub


satishd commented on PR #13535:
URL: https://github.com/apache/kafka/pull/13535#issuecomment-1538311434

   Thanks @junrao for the updated review comments. Addressed the comments with 
inline replies or with the latest commits. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-05-08 Thread via GitHub


satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1187412666


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1083,48 +1095,100 @@ class ReplicaManager(val config: KafkaConfig,
   fetchPartitionStatus += (topicIdPartition -> 
FetchPartitionStatus(logOffsetMetadata, partitionData))
 })
   }
-  val delayedFetch = new DelayedFetch(
-params = params,
-fetchPartitionStatus = fetchPartitionStatus,
-replicaManager = this,
-quota = quota,
-responseCallback = responseCallback
-  )
-
-  // create a list of (topic, partition) pairs to use as keys for this 
delayed fetch operation
-  val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => 
TopicPartitionOperationKey(tp) }
-
-  // try to complete the request immediately, otherwise put it into the 
purgatory;
-  // this is because while the delayed fetch operation is being created, 
new requests
-  // may arrive and hence make this operation completable.
-  delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, 
delayedFetchKeys)
+
+  if (remoteFetchInfo.isPresent) {

Review Comment:
   Do you mean that we should not return immediately if `remoteFetchInfo` is 
present, because the remote fetch should be served first and may otherwise 
starve as long as there is enough data immediately available to be sent? So 
the condition becomes:
   
   ```
   if (!remoteFetchInfo.isPresent && (params.maxWaitMs <= 0 || 
fetchInfos.isEmpty 
   || bytesReadable >= params.minBytes || errorReadingData || 
hasDivergingEpoch 
   || hasPreferredReadReplica))
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #13684: MINOR: fix compilation failure

2023-05-08 Thread via GitHub


showuon commented on PR #13684:
URL: https://github.com/apache/kafka/pull/13684#issuecomment-1538178676

   It's fine since it's for the 3.4 branch, not for trunk. Thanks 
@divijvaidya 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14973) Inconsistent treatment of invalid config when creating or altering a topic

2023-05-08 Thread James Shaw (Jira)
James Shaw created KAFKA-14973:
--

 Summary: Inconsistent treatment of invalid config when creating or 
altering a topic
 Key: KAFKA-14973
 URL: https://issues.apache.org/jira/browse/KAFKA-14973
 Project: Kafka
  Issue Type: Bug
  Components: admin
Affects Versions: 3.4.0
Reporter: James Shaw


{{createTopics}} throws {{InvalidConfigurationException}} on receiving an 
invalid config entry name or value.
{{incrementalAlterConfigs}} throws {{InvalidConfigurationException}} on 
receiving an invalid config entry name, but throws {{InvalidRequestException}} 
on receiving an invalid entry value.

The {{incrementalAlterConfigs}} javadoc mentions that 
{{InvalidRequestException}} is anticipated; the {{createTopics}} javadoc says 
nothing about exception types.
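
A minimal repro sketch of the inconsistency, assuming a broker reachable at 
localhost:9092; both calls use the public Admin API and print the type of the 
exception wrapped in the ExecutionException:

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class InvalidConfigRepro {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        try (Admin admin = Admin.create(props)) {
            // createTopics with an invalid config value.
            NewTopic topic = new NewTopic("t1", 1, (short) 1)
                .configs(Collections.singletonMap("retention.ms", "not-a-number"));
            try {
                admin.createTopics(Collections.singleton(topic)).all().get();
            } catch (ExecutionException e) {
                // Per the report above: InvalidConfigurationException.
                System.out.println("createTopics: " + e.getCause().getClass().getSimpleName());
            }

            // incrementalAlterConfigs with an invalid config value.
            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "t1");
            AlterConfigOp op = new AlterConfigOp(
                new ConfigEntry("retention.ms", "not-a-number"), AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> alter =
                Collections.singletonMap(resource, Collections.singleton(op));
            try {
                admin.incrementalAlterConfigs(alter).all().get();
            } catch (ExecutionException e) {
                // Per the report above: InvalidRequestException.
                System.out.println("incrementalAlterConfigs: " + e.getCause().getClass().getSimpleName());
            }
        }
    }
}
```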



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] showuon commented on pull request #13684: MINOR: fix compilation failure

2023-05-08 Thread via GitHub


showuon commented on PR #13684:
URL: https://github.com/apache/kafka/pull/13684#issuecomment-1538085446

   @mimaison , call for review to fix the build in v3.4 branch.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #13684: MINOR: fix compilation failure

2023-05-08 Thread via GitHub


showuon commented on PR #13684:
URL: https://github.com/apache/kafka/pull/13684#issuecomment-1538084603

   Failed tests are unrelated:
   ```
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.clients.consumer.internals.EagerConsumerCoordinatorTest.testPrepareJoinAndRejoinAfterFailedRebalance()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInSinkTaskStart
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic
   Build / JDK 8 and Scala 2.12 / 
kafka.security.authorizer.AuthorizerTest.testDeleteAclOnWildcardResource(String).quorum=kraft
   Build / JDK 11 and Scala 2.13 / 
kafka.security.authorizer.AuthorizerTest.testAllowAllAccess(String).quorum=kraft
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testReplication()
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testReplicationWithEmptyPartition()
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInConnectorStop
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInConnectorConfig
   Build / JDK 17 and Scala 2.13 / 
kafka.security.authorizer.AuthorizerTest.testTopicAcl(String).quorum=kraft
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.tools.MetadataQuorumCommandTest.[5] Type=Raft-CoReside, 
Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.4-IV0, 
Security=PLAINTEXT
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.tools.MetadataQuorumCommandTest.[6] Type=Raft-Distributed, 
Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.4-IV0, 
Security=PLAINTEXT
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.tools.MetadataQuorumCommandTest.[1] Type=Raft-CoReside, 
Name=testDescribeQuorumStatusSuccessful, MetadataVersion=3.4-IV0, 
Security=PLAINTEXT
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.tools.MetadataQuorumCommandTest.[2] Type=Raft-Distributed, 
Name=testDescribeQuorumStatusSuccessful, MetadataVersion=3.4-IV0, 
Security=PLAINTEXT
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.tools.MetadataQuorumCommandTest.[2] Type=Raft-Distributed, 
Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.4-IV0, 
Security=PLAINTEXT
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.tools.MetadataQuorumCommandTest.[6] Type=Raft-Distributed, 
Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.4-IV0, 
Security=PLAINTEXT
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.tools.MetadataQuorumCommandTest.[6] Type=Raft-Distributed, 
Name=testDescribeQuorumStatusSuccessful, MetadataVersion=3.4-IV0, 
Security=PLAINTEXT
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13681: KAFKA-14133: Migrate ActiveTaskCreator mock in TaskManagerTest to Mockito

2023-05-08 Thread via GitHub


divijvaidya commented on code in PR #13681:
URL: https://github.com/apache/kafka/pull/13681#discussion_r1187180990


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -291,16 +292,16 @@ public void 
shouldPrepareActiveTaskInStateUpdaterToBeRecycled() {
 final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
 final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
 when(stateUpdater.getTasks()).thenReturn(mkSet(activeTaskToRecycle));
-expect(activeTaskCreator.createTasks(consumer, 
Collections.emptyMap())).andReturn(emptySet());
 
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
-replay(activeTaskCreator, standbyTaskCreator);
+replay(standbyTaskCreator);

Review Comment:
   Mockito doesn't require calling `replay()` (unlike EasyMock).
   
   (same comment for the rest of the places in this file where we use replay)
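
A self-contained illustration of the point, stubbing a plain java.util.List 
purely for brevity:

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.List;

public class NoReplayNeeded {
    public static void main(String[] args) {
        @SuppressWarnings("unchecked")
        List<String> tasks = mock(List.class);

        // The stub is active as soon as when(...) returns; there is no
        // EasyMock-style replay() step in Mockito.
        when(tasks.size()).thenReturn(3);

        System.out.println(tasks.size()); // prints 3
    }
}
```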



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -291,16 +292,16 @@ public void 
shouldPrepareActiveTaskInStateUpdaterToBeRecycled() {
 final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
 final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
 when(stateUpdater.getTasks()).thenReturn(mkSet(activeTaskToRecycle));
-expect(activeTaskCreator.createTasks(consumer, 
Collections.emptyMap())).andReturn(emptySet());
 
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
-replay(activeTaskCreator, standbyTaskCreator);
+replay(standbyTaskCreator);
 
 taskManager.handleAssignment(
 Collections.emptyMap(),
 mkMap(mkEntry(activeTaskToRecycle.id(), 
activeTaskToRecycle.inputPartitions()))
 );
 
-verify(activeTaskCreator, standbyTaskCreator);
+verify(standbyTaskCreator);

Review Comment:
   We probably want to verify a specific method invocation here. Mockito's 
verify() takes a single mock and must be chained with the expected call, so 
using it as a drop-in replacement for EasyMock's verify(mock...) might produce 
a compilation error.
   
   (same comment for the rest of the usages of verify(mock) in this PR)
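
For context, a small sketch of the difference — EasyMock's verify(mockA, 
mockB) checks all recorded expectations at once, while Mockito names the mock 
and then the exact invocation:

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.util.List;

public class VerifyInvocation {
    public static void main(String[] args) {
        @SuppressWarnings("unchecked")
        List<String> log = mock(List.class);

        log.add("record");

        // Mockito verifies one named invocation per call; a bare
        // verify(mockA, mockB) in the EasyMock style does not type-check.
        verify(log).add("record");
    }
}
```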



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13681: KAFKA-14133: Migrate ActiveTaskCreator mock in TaskManagerTest to Mockito

2023-05-08 Thread via GitHub


divijvaidya commented on code in PR #13681:
URL: https://github.com/apache/kafka/pull/13681#discussion_r1187177289


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##


Review Comment:
   We still seem to have EasyMock imports. Can you please remove them and 
replace them with Mockito as well?
   
   ```
   import static org.easymock.EasyMock.anyObject;
   import static org.easymock.EasyMock.eq;
   import static org.easymock.EasyMock.expect;
   import static org.easymock.EasyMock.expectLastCall;
   import static org.easymock.EasyMock.replay;
   import static org.easymock.EasyMock.reset;
   import static org.easymock.EasyMock.resetToStrict;
   import static org.easymock.EasyMock.verify;
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13681: KAFKA-14133: Migrate ActiveTaskCreator mock in TaskManagerTest to Mockito

2023-05-08 Thread via GitHub


divijvaidya commented on code in PR #13681:
URL: https://github.com/apache/kafka/pull/13681#discussion_r1187177289


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##


Review Comment:
   We still seem to have EasyMock imports. Can you please remove them and 
replace them with Mockito as well?
   
   ```
   import static org.easymock.EasyMock.anyObject;
   import static org.easymock.EasyMock.eq;
   import static org.easymock.EasyMock.expect;
   import static org.easymock.EasyMock.expectLastCall;
   import static org.easymock.EasyMock.replay;
   import static org.easymock.EasyMock.reset;
   import static org.easymock.EasyMock.resetToStrict;
   import static org.easymock.EasyMock.verify;
   



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -291,16 +292,16 @@ public void 
shouldPrepareActiveTaskInStateUpdaterToBeRecycled() {
 final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
 final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
 when(stateUpdater.getTasks()).thenReturn(mkSet(activeTaskToRecycle));
-expect(activeTaskCreator.createTasks(consumer, 
Collections.emptyMap())).andReturn(emptySet());
 
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
-replay(activeTaskCreator, standbyTaskCreator);
+replay(standbyTaskCreator);

Review Comment:
   Mockito doesn't require calling `replay()` (unlike EasyMock).
   
   (same comment for the rest of the places in this file where we use replay)



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -291,16 +292,16 @@ public void 
shouldPrepareActiveTaskInStateUpdaterToBeRecycled() {
 final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
 final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
 when(stateUpdater.getTasks()).thenReturn(mkSet(activeTaskToRecycle));
-expect(activeTaskCreator.createTasks(consumer, 
Collections.emptyMap())).andReturn(emptySet());
 
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
-replay(activeTaskCreator, standbyTaskCreator);
+replay(standbyTaskCreator);
 
 taskManager.handleAssignment(
 Collections.emptyMap(),
 mkMap(mkEntry(activeTaskToRecycle.id(), 
activeTaskToRecycle.inputPartitions()))
 );
 
-verify(activeTaskCreator, standbyTaskCreator);
+verify(standbyTaskCreator);

Review Comment:
   We probably want to verify a specific method invocation here. Mockito's 
verify() takes a single mock and must be chained with the expected call, so 
using it as a drop-in replacement for EasyMock's verify(mock...) might produce 
a compilation error.
   
   (same comment for the rest of the usages of verify(mock) in this PR)



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -2579,7 +2563,9 @@ public void shouldUpdateInputPartitionsAfterRebalance() {
 assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
 assertThat(task00.state(), is(Task.State.RUNNING));
 assertEquals(newPartitionsSet, task00.inputPartitions());
-verify(activeTaskCreator, consumer, changeLogReader);
+verify(consumer, changeLogReader);
+Mockito.verify(activeTaskCreator).createTasks(any(), 
Mockito.eq(taskId00Assignment));

Review Comment:
   Mockito has a mode called STRICT_STUBS which fails the test if a defined 
stub is never invoked. We can greatly simplify the code using that setting, 
since we don't have to do both `when()` and `verify()`: `when()` alone 
suffices because the test fails if the stub is unused, and `verify()` alone 
suffices for calls with a void return. We use STRICT_STUBS in a bunch of 
places in the Kafka code, such as 
[this](https://github.com/apache/kafka/blob/347238948b86882a47faee4a2916d1b01333d95f/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java#L60).
   
   Please consider using it in this file, as it will remove a lot of 
boilerplate verification code.
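
For reference, a minimal JUnit 5 sketch of the suggestion; the test class and 
the stubbed type are illustrative only:

```java
import static org.mockito.Mockito.when;

import java.util.List;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
class StrictStubsExample {

    @Mock
    private List<String> tasks;

    @Test
    void stubImpliesVerification() {
        // Under STRICT_STUBS an unused stub fails the test with
        // UnnecessaryStubbingException, so when(...) alone documents and
        // enforces the expected interaction without a separate verify().
        when(tasks.size()).thenReturn(1);

        Assertions.assertEquals(1, tasks.size());
    }
}
```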
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

2023-05-08 Thread via GitHub


cadonna commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1187176080


##
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, 
from_version, to_version):
 
 self.stop_and_await()
 
+@cluster(num_nodes=6)
+@matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+"""
+This test verifies that the cluster successfully upgrades despite 
changes in the table
+repartition topic format.
+
+Starts 3 KafkaStreams instances with version <from_version> and 
upgrades one-by-one to <to_version>
+"""
+
+extra_properties = {'test.run_table_agg': 'true'}
+
+self.set_up_services()
+
+self.driver.start()
+
+# encoding different target values for different versions
+#  - old version: value=A
+#  - new version with `upgrade_from` flag set: value=B
+#  - new version w/o `upgrade_from` flag set: value=C
+
+extra_properties = extra_properties.copy()
+extra_properties['test.agg_produce_value'] = 'A'
+extra_properties['test.expected_agg_values'] = 'A'
+self.start_all_nodes_with(from_version, extra_properties)
+
+counter = 1
+random.seed()
+
+# rolling bounce
+random.shuffle(self.processors)
+p3 = self.processors[-1]
+for p in self.processors:
+p.CLEAN_NODE_ENABLED = False
+
+# bounce two instances to new version (verifies that new version can 
process records
+# written by old version)

Review Comment:
   I see the check, but it does not guarantee that the non-rolled instance 
had enough time to write records in the old format to partitions that will be 
read by the rolled instances. It also does not guarantee that records the 
non-rolled instance might have written to partitions read by the rolled 
instances have not already been consumed by the non-rolled instance itself 
before the rolled instances start processing. The same is true for the 
subsequent rolls.
   Does this make sense or am I missing something?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

2023-05-08 Thread via GitHub


cadonna commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1187176080


##
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, 
from_version, to_version):
 
 self.stop_and_await()
 
+@cluster(num_nodes=6)
+@matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+"""
+This test verifies that the cluster successfully upgrades despite 
changes in the table
+repartition topic format.
+
+Starts 3 KafkaStreams instances with version <from_version> and 
upgrades one-by-one to <to_version>
+"""
+
+extra_properties = {'test.run_table_agg': 'true'}
+
+self.set_up_services()
+
+self.driver.start()
+
+# encoding different target values for different versions
+#  - old version: value=A
+#  - new version with `upgrade_from` flag set: value=B
+#  - new version w/o `upgrade_from` flag set: value=C
+
+extra_properties = extra_properties.copy()
+extra_properties['test.agg_produce_value'] = 'A'
+extra_properties['test.expected_agg_values'] = 'A'
+self.start_all_nodes_with(from_version, extra_properties)
+
+counter = 1
+random.seed()
+
+# rolling bounce
+random.shuffle(self.processors)
+p3 = self.processors[-1]
+for p in self.processors:
+p.CLEAN_NODE_ENABLED = False
+
+# bounce two instances to new version (verifies that new version can 
process records
+# written by old version)

Review Comment:
   I see the check, but it does not guarantee that the non-rolled instance 
had enough time to write records in the old format to partitions that will be 
read by the rolled instances. It also does not guarantee that records the 
non-rolled instance might have written to partitions read by the rolled 
instances have not already been consumed by the non-rolled instance itself 
before the rolled instances start processing. The same is true for the 
subsequent rolls.
   Does this make sense or am I missing something?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

2023-05-08 Thread via GitHub


cadonna commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1187159604


##
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, 
from_version, to_version):
 
 self.stop_and_await()
 
+@cluster(num_nodes=6)
+@matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+"""
+This test verifies that the cluster successfully upgrades despite 
changes in the table
+repartition topic format.
+
+Starts 3 KafkaStreams instances with version <from_version> and 
upgrades one-by-one to <to_version>
+"""
+
+extra_properties = {'test.run_table_agg': 'true'}
+
+self.set_up_services()
+
+self.driver.start()
+
+# encoding different target values for different versions
+#  - old version: value=A
+#  - new version with `upgrade_from` flag set: value=B
+#  - new version w/o `upgrade_from` flag set: value=C
+
+extra_properties = extra_properties.copy()
+extra_properties['test.agg_produce_value'] = 'A'
+extra_properties['test.expected_agg_values'] = 'A'
+self.start_all_nodes_with(from_version, extra_properties)
+
+counter = 1
+random.seed()
+
+# rolling bounce
+random.shuffle(self.processors)
+p3 = self.processors[-1]
+for p in self.processors:
+p.CLEAN_NODE_ENABLED = False
+
+# bounce two instances to new version (verifies that new version can 
process records
+# written by old version)
+extra_properties = extra_properties.copy()
+extra_properties['test.agg_produce_value'] = 'B'
+extra_properties['test.expected_agg_values'] = 'A,B'
+for p in self.processors[:-1]:
+self.do_stop_start_bounce(p, from_version[:-2], to_version, 
counter, extra_properties)

Review Comment:
   Ah, I misinterpreted the code. I thought the whole list of from_versions 
was passed into the function. Now I see that it is just one version, 
obviously. My fault, sorry! 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon opened a new pull request, #13687: MINOR: test

2023-05-08 Thread via GitHub


showuon opened a new pull request, #13687:
URL: https://github.com/apache/kafka/pull/13687

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe opened a new pull request, #13686: MINOR: Create the MetadataNode classes to introspect MetadataImage

2023-05-08 Thread via GitHub


cmccabe opened a new pull request, #13686:
URL: https://github.com/apache/kafka/pull/13686

   Metadata image classes such as MetadataImage, ClusterImage, FeaturesImage, 
and so forth contain numerous sub-images. This PR adds a structured way of 
traversing those sub-images. This is useful for the metadata shell, and also 
for implementing toString functions.
   
   In both cases, the previous solution was suboptimal. The metadata shell was 
previously implemented in an ad-hoc way by mutating text-based tree nodes when 
records were replayed. This was difficult to keep in sync with changes to the 
record types (for example, we forgot to do this for SCRAM). It was also pretty 
low-level, being done at a level below that of the image classes. For toString, 
it was difficult to keep the implementations consistent previously, and also 
support both redacted and non-redacted output.
   
   The metadata shell directory was getting crowded since we never had 
submodules for it. This PR creates glob/, command/, node/, and state/ 
directories to keep things better organized.
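
A hedged sketch of what such a structured traversal might look like — the 
interface name and methods here are hypothetical, not necessarily those in 
the PR:

```java
import java.util.Collection;

// Hypothetical node abstraction over MetadataImage and its sub-images.
interface MetadataTreeNode {
    Collection<String> childNames();

    MetadataTreeNode child(String name);

    // A generic walk like this can back both the metadata shell's navigation
    // and consistent (optionally redacted) toString implementations.
    static void walk(String path, MetadataTreeNode node) {
        System.out.println(path);
        for (String name : node.childNames()) {
            walk(path + "/" + name, node.child(name));
        }
    }
}
```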


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon closed pull request #13685: test

2023-05-08 Thread via GitHub


showuon closed pull request #13685: test
URL: https://github.com/apache/kafka/pull/13685


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon opened a new pull request, #13685: test

2023-05-08 Thread via GitHub


showuon opened a new pull request, #13685:
URL: https://github.com/apache/kafka/pull/13685

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon opened a new pull request, #13684: MINOR: fix compilation failure

2023-05-08 Thread via GitHub


showuon opened a new pull request, #13684:
URL: https://github.com/apache/kafka/pull/13684

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon closed pull request #13683: MINOR: 3.3 test

2023-05-08 Thread via GitHub


showuon closed pull request #13683: MINOR: 3.3 test
URL: https://github.com/apache/kafka/pull/13683


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon opened a new pull request, #13683: MINOR: 3.3 test

2023-05-08 Thread via GitHub


showuon opened a new pull request, #13683:
URL: https://github.com/apache/kafka/pull/13683

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14662) ACL listings in documentation are out of date

2023-05-08 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-14662.
---
Fix Version/s: 3.6.0
   Resolution: Fixed

> ACL listings in documentation are out of date
> -
>
> Key: KAFKA-14662
> URL: https://issues.apache.org/jira/browse/KAFKA-14662
> Project: Kafka
>  Issue Type: Bug
>  Components: core, docs
>Reporter: Mickael Maison
>Assignee: Gantigmaa Selenge
>Priority: Major
> Fix For: 3.6.0
>
>
> ACLs listed in 
> https://kafka.apache.org/documentation/#operations_resources_and_protocols 
> are out of date. They only cover API keys up to 47 (OffsetDelete) and don't 
> include DescribeClientQuotas, AlterClientQuotas, 
> DescribeUserScramCredentials, AlterUserScramCredentials, DescribeQuorum, 
> AlterPartition, UpdateFeatures, DescribeCluster, DescribeProducers, 
> UnregisterBroker, DescribeTransactions, ListTransactions, AllocateProducerIds.
> This is hard to keep up to date so we should consider whether this could be 
> generated automatically.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] showuon merged pull request #13660: KAFKA-14662: Update the ACL list in the doc

2023-05-08 Thread via GitHub


showuon merged PR #13660:
URL: https://github.com/apache/kafka/pull/13660


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #13660: KAFKA-14662: Update the ACL list in the doc

2023-05-08 Thread via GitHub


showuon commented on PR #13660:
URL: https://github.com/apache/kafka/pull/13660#issuecomment-1537831054

   Failed tests are unrelated:
   ```
   Build / JDK 8 and Scala 2.12 / 
integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.tools.MetadataQuorumCommandTest.[1] Type=Raft-Combined, 
Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.5-IV2, 
Security=PLAINTEXT
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.tools.MetadataQuorumCommandTest.[2] Type=Raft-Isolated, 
Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.5-IV2, 
Security=PLAINTEXT
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon merged pull request #13672: MINOR: Print the cause of failure for PlaintextAdminIntegrationTest

2023-05-08 Thread via GitHub


showuon merged PR #13672:
URL: https://github.com/apache/kafka/pull/13672


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


