[GitHub] [kafka] emissionnebula commented on pull request #13437: KAFKA-14828: Remove R/W locks using persistent data structures

2023-04-17 Thread via GitHub


emissionnebula commented on PR #13437:
URL: https://github.com/apache/kafka/pull/13437#issuecomment-1512536862

   I have updated the PR following the conclusion of the discussion on 
https://github.com/apache/kafka/pull/13280 and have added a wrapper class with 
an interface for NavigableSet to use with Authorizer.
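   For readers following along, a minimal sketch of the general idea behind such a wrapper (the class and method names below are hypothetical, not the ones in the PR): reads work against an immutable NavigableSet snapshot and never take a lock, while updates build and publish a new snapshot. A real persistent data structure would share nodes between versions instead of copying the whole set on every write.
   ```java
import java.util.Collections;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical sketch, not the PR's code: a copy-on-write NavigableSet wrapper.
public final class ImmutableNavigableSetWrapper<T extends Comparable<T>> {
    private final NavigableSet<T> snapshot;

    public ImmutableNavigableSetWrapper() {
        this.snapshot = Collections.unmodifiableNavigableSet(new TreeSet<>());
    }

    private ImmutableNavigableSetWrapper(final TreeSet<T> entries) {
        this.snapshot = Collections.unmodifiableNavigableSet(entries);
    }

    // Readers (e.g. an authorize() call) iterate a stable snapshot without locking.
    public NavigableSet<T> view() {
        return snapshot;
    }

    // Writers produce a new wrapper; existing readers keep their old snapshot.
    public ImmutableNavigableSetWrapper<T> withAdded(final T entry) {
        final TreeSet<T> copy = new TreeSet<>(snapshot);
        copy.add(entry);
        return new ImmutableNavigableSetWrapper<>(copy);
    }

    public ImmutableNavigableSetWrapper<T> withRemoved(final T entry) {
        final TreeSet<T> copy = new TreeSet<>(snapshot);
        copy.remove(entry);
        return new ImmutableNavigableSetWrapper<>(copy);
    }
}
   ```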


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] urbandan commented on pull request #13557: KAFKA-14902: KafkaStatusBackingStore retries on a dedicated backgroun…

2023-04-17 Thread via GitHub


urbandan commented on PR #13557:
URL: https://github.com/apache/kafka/pull/13557#issuecomment-1512514338

   @viktorsomogyi I see 1 failure, but that doesn't seem related to the change.





[jira] [Commented] (KAFKA-14586) Move StreamsResetter to tools

2023-04-17 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713388#comment-17713388
 ] 

Sagar Rao commented on KAFKA-14586:
---

[~mjsax] , oh I didn't know there was already a KIP and a Jira filed for this. 
I think we should be able to close the older ones. cc [~fvaleri] 

> Move StreamsResetter to tools
> -
>
> Key: KAFKA-14586
> URL: https://issues.apache.org/jira/browse/KAFKA-14586
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Sagar Rao
>Priority: Major
> Fix For: 3.5.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mjsax closed pull request #13592: KAFKA-14862: Outer stream-stream join does not output all results with multiple input partitions

2023-04-17 Thread via GitHub


mjsax closed pull request #13592: KAFKA-14862: Outer stream-stream join does 
not output all results with multiple input partitions
URL: https://github.com/apache/kafka/pull/13592





[GitHub] [kafka] hudeqi commented on pull request #13571: KAFKA-14907:Add the traffic metric of the partition dimension in BrokerTopicStats

2023-04-17 Thread via GitHub


hudeqi commented on PR #13571:
URL: https://github.com/apache/kafka/pull/13571#issuecomment-1512389741

   Does this PR need a KIP process? Could anyone take a look?





[GitHub] [kafka] mjsax commented on a diff in pull request #13592: KAFKA-14862: Outer stream-stream join does not output all results with multiple input partitions

2023-04-17 Thread via GitHub


mjsax commented on code in PR #13592:
URL: https://github.com/apache/kafka/pull/13592#discussion_r1169376737


##
streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java:
##
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import kafka.utils.MockTime;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import static java.time.Duration.ofSeconds;
+import static java.util.Arrays.asList;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
+
+@Timeout(600)
+@Tag("integration")
+public class KStreamKStreamIntegrationTest {

Review Comment:
   Yes, that's the reason -- I did follow the existing naming pattern (which is 
very subtle though):
   - StreamStreamJoinIntegration, StreamTableJoinIntegration etc. (without K, using TTD)
   - KStreamKStreamIntegration, KStreamKTableJoinIntegration etc. (with K, using EmbeddedKafka)
   
   I am happy to rename, but maybe we should do it across the board and not as a 
side thing in this PR?






[GitHub] [kafka] efajunk commented on pull request #9678: KAFKA-10798; Ensure response is delayed for failed SASL authentication with connection close delay

2023-04-17 Thread via GitHub


efajunk commented on PR #9678:
URL: https://github.com/apache/kafka/pull/9678#issuecomment-1512360596

   Hello!
   I'm not sure whether the original cause is still present, but we do use SASL 
authentication and the delay still doesn't work.
   Our Kafka version is 3.3. We can see in the logs that the setting is definitely 
applied, but it has no effect:
   ```
   2023-04-17 14:58:36,322 INFO KafkaConfig values: 
   ...
connection.failed.authentication.delay.ms = 1000
   ...
   ```
   The log shows nonstop authorization attempts, roughly one every 10 milliseconds 
(example below). The error rate for the same topic stays at 1-2k per minute.
   ```
   2023-04-17 15:02:45,266 INFO Principal = User:same_user is Denied Operation 
= Describe from host = x.x.x.x(same_ip) on resource = Topic:LITERAL:same_topic 
for request = Metadata with resourceRefCount = 1 (kafka.authorizer.logger) 
[handler-0]
   2023-04-17 15:02:45,268 INFO Principal = User:same_user is Denied Operation 
= Describe from host = x.x.x.x(same_ip) on resource = Topic:LITERAL:same_topic 
for request = Metadata with resourceRefCount = 1 (kafka.authorizer.logger) 
[handler-3]
   2023-04-17 15:02:45,268 INFO Principal = User:same_user is Denied Operation 
= Describe from host = x.x.x.x(same_ip) on resource = Topic:LITERAL:same_topic 
for request = Metadata with resourceRefCount = 1 (kafka.authorizer.logger) 
[handler-10]
   2023-04-17 15:02:45,269 INFO Principal = User:same_user is Denied Operation 
= Describe from host = x.x.x.x(same_ip) on resource = Topic:LITERAL:same_topic 
for request = Metadata with resourceRefCount = 1 (kafka.authorizer.logger) 
[handler-11]
   ```
   Maybe there is still another config to make the delay work?





[GitHub] [kafka] hgeraldino commented on a diff in pull request #13383: KAFKA-14059 Replace PowerMock with Mockito in WorkerSourceTaskTest

2023-04-17 Thread via GitHub


hgeraldino commented on code in PR #13383:
URL: https://github.com/apache/kafka/pull/13383#discussion_r1169408565


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java:
##
@@ -315,33 +305,28 @@ public void testPause() throws Exception {
 
 taskFuture.get();
 
-PowerMock.verifyAll();
+verifyCleanStartup();
+verifyTaskGetTopic(10);
+verifyOffsetFlush(true);
+verifyTopicCreation(TOPIC);
+verify(statusListener).onPause(taskId);
+verify(statusListener).onShutdown(taskId);
+verify(sourceTask).stop();
+verify(offsetWriter).offset(PARTITION, OFFSET);
+verifyClose();
 }
 
 @Test
 public void testPollsInBackground() throws Exception {
 createWorkerTask();
 
-expectCleanStartup();
-
 final CountDownLatch pollLatch = expectPolls(10);
 // In this test, we don't flush, so nothing goes any further than the 
offset writer

Review Comment:
   you're right, removed comment






[GitHub] [kafka] mjsax commented on a diff in pull request #13592: KAFKA-14862: Outer stream-stream join does not output all results with multiple input partitions

2023-04-17 Thread via GitHub


mjsax commented on code in PR #13592:
URL: https://github.com/apache/kafka/pull/13592#discussion_r1169381571



[GitHub] [kafka] mjsax commented on a diff in pull request #13592: KAFKA-14862: Outer stream-stream join does not output all results with multiple input partitions

2023-04-17 Thread via GitHub


mjsax commented on code in PR #13592:
URL: https://github.com/apache/kafka/pull/13592#discussion_r1169380664



[GitHub] [kafka] mjsax commented on a diff in pull request #13592: KAFKA-14862: Outer stream-stream join does not output all results with multiple input partitions

2023-04-17 Thread via GitHub


mjsax commented on code in PR #13592:
URL: https://github.com/apache/kafka/pull/13592#discussion_r1169380378



[GitHub] [kafka] mjsax commented on a diff in pull request #13592: KAFKA-14862: Outer stream-stream join does not output all results with multiple input partitions

2023-04-17 Thread via GitHub


mjsax commented on code in PR #13592:
URL: https://github.com/apache/kafka/pull/13592#discussion_r1169379949



[GitHub] [kafka] mjsax commented on a diff in pull request #13592: KAFKA-14862: Outer stream-stream join does not output all results with multiple input partitions

2023-04-17 Thread via GitHub


mjsax commented on code in PR #13592:
URL: https://github.com/apache/kafka/pull/13592#discussion_r1169379331



[GitHub] [kafka] mjsax commented on a diff in pull request #13592: KAFKA-14862: Outer stream-stream join does not output all results with multiple input partitions

2023-04-17 Thread via GitHub


mjsax commented on code in PR #13592:
URL: https://github.com/apache/kafka/pull/13592#discussion_r1169378795




[GitHub] [kafka] philipnee commented on a diff in pull request #13490: KAFKA-14875: Implement wakeup

2023-04-17 Thread via GitHub


philipnee commented on code in PR #13490:
URL: https://github.com/apache/kafka/pull/13490#discussion_r1169363951


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -319,26 +327,52 @@ public Map 
committed(final Set
+ * If the timeout specified by {@code default.api.timeout.ms} expires
+ * {@link org.apache.kafka.common.errors.TimeoutException} is thrown.
+ *
+ * @param partitions The partition to check
+ * @param timeout The maximum time to block.
+ * @return The last committed offset and metadata or null if there was no 
prior commit
+ * @throws org.apache.kafka.common.errors.WakeupException if {@link 
#wakeup()} is called before or while this
+ * function is called
+ * @throws org.apache.kafka.common.errors.InterruptException if the 
calling thread is interrupted before or while
+ * this function is called
+ * @throws org.apache.kafka.common.errors.AuthenticationException if 
authentication fails. See the exception for more details
+ * @throws org.apache.kafka.common.errors.AuthorizationException if not 
authorized to the topic or to the
+ * configured groupId. See the exception for more details
+ * @throws org.apache.kafka.common.KafkaException for any other 
unrecoverable errors
+ * @throws org.apache.kafka.common.errors.TimeoutException if the 
committed offset cannot be found before
+ * the timeout specified by {@code default.api.timeout.ms} 
expires.
+ */
 @Override
 public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions,
 final Duration timeout) {
+maybeWakeup();
 maybeThrowInvalidGroupIdException();
+
 if (partitions.isEmpty()) {
 return new HashMap<>();
 }
 
 final OffsetFetchApplicationEvent event = new 
OffsetFetchApplicationEvent(partitions);
+activeFutures.add(event.future());
 eventHandler.add(event);
 try {
-return event.complete(Duration.ofMillis(100));
+
+return event.complete(timeout);
+} catch (ExecutionException e) {
+throw new KafkaException(e);
 } catch (InterruptedException e) {
 throw new InterruptException(e);
 } catch (TimeoutException e) {
 throw new org.apache.kafka.common.errors.TimeoutException(e);
-} catch (ExecutionException e) {
-// Execution exception is thrown here
-throw new KafkaException(e);
-} catch (Exception e) {
+} catch (WakeupException e) {
+this.activeFutures.remove(event.future());

Review Comment:
   same, we should probably remove the future upon any failure/completion.






[GitHub] [kafka] philipnee commented on a diff in pull request #13490: KAFKA-14875: Implement wakeup

2023-04-17 Thread via GitHub


philipnee commented on code in PR #13490:
URL: https://github.com/apache/kafka/pull/13490#discussion_r1169362391


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -479,18 +530,42 @@ public void commitSync(Map offsets) {
 commitSync(offsets, Duration.ofMillis(defaultApiTimeoutMs));
 }
 
+/**
+ * Commit the user provided offsets, blocking until the commit completes 
or the timeout expires. If the future is
+ * interrupted by wakeup, the future will be completed with an
+ * {@link org.apache.kafka.common.errors.WakeupException}.
+ *
+ * @param offsets offsets to commit.
+ * @param timeout amount of time to block for the commit to complete.
+ *
+ * @throws org.apache.kafka.common.errors.WakeupException if {@link 
#wakeup()} is called before or while this
+ * function is called
+ * @throws org.apache.kafka.common.errors.InterruptException if the 
calling thread is interrupted before or while
+ * this function is called
+ * @throws org.apache.kafka.common.errors.AuthenticationException if 
authentication fails. See the exception for more details
+ * @throws org.apache.kafka.common.errors.AuthorizationException if not 
authorized to the topic or to the
+ * configured groupId. See the exception for more details
+ * @throws org.apache.kafka.common.KafkaException for any other 
unrecoverable errors (e.g. if offset metadata
+ * is too large or if the topic does not exist).
+ * @throws org.apache.kafka.common.errors.TimeoutException if the timeout 
specified by {@code default.api.timeout.ms} expires
+ *before successful completion of the offset commit
+ */
 @Override
 public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
-CompletableFuture commitFuture = commit(offsets);
+maybeWakeup();
+final WakeupableFuture commitFuture = commit(offsets);
+activeFutures.add(commitFuture);
 try {
 commitFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
-} catch (final TimeoutException e) {
-throw new org.apache.kafka.common.errors.TimeoutException(e);
-} catch (final InterruptedException e) {
-throw new InterruptException(e);
-} catch (final ExecutionException e) {
+} catch (ExecutionException e) {
 throw new KafkaException(e);
-} catch (final Exception e) {
+} catch (InterruptedException e) {
+throw new InterruptException(e);
+} catch (TimeoutException e) {
+throw new org.apache.kafka.common.errors.TimeoutException(e);
+} catch (WakeupException e) {
+this.shouldWakeup.set(false);
+this.activeFutures.remove(commitFuture);

Review Comment:
   this probably should be in the finally
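   A self-contained sketch of that suggestion (the `activeFutures` set and `awaitCommit` helper below are illustrative stand-ins, not the PR's code): the tracked future is removed in a `finally` block, so it is cleaned up on success, timeout, interrupt, wakeup, or any other failure.
   ```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public final class FutureCleanupSketch {
    // Hypothetical stand-in for the consumer's tracked futures.
    private final Set<CompletableFuture<Void>> activeFutures = ConcurrentHashMap.newKeySet();

    void awaitCommit(final CompletableFuture<Void> commitFuture, final long timeoutMs) {
        activeFutures.add(commitFuture);
        try {
            commitFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException | InterruptedException | TimeoutException e) {
            // The real consumer maps these to the corresponding Kafka exceptions.
            throw new RuntimeException(e);
        } finally {
            // Runs on every exit path, so a completed or abandoned future never
            // lingers in the tracked set.
            activeFutures.remove(commitFuture);
        }
    }
}
   ```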






[GitHub] [kafka] vcrfxia commented on a diff in pull request #13592: KAFKA-14862: Outer stream-stream join does not output all results with multiple input partitions

2023-04-17 Thread via GitHub


vcrfxia commented on code in PR #13592:
URL: https://github.com/apache/kafka/pull/13592#discussion_r1169351304


##
streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java:
##
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import kafka.utils.MockTime;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import static java.time.Duration.ofSeconds;
+import static java.util.Arrays.asList;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
+
+@Timeout(600)
+@Tag("integration")
+public class KStreamKStreamIntegrationTest {

Review Comment:
   I'm guessing the reason this is added as a new test file rather than being 
added into the existing StreamStreamJoinIntegrationTest.java is because the 
existing test uses the TopologyTestDriver, which doesn't support testing with 
multiple partitions -- is that true?
   
   If that's the case, should we name the new file 
`KStreamKStreamMultiPartitionIntegrationTest` or similar, in order to better 
distinguish the two?




[jira] [Created] (KAFKA-14917) Producer write while transaction is pending.

2023-04-17 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-14917:
--

 Summary: Producer write while transaction is pending.
 Key: KAFKA-14917
 URL: https://issues.apache.org/jira/browse/KAFKA-14917
 Project: Kafka
  Issue Type: Bug
Reporter: Justine Olshan
Assignee: Justine Olshan


As discovered in KAFKA-14904, we seem to get into a state where we try to write 
to a partition while the ongoing state is still pending.

This is likely a bigger issue than the test and worth looking into.





[jira] [Created] (KAFKA-14916) Fix code that assumes transactional ID implies all records are transactional

2023-04-17 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-14916:
--

 Summary: Fix code that assumes transactional ID implies all 
records are transactional
 Key: KAFKA-14916
 URL: https://issues.apache.org/jira/browse/KAFKA-14916
 Project: Kafka
  Issue Type: Sub-task
Reporter: Justine Olshan
Assignee: Justine Olshan


KAFKA-14561 wrote code that assumed that if a transactional ID was included, 
all record batches were transactional and had the same producer ID.

This work will improve validation and fix the code that assumes all batches are 
transactional.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mjsax commented on a diff in pull request #13533: KAFKA-12446: update change encoding to use varint

2023-04-17 Thread via GitHub


mjsax commented on code in PR #13533:
URL: https://github.com/apache/kafka/pull/13533#discussion_r1169327746


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java:
##
@@ -113,9 +113,9 @@ public byte[] serialize(final String topic, final Headers 
headers, final Change<
 throw new StreamsException("Both old and new values are not 
null (" + data.oldValue
 + " : " + data.newValue + ") in ChangeSerializer, 
which is not allowed unless upgrading.");
 } else {
-final int capacity = UINT32_SIZE + newDataLength + 
oldDataLength + NEW_OLD_FLAG_SIZE;
+final int capacity = MAX_VARINT_LENGTH + newDataLength + 
oldDataLength + NEW_OLD_FLAG_SIZE;
 buf = ByteBuffer.allocate(capacity);
-ByteUtils.writeUnsignedInt(buf, newDataLength);
+ByteUtils.writeVarint(newDataLength, buf);
 buf.put(newData).put(oldData).put((byte) 2);

Review Comment:
   I would like to avoid the second ByteBuffer allocation -- we already have a `ByteBuffer` at hand, so copying twice seems wasteful?
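
   As a rough illustration of the single-buffer idea being discussed (a sketch only, not the actual ChangedSerializer code; the class and method names are made up), the zig-zag varint length prefix can be written straight into the one destination buffer, so no second ByteBuffer allocation or extra copy is needed:

   ```
   import java.nio.ByteBuffer;

   // Sketch only: encode the new-value length as a zig-zag varint directly into
   // the destination buffer, then append both payloads and the new/old flag.
   public final class VarintPrefixSketch {

       // Zig-zag varint encoding of a signed int, written straight into 'buf'.
       static void writeVarint(final int value, final ByteBuffer buf) {
           int v = (value << 1) ^ (value >> 31); // zig-zag keeps small non-negative values small
           while ((v & 0xFFFFFF80) != 0) {
               buf.put((byte) ((v & 0x7F) | 0x80)); // low 7 bits plus continuation bit
               v >>>= 7;
           }
           buf.put((byte) v);
       }

       static byte[] serialize(final byte[] newData, final byte[] oldData) {
           final int maxVarintLength = 5; // worst case for a 32-bit value
           final ByteBuffer buf = ByteBuffer.allocate(maxVarintLength + newData.length + oldData.length + 1);
           writeVarint(newData.length, buf);            // 1-5 byte length prefix
           buf.put(newData).put(oldData).put((byte) 2); // payloads plus new/old flag
           final byte[] out = new byte[buf.position()]; // trim unused prefix headroom
           buf.flip();
           buf.get(out);
           return out;
       }
   }
   ```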



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #13533: KAFKA-12446: update change encoding to use varint

2023-04-17 Thread via GitHub


mjsax commented on code in PR #13533:
URL: https://github.com/apache/kafka/pull/13533#discussion_r1169326446


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java:
##
@@ -113,9 +113,9 @@ public byte[] serialize(final String topic, final Headers 
headers, final Change<
 throw new StreamsException("Both old and new values are not 
null (" + data.oldValue
 + " : " + data.newValue + ") in ChangeSerializer, 
which is not allowed unless upgrading.");
 } else {
-final int capacity = UINT32_SIZE + newDataLength + 
oldDataLength + NEW_OLD_FLAG_SIZE;
+final int capacity = MAX_VARINT_LENGTH + newDataLength + 
oldDataLength + NEW_OLD_FLAG_SIZE;
 buf = ByteBuffer.allocate(capacity);
-ByteUtils.writeUnsignedInt(buf, newDataLength);
+ByteUtils.writeVarint(newDataLength, buf);

Review Comment:
   Yeah, it's a tradeoff (as pointed out in the PR description) -- I currently tend to prefer the more complex code because writing into the repartition topic is on the "hot data flow" path and thus seems worth the optimization. But if others also feel it's not worth it, I'm happy to change to just a 4-byte int.
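
   For context on the size tradeoff (a rough estimate, assuming Kafka's zig-zag varint encoding and a 32-bit length): a length up to 63 encodes in 1 byte and up to 8191 in 2 bytes, while the worst case is 5 bytes, so typical repartition records would pay 1-2 bytes for the prefix instead of a fixed 4, at the cost of sizing the buffer for the 5-byte worst case (the `MAX_VARINT_LENGTH` headroom in the diff above).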



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax opened a new pull request, #13592: KAFKA-14862: Outer stream-stream join does not output all results with multiple input partitions

2023-04-17 Thread via GitHub


mjsax opened a new pull request, #13592:
URL: https://github.com/apache/kafka/pull/13592

   Stream-stream outer join uses a "shared time tracker" to track stream-time progress for left and right input in a single place. This time tracker is incorrectly shared across tasks.
   
   This PR introduces a supplier to create a "shared time tracker" object per 
task, to be shared between the left and right join processors.
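
   As a minimal sketch of that idea (illustrative types only, not the actual Streams internals): each task's processors look up a tracker keyed by their task id, so the left and right join processors of one task share an instance while different tasks never do.

   ```
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   // Sketch: one tracker per task. Both join processors of the same task see
   // the same instance; a different task gets its own.
   final class TimeTracker {
       long streamTime = Long.MIN_VALUE;

       void advance(final long recordTimestamp) {
           streamTime = Math.max(streamTime, recordTimestamp);
       }
   }

   final class TimeTrackerSupplier {
       private final Map<String, TimeTracker> trackersPerTask = new ConcurrentHashMap<>();

       // 'taskId' would be obtained from the processor context in init().
       TimeTracker get(final String taskId) {
           return trackersPerTask.computeIfAbsent(taskId, id -> new TimeTracker());
       }
   }
   ```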
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-04-17 Thread via GitHub


kirktrue commented on PR #13591:
URL: https://github.com/apache/kafka/pull/13591#issuecomment-1512145181

   @hachikuji @jolshan Can you take a look at this attempted fix for the 
transaction manager state corruption? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue opened a new pull request, #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-04-17 Thread via GitHub


kirktrue opened a new pull request, #13591:
URL: https://github.com/apache/kafka/pull/13591

   Poison the transaction manager if we detect an illegal transition as part of 
an internal (i.e. non-user direct) action.
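
   As a loose sketch of what "poisoning" on an illegal internal transition could look like (an illustrative state machine, not the actual TransactionManager code):

   ```
   // Sketch: an invalid transition requested by an internal action drives the
   // state machine into a fatal, fail-fast state instead of throwing back into
   // user code and leaving the state half-updated.
   final class TransactionStateMachine {
       enum State { UNINITIALIZED, READY, IN_TRANSACTION, COMMITTING, FATAL_ERROR }

       private State state = State.UNINITIALIZED;
       private RuntimeException fatalError;

       synchronized void transitionTo(final State target, final boolean fromInternalAction) {
           if (!isValidTransition(state, target)) {
               final IllegalStateException e =
                   new IllegalStateException("Invalid transition " + state + " -> " + target);
               if (fromInternalAction) {
                   state = State.FATAL_ERROR; // poison: every later operation fails fast
                   fatalError = e;
                   return;
               }
               throw e; // direct user misuse still surfaces immediately
           }
           state = target;
       }

       synchronized void maybeFailIfPoisoned() {
           if (state == State.FATAL_ERROR) {
               throw fatalError;
           }
       }

       private static boolean isValidTransition(final State from, final State to) {
           return from != State.FATAL_ERROR && to.ordinal() >= from.ordinal(); // simplified rule for the sketch
       }
   }
   ```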
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14862) Outer stream-stream join does not output all results with multiple input partitions

2023-04-17 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-14862:
---

Assignee: Matthias J. Sax

> Outer stream-stream join does not output all results with multiple input 
> partitions
> ---
>
> Key: KAFKA-14862
> URL: https://issues.apache.org/jira/browse/KAFKA-14862
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Bruno Cadonna
>Assignee: Matthias J. Sax
>Priority: Major
>
> If I execute the following Streams app once with two input topics each with 1 
> partition and then with input topics each with two partitions, I get 
> different results.
>   
> {code:java}
> final KStream<String, String> leftSide = builder.stream(leftSideTopic);
> final KStream<String, String> rightSide = builder.stream(rightSideTopic);
> final KStream<String, String> leftAndRight = leftSide.outerJoin(
> rightSide,
> (leftValue, rightValue) ->
> (rightValue == null) ? leftValue + "/NOTPRESENT": leftValue + "/" + 
> rightValue,
> JoinWindows.ofTimeDifferenceAndGrace(
> Duration.ofSeconds(20), 
> Duration.ofSeconds(10)),
> StreamJoined.with(
> Serdes.String(), /* key */
> Serdes.String(), /* left value */
> Serdes.String()  /* right value */
> ));
> leftAndRight.print(Printed.toSysOut());
> {code}
> To reproduce, produce twice the following batch of records with an interval 
> greater than window + grace period (i.e. > 30 seconds) in between the two 
> batches:
> {code}
> (0, 0)
> (1, 1)
> (2, 2)
> (3, 3)
> (4, 4)
> (5, 5)
> (6, 6)
> (7, 7)
> (8, 8)
> (9, 9)
> {code}
> With input topics with 1 partition I get:
> {code}
> [KSTREAM-PROCESSVALUES-08]: 0, 0/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 1, 1/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 2, 2/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 3, 3/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 4, 4/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 5, 5/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 6, 6/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 7, 7/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 8, 8/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 9, 9/NOTPRESENT
> {code}
> With input topics with 2 partitions I get:
> {code}
> [KSTREAM-PROCESSVALUES-08]: 1, 1/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 3, 3/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 4, 4/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 7, 7/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 8, 8/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 9, 9/NOTPRESENT
> {code}
> I would expect to get the same set of records, maybe in a different order due 
> to the partitioning.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] rondagostino merged pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

2023-04-17 Thread via GitHub


rondagostino merged PR #13280:
URL: https://github.com/apache/kafka/pull/13280


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rondagostino commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

2023-04-17 Thread via GitHub


rondagostino commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1512118780

   Here are some interesting GC results, via `./jmh.sh --prof gc TopicsImageSingleRecord`, showing that GC allocation per operation, which used to grow linearly with topic count and was on the order of megabytes, is now constant at under 2 KB.
   **Old Code**
   ```
   Benchmark   
(numReplicasPerBroker)  (partitionsPerTopic)  (replicationFactor)  
(totalTopicCount)  Mode  Cnt Score Error   Units
   testTopicsDeltaSingleTopicAdd:·gc.alloc.rate.norm   1
103  12500   avgt5  
 1193832.055 ±   0.015B/op
   testTopicsDeltaSingleTopicAdd:·gc.alloc.rate.norm   1
103  25000   avgt5  
 2387008.169 ±   0.027B/op
   testTopicsDeltaSingleTopicAdd:·gc.alloc.rate.norm   1
103  5   avgt5  
 4773480.672 ±   0.570B/op
   testTopicsDeltaSingleTopicAdd:·gc.alloc.rate.norm   1
103 10   avgt5  
 9546305.345 ±   0.077B/op
   ```
   
   **New Code**
   ```
   Benchmark   
(numReplicasPerBroker)  (partitionsPerTopic)  (replicationFactor)  
(totalTopicCount)  Mode  Cnt Score Error   Units
   testTopicsDeltaSingleTopicAdd:·gc.alloc.rate.norm   1
103  12500   avgt5  
1744.000 ±   0.001B/op
   testTopicsDeltaSingleTopicAdd:·gc.alloc.rate.norm   1
103  25000   avgt5  
1864.000 ±   0.001B/op
   testTopicsDeltaSingleTopicAdd:·gc.alloc.rate.norm   1
103  5   avgt5  
1984.000 ±   0.001B/op
   testTopicsDeltaSingleTopicAdd:·gc.alloc.rate.norm   1
103 10   avgt5  
1984.000 ±   0.001B/op
   ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe opened a new pull request, #13590: Scram kraft update user scram credential record

2023-04-17 Thread via GitHub


cmccabe opened a new pull request, #13590:
URL: https://github.com/apache/kafka/pull/13590

   get test run on https://github.com/apache/kafka/pull/13513


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rondagostino commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

2023-04-17 Thread via GitHub


rondagostino commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1169270141


##
metadata/src/main/java/org/apache/kafka/image/TopicsImage.java:
##
@@ -76,8 +84,8 @@ public TopicImage getTopic(String name) {
 }
 
 public void write(ImageWriter writer, ImageWriterOptions options) {
-for (TopicImage topicImage : topicsById.values()) {
-topicImage.write(writer, options);
+for (Map.Entry entry : topicsById.entrySet()) {
+entry.getValue().write(writer, options);

Review Comment:
   Yeah, it was in the Paguro implementation that `values()` was deprecated. We've moved away from that, but I left in the code change that stops using the `values()` collection. This change doesn't harm anything.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14909) KRaft Controllers not setting ZkMigrationReady tagged field

2023-04-17 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-14909:
-
Parent: KAFKA-14304
Issue Type: Sub-task  (was: Bug)

> KRaft Controllers not setting ZkMigrationReady tagged field
> ---
>
> Key: KAFKA-14909
> URL: https://issues.apache.org/jira/browse/KAFKA-14909
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Affects Versions: 3.4.0
>Reporter: David Arthur
>Priority: Major
> Fix For: 3.5.0, 3.4.1
>
>
> When sending ApiVersionsResponse to other controllers, the KRaft controller 
> is not setting the ZkMigrationReady field. This means, we can't determine if 
> the full KRaft quorum has been properly configured for a migration before 
> triggering the migration.
> As a result, we could start the migration on controller A (which was properly 
> configured), then fail over to controller B (which was not properly 
> configured) and no longer be in dual-write mode.
> The fix is to properly set the ZkMigrationReady tagged field, and to make use 
> of it in KRaftMigrationDriver



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14796) Migrate ZK ACLs to KRaft

2023-04-17 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur resolved KAFKA-14796.
--
Resolution: Fixed

Removed the 3.4.1 fix version since we're probably not back-porting this.

> Migrate ZK ACLs to KRaft
> 
>
> Key: KAFKA-14796
> URL: https://issues.apache.org/jira/browse/KAFKA-14796
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Major
> Fix For: 3.5.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14796) Migrate ZK ACLs to KRaft

2023-04-17 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-14796:
-
Fix Version/s: (was: 3.4.1)

> Migrate ZK ACLs to KRaft
> 
>
> Key: KAFKA-14796
> URL: https://issues.apache.org/jira/browse/KAFKA-14796
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Major
> Fix For: 3.5.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14805) KRaft Controller shouldn't allow metadata updates before migration starts

2023-04-17 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur reassigned KAFKA-14805:


Assignee: David Arthur

> KRaft Controller shouldn't allow metadata updates before migration starts
> -
>
> Key: KAFKA-14805
> URL: https://issues.apache.org/jira/browse/KAFKA-14805
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Critical
> Fix For: 3.5.0, 3.4.1
>
>
> When starting a ZK to KRaft migration, the new KRaft quorum should not accept 
> external metadata updates from things like CreateTopics or 
> AllocateProducerIds. Having metadata present in the log prior to the 
> migration can lead to undefined state, which is not great.
> This is currently causing test failures on trunk because some producer is 
> allocating a producer ID between the time the KRaft quorum starts and the 
> migration starts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14304) ZooKeeper to KRaft Migration

2023-04-17 Thread David Arthur (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713283#comment-17713283
 ] 

David Arthur commented on KAFKA-14304:
--

[~mimaison] sounds good. This issue encompasses feature work across many 
releases, so having a fix version is a bit confusing.

> ZooKeeper to KRaft Migration
> 
>
> Key: KAFKA-14304
> URL: https://issues.apache.org/jira/browse/KAFKA-14304
> Project: Kafka
>  Issue Type: New Feature
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Major
>
> Top-level JIRA for 
> [KIP-866|https://cwiki.apache.org/confluence/display/KAFKA/KIP-866+ZooKeeper+to+KRaft+Migration]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] k-wall commented on pull request #13572: KAFKA-14908: Set setReuseAddress on the kafka server socket

2023-04-17 Thread via GitHub


k-wall commented on PR #13572:
URL: https://github.com/apache/kafka/pull/13572#issuecomment-1512058199

   @divijvaidya thanks for taking the time to put together the detailed reply, I'll make the changes you are suggesting.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] pprovenzano commented on a diff in pull request #13513: KAFKA-14881: Update UserScramCredentialRecord for SCRAM ZK to KRaft migration

2023-04-17 Thread via GitHub


pprovenzano commented on code in PR #13513:
URL: https://github.com/apache/kafka/pull/13513#discussion_r1169182411


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -167,11 +167,14 @@ public enum MetadataVersion {
 // and updates to a handful of RPCs.
 IBP_3_4_IV0(8, "3.4", "IV0", true),
 
-// Support for tiered storage (KIP-405) and SCRAM
+// Support for tiered storage (KIP-405)
 IBP_3_5_IV0(9, "3.5", "IV0", false),
 
 // Adds replica epoch to Fetch request (KIP-903).
-IBP_3_5_IV1(10, "3.5", "IV1", false);
+IBP_3_5_IV1(10, "3.5", "IV1", false),
+
+// Support for SCRAM 
+IBP_3_5_IV2(11, "3.5", "IV2", false);

Review Comment:
   Fixed! Hopefully it will build and auto merge.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

2023-04-17 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1169135808


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -835,6 +835,7 @@ public void handle(SyncGroupResponse syncResponse,
 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
 log.info("SyncGroup failed: The group began another 
rebalance. Need to re-join the group. " +
  "Sent generation was {}", sentGeneration);
+resetStateAndGeneration("member missed the rebalance", 
true);

Review Comment:
   Hey @dajac, thanks for the review. I think in the case where the member misses a generation, we want to make sure the owned partitions are revoked (due to the generation reset). Regardless, it should still rejoin with its current partitions and should continue to hold on to its partitions if it is only 1 generation behind. If it is more than 1 generation behind, circling back to the beginning of my response, we want to make sure they are revoked because the partitions might have already been reassigned.

   "This makes me think that this will happen regularly in medium to large groups." -> I think this might not be as uncommon as we think, especially with a large consumer group deployed to multiple pods, considering a pod can be stalled before sending out SyncGroup while another consumer in a different pod tries to join the group.

   I hope I'm answering your questions there; I apologize if I misunderstood anything.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-04-17 Thread via GitHub


hachikuji commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1169133482


##
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int,
 }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
 this synchronized {
   // grab a new block of producerIds if this block has been exhausted
   if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-allocateNewProducerIdBlock()
+try {
+  allocateNewProducerIdBlock()
+} catch {
+  case t: Throwable =>
+return Failure(t)
+}
 nextProducerId = currentProducerIdBlock.firstProducerId
   }
   nextProducerId += 1
-  nextProducerId - 1
+  Success(nextProducerId - 1)
+}
+  }
+
+  override def hasValidBlock: Boolean = {
+this synchronized {
+  !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
 }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will 
immediately fail requests
+ * for producers to retry if it does not have an available producer id and is 
waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+   time: Time,
brokerEpochSupplier: () => Long,
-   controllerChannel: BrokerToControllerChannelManager,
-   maxWaitMs: Int) extends ProducerIdManager with 
Logging {
+   controllerChannel: 
BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new 
ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new 
AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new 
AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val blockCount = new AtomicLong(0)

Review Comment:
   Why is the `requestInFlight` flag not sufficient to prevent this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax opened a new pull request, #13589: MINOR: rename internal FK-join processor classes

2023-04-17 Thread via GitHub


mjsax opened a new pull request, #13589:
URL: https://github.com/apache/kafka/pull/13589

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe merged pull request #13542: [MINOR] getPartitionInfo fix

2023-04-17 Thread via GitHub


cmccabe merged PR #13542:
URL: https://github.com/apache/kafka/pull/13542


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-04-17 Thread via GitHub


jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1169101298


##
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int,
 }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
 this synchronized {
   // grab a new block of producerIds if this block has been exhausted
   if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-allocateNewProducerIdBlock()
+try {
+  allocateNewProducerIdBlock()
+} catch {
+  case t: Throwable =>
+return Failure(t)
+}
 nextProducerId = currentProducerIdBlock.firstProducerId
   }
   nextProducerId += 1
-  nextProducerId - 1
+  Success(nextProducerId - 1)
+}
+  }
+
+  override def hasValidBlock: Boolean = {
+this synchronized {
+  !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
 }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will 
immediately fail requests
+ * for producers to retry if it does not have an available producer id and is 
waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+   time: Time,
brokerEpochSupplier: () => Long,
-   controllerChannel: BrokerToControllerChannelManager,
-   maxWaitMs: Int) extends ProducerIdManager with 
Logging {
+   controllerChannel: 
BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new 
ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new 
AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new 
AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val blockCount = new AtomicLong(0)

Review Comment:
   Yes, without a sense of "epoch" we can have cases where we send additional controller requests right after a block is set, causing us to move forward without fully exhausting the existing block.
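
   For readers following along, a rough sketch of the race and the guard being described (simplified and illustrative, not the actual RPCProducerIdManager code):

   ```
   import java.util.concurrent.atomic.AtomicBoolean;
   import java.util.concurrent.atomic.AtomicLong;
   import java.util.concurrent.atomic.AtomicReference;

   // Sketch: requestInFlight alone cannot distinguish "the block I just saw as
   // exhausted" from "a fresh block that was installed a moment ago", so a block
   // generation counter is captured before deciding to ask the controller again.
   final class BlockPrefetcherSketch {
       static final class Block {
           final long last;
           final AtomicLong next;
           Block(final long first, final long last) { this.last = last; this.next = new AtomicLong(first); }
           boolean exhausted() { return next.get() > last; }
       }

       private final AtomicReference<Block> current = new AtomicReference<>(new Block(0, -1));
       private final AtomicBoolean requestInFlight = new AtomicBoolean(false);
       private final AtomicLong blockGeneration = new AtomicLong(0);

       void maybeRequestNextBlock() {
           final long observedGeneration = blockGeneration.get();
           if (current.get().exhausted()
                   && blockGeneration.get() == observedGeneration // no new block installed since we looked
                   && requestInFlight.compareAndSet(false, true)) {
               sendAllocateProducerIdsRequest(); // controller RPC elided in this sketch
           }
       }

       void onBlockReceived(final Block block) {
           current.set(block);
           blockGeneration.incrementAndGet(); // invalidates decisions made against the old block
           requestInFlight.set(false);
       }

       private void sendAllocateProducerIdsRequest() { }
   }
   ```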



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14894) MetadataLoader must call finishSnapshot after loading a snapshot

2023-04-17 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-14894:
-
Affects Version/s: 3.4.0
 Priority: Critical  (was: Major)

> MetadataLoader must call finishSnapshot after loading a snapshot
> 
>
> Key: KAFKA-14894
> URL: https://issues.apache.org/jira/browse/KAFKA-14894
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.4.0
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Critical
> Fix For: 3.5.0, 3.4.1
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14586) Move StreamsResetter to tools

2023-04-17 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713233#comment-17713233
 ] 

Matthias J. Sax commented on KAFKA-14586:
-

[~mimaison] [~sagarrao] – I am just realizing that we did this as part of 3.5. We actually had https://issues.apache.org/jira/browse/KAFKA-4327, which we did not do in the past because we thought we should only do it in a major release, as it seemed to be a breaking change (we also had 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-756%3A+Move+StreamsResetter+tool+outside+of+core]
 for it).

It seems you solved the issue of introducing a breaking change with some "redirection" according to KIP-906. So can we close KAFKA-4327 and KIP-756?

> Move StreamsResetter to tools
> -
>
> Key: KAFKA-14586
> URL: https://issues.apache.org/jira/browse/KAFKA-14586
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Sagar Rao
>Priority: Major
> Fix For: 3.5.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14911) Add system tests for rolling upgrade path of KIP-904

2023-04-17 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713230#comment-17713230
 ] 

Matthias J. Sax commented on KAFKA-14911:
-

[~fqpublic] – are you planning to pickup this ticket?

> Add system tests for rolling upgrade path of KIP-904
> 
>
> Key: KAFKA-14911
> URL: https://issues.apache.org/jira/browse/KAFKA-14911
> Project: Kafka
>  Issue Type: Test
>Reporter: Farooq Qaiser
>Priority: Major
> Fix For: 3.5.0
>
>
> As per [~mjsax] comment 
> [here|https://github.com/apache/kafka/pull/10747#pullrequestreview-1376539752],
>  we should add a system test to test the rolling upgrade path for 
> [KIP-904|https://cwiki.apache.org/confluence/x/P5VbDg] which introduces a new 
> serialization format for groupBy internal repartition topics and was 
> implemented as part of https://issues.apache.org/jira/browse/KAFKA-12446 
> There is `StreamsUpgradeTest.java` and `streams_upgrade_test.py` (cf 
> `test_rolling_upgrade_with_2_bounces`) as a starting point.
> Might be best to do a similar thing as for FK-joins, and add a new test 
> variation. 
> The tricky thing about the test would be to ensure that the repartition 
> topic is not empty when we do the bounce, so the test should be set up 
> accordingly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-04-17 Thread via GitHub


hachikuji commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1169036716


##
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int,
 }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
 this synchronized {
   // grab a new block of producerIds if this block has been exhausted
   if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-allocateNewProducerIdBlock()
+try {
+  allocateNewProducerIdBlock()
+} catch {
+  case t: Throwable =>
+return Failure(t)
+}
 nextProducerId = currentProducerIdBlock.firstProducerId
   }
   nextProducerId += 1
-  nextProducerId - 1
+  Success(nextProducerId - 1)
+}
+  }
+
+  override def hasValidBlock: Boolean = {
+this synchronized {
+  !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
 }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will 
immediately fail requests
+ * for producers to retry if it does not have an available producer id and is 
waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+   time: Time,
brokerEpochSupplier: () => Long,
-   controllerChannel: BrokerToControllerChannelManager,
-   maxWaitMs: Int) extends ProducerIdManager with 
Logging {
+   controllerChannel: 
BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new 
ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new 
AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new 
AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val blockCount = new AtomicLong(0)

Review Comment:
   I did not follow why we need this. It seems like it's being used to prevent 
concurrent sends?



##
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int,
 }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
 this synchronized {
   // grab a new block of producerIds if this block has been exhausted
   if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-allocateNewProducerIdBlock()
+try {
+  allocateNewProducerIdBlock()
+} catch {
+  case t: Throwable =>
+return Failure(t)
+}
 nextProducerId = currentProducerIdBlock.firstProducerId
   }
   nextProducerId += 1
-  nextProducerId - 1
+  Success(nextProducerId - 1)
+}
+  }
+
+  override def hasValidBlock: Boolean = {
+this synchronized {
+  !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
 }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will 
immediately fail requests
+ * for producers to retry if it does not have an available producer id and is 
waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+   time: Time,
brokerEpochSupplier: () => Long,
-   controllerChannel: BrokerToControllerChannelManager,
-   maxWaitMs: Int) extends ProducerIdManager with 
Logging {
+   controllerChannel: 
BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new 
ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new 
AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new 
AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val blockCount = new AtomicLong(0)
 
-  override def generateProducerId(): Long = {
-this synchronized {
-  if (nextProducerId == -1L) {
-// Send an initial request to get the first block
-maybeRequestNextBlock()
-nextProducerId = 0L
-  } else {
-nextProducerId += 1
-
-// Check if we need to fetch the next block
-if (

[jira] [Updated] (KAFKA-14894) MetadataLoader must call finishSnapshot after loading a snapshot

2023-04-17 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-14894:

Fix Version/s: 3.4.1

> MetadataLoader must call finishSnapshot after loading a snapshot
> 
>
> Key: KAFKA-14894
> URL: https://issues.apache.org/jira/browse/KAFKA-14894
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
> Fix For: 3.5.0, 3.4.1
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] vamossagar12 opened a new pull request, #13588: [MINOR]: Fixing gradle build during compileScala and compileTestScala

2023-04-17 Thread via GitHub


vamossagar12 opened a new pull request, #13588:
URL: https://github.com/apache/kafka/pull/13588

   @satishd , I am getting a Gradle build failure (using the IntelliJ Gradle plugin) during compileScala and compileTestScala because of what seem to be non-ASCII characters in RemoteLogManager.java and RemoteLogManagerTest.java. The errors that I get are:
   
   ```
   [Error] 
/Users/sagarrao/gitprojects/kafka/core/src/test/java/kafka/log/remote/RemoteLogManager.java:457:
  error: unmappable character for encoding ASCII
   ```
   
   and 
   
   ```
   [Error] 
/Users/sagarrao/gitprojects/kafka/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:205:
  error: unmappable character for encoding ASCII
   ```
   
   With these changes, the build seems to be working. Let me know if this PR 
makes sense. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #13579: KAFKA-14904: Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId()

2023-04-17 Thread via GitHub


jolshan commented on PR #13579:
URL: https://github.com/apache/kafka/pull/13579#issuecomment-1511739519

   Yes. I ran it 100 times locally and no failures.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14084) Support SCRAM when using KRaft mode

2023-04-17 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713206#comment-17713206
 ] 

Colin McCabe commented on KAFKA-14084:
--

SCRAM is in 3.5. It is supported as of MetadataVersion 3.5-IV2. KAFKA-14859 is 
almost done. It needs to be in 3.5 as well. The other JIRAs are done.

We should do SCRAM migration for 3.5 too, I think, given that it's a small 
amount of code. And it would be pretty confusing to explain how we could end up 
with SCRAM support but not migration.

> Support SCRAM when using KRaft mode
> ---
>
> Key: KAFKA-14084
> URL: https://issues.apache.org/jira/browse/KAFKA-14084
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Proven Provenzano
>Priority: Major
>  Labels: kip-500
> Fix For: 3.5.0
>
>
> Support SCRAM when using KRaft mode, as specified in KIP-631



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cmccabe commented on a diff in pull request #13513: KAFKA-14881: Update UserScramCredentialRecord for SCRAM ZK to KRaft migration

2023-04-17 Thread via GitHub


cmccabe commented on code in PR #13513:
URL: https://github.com/apache/kafka/pull/13513#discussion_r1169018558


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -167,11 +167,14 @@ public enum MetadataVersion {
 // and updates to a handful of RPCs.
 IBP_3_4_IV0(8, "3.4", "IV0", true),
 
-// Support for tiered storage (KIP-405) and SCRAM
+// Support for tiered storage (KIP-405)
 IBP_3_5_IV0(9, "3.5", "IV0", false),
 
 // Adds replica epoch to Fetch request (KIP-903).
-IBP_3_5_IV1(10, "3.5", "IV1", false);
+IBP_3_5_IV1(10, "3.5", "IV1", false),
+
+// Support for SCRAM 
+IBP_3_5_IV2(11, "3.5", "IV2", false);

Review Comment:
   didMetadataChange needs to be `true` here
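
   In other words, presumably the new entry should end up looking something like this (the last constructor argument being the `didMetadataChange` flag):

   ```
   // Support for SCRAM
   IBP_3_5_IV2(11, "3.5", "IV2", true);
   ```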



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #13513: KAFKA-14881: Update UserScramCredentialRecord for SCRAM ZK to KRaft migration

2023-04-17 Thread via GitHub


cmccabe commented on PR #13513:
URL: https://github.com/apache/kafka/pull/13513#issuecomment-1511734452

   Here's a Jenkins failure I have not seen before.
   
   ```
   671 [2023-04-11T13:07:59.254Z] FAILURE: Build failed with an exception.
   672 [2023-04-11T13:07:59.254Z]
   673 [2023-04-11T13:07:59.254Z] * What went wrong:
   674 [2023-04-11T13:07:59.254Z] A problem was found with the configuration of 
task ':rat' (type 'RatTask').
   675 [2023-04-11T13:07:59.254Z]   - Gradle detected a problem with the 
following location: 
'/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-13513'.
   676 [2023-04-11T13:07:59.254Z]
   677 [2023-04-11T13:07:59.254Z] Reason: Task ':rat' uses this output of 
task ':clients:processTestMessages' without declaring an explicit or implicit 
dependency. This c
   678 [2023-04-11T13:07:59.254Z]
   679 [2023-04-11T13:07:59.254Z] Possible solutions:
   680 [2023-04-11T13:07:59.254Z]   1. Declare task 
':clients:processTestMessages' as an input of ':rat'.
   681 [2023-04-11T13:07:59.254Z]   2. Declare an explicit dependency on 
':clients:processTestMessages' from ':rat' using Task#dependsOn.
   682 [2023-04-11T13:07:59.254Z]   3. Declare an explicit dependency on 
':clients:processTestMessages' from ':rat' using Task#mustRunAfter.
   683 [2023-04-11T13:07:59.254Z]
   684 [2023-04-11T13:07:59.254Z] Please refer to 
https://docs.gradle.org/8.0.2/userguide/validation_problems.html#implicit_dependency
 for more details about this problem
   685 [2023-04-11T13:07:59.254Z]
   686 [2023-04-11T13:07:59.254Z] * Try:
   687 [2023-04-11T13:07:59.254Z] > Run with --stacktrace option to get the 
stack trace.
   688 [2023-04-11T13:07:59.254Z] > Run with --info or --debug option to get 
more log output.
   689 [2023-04-11T13:07:59.254Z] > Run with --scan to get full insights.
   690 [2023-04-11T13:07:59.254Z]
   691 [2023-04-11T13:07:59.254Z] * Get more help at https://help.gradle.org
   692 [2023-04-11T13:07:59.254Z]
   693 [2023-04-11T13:07:59.254Z] Deprecated Gradle features were used in this 
build, making it incompatible with Gradle 9.0.
   694 [2023-04-11T13:07:59.254Z]
   695 [2023-04-11T13:07:59.254Z] You can use '--warning-mode all' to show the 
individual deprecation warnings and determine if they come from your own 
scripts or plugins.
   696 [2023-04-11T13:07:59.254Z]
   697 [2023-04-11T13:07:59.254Z] See 
https://docs.gradle.org/8.0.2/userguide/command_line_interface.html#sec:command_line_warnings
   ```
   
   Perhaps someone broke the build again?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14775) Support SCRAM for broker to controller authentication

2023-04-17 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-14775.
--
Fix Version/s: 3.5.0
 Assignee: Colin McCabe  (was: Proven Provenzano)
   Resolution: Fixed

> Support SCRAM for broker to controller authentication
> -
>
> Key: KAFKA-14775
> URL: https://issues.apache.org/jira/browse/KAFKA-14775
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Reporter: Proven Provenzano
>Assignee: Colin McCabe
>Priority: Major
> Fix For: 3.5.0
>
>
> We need to apply SCRAM changes to controller nodes.
> We need to handle DescribeUserScramCredentialsRequest in the controller nodes.
> As part of this update I will split out the code from 
> {{BrokerMetadataPublisher.scala}} for applying the SCRAM  into a separate 
> {{{}MetadataPublisher{}}}, as we did with {{DynamicConfigPublisher}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on a diff in pull request #13526: KAFKA-14869: Bump coordinator Value records to flexible versions (KIP-915, Part-2)

2023-04-17 Thread via GitHub


dajac commented on code in PR #13526:
URL: https://github.com/apache/kafka/pull/13526#discussion_r1168982888


##
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala:
##
@@ -135,4 +141,119 @@ class TransactionLogTest {
 assertEquals(Some(""), valueStringOpt)
   }
 
+  @Test
+  def testSerializeTransactionLogValueToHighestNonFlexibleVersion(): Unit = {
+val txnTransitMetadata = TxnTransitMetadata(1, 1, 1, 1, 1000, 
CompleteCommit, Set.empty, 500, 500)
+val txnLogValueBuffer = 
ByteBuffer.wrap(TransactionLog.valueToBytes(txnTransitMetadata))
+assertEquals(0, txnLogValueBuffer.getShort)
+  }
+
+  @Test
+  def testDeserializeHighestSupportedTransactionLogValue(): Unit = {
+val txnPartitions = new TransactionLogValue.PartitionsSchema()
+  .setTopic("topic")
+  .setPartitionIds(java.util.Collections.singletonList(0))
+
+val txnLogValue = new TransactionLogValue()
+  .setProducerId(100)
+  .setProducerEpoch(50.toShort)
+  .setTransactionStatus(CompleteCommit.id)
+  .setTransactionStartTimestampMs(750L)
+  .setTransactionLastUpdateTimestampMs(1000L)
+  .setTransactionTimeoutMs(500)
+  
.setTransactionPartitions(java.util.Collections.singletonList(txnPartitions))
+
+val serialized = MessageUtil.toVersionPrefixedByteBuffer(1, txnLogValue)
+val deserialized = TransactionLog.readTxnRecordValue("transactionId", 
serialized).get
+
+assertEquals(100, deserialized.producerId)
+assertEquals(50, deserialized.producerEpoch)
+assertEquals(CompleteCommit, deserialized.state)
+assertEquals(750L, deserialized.txnStartTimestamp)
+assertEquals(1000L, deserialized.txnLastUpdateTimestamp)
+assertEquals(500, deserialized.txnTimeoutMs)
+
+val actualTxnPartitions = deserialized.topicPartitions
+assertEquals(1, actualTxnPartitions.size)
+assertTrue(actualTxnPartitions.contains(new TopicPartition("topic", 0)))
+  }
+
+  @Test
+  def testDeserializeFutureTransactionLogValue(): Unit = {
+// Copy of TransactionLogValue.PartitionsSchema.SCHEMA_1 with a few
+// additional tagged fields.
+val futurePartitionsSchema = new Schema(
+  new Field("topic", Type.COMPACT_STRING, ""),
+  new Field("partition_ids", new CompactArrayOf(Type.INT32), ""),
+  TaggedFieldsSection.of(
+0, new Field("partition_foo", Type.STRING, ""),
+1, new Field("partition_foo", Type.INT32, "")
+  )
+)
+
+// create TransactionLogValue.PartitionsSchema with tagged fields

Review Comment:
   nit: `Create`.



##
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala:
##
@@ -135,4 +141,119 @@ class TransactionLogTest {
 assertEquals(Some(""), valueStringOpt)
   }
 
+  @Test
+  def testSerializeTransactionLogValueToHighestNonFlexibleVersion(): Unit = {
+val txnTransitMetadata = TxnTransitMetadata(1, 1, 1, 1, 1000, 
CompleteCommit, Set.empty, 500, 500)
+val txnLogValueBuffer = 
ByteBuffer.wrap(TransactionLog.valueToBytes(txnTransitMetadata))
+assertEquals(0, txnLogValueBuffer.getShort)
+  }
+
+  @Test
+  def testDeserializeHighestSupportedTransactionLogValue(): Unit = {
+val txnPartitions = new TransactionLogValue.PartitionsSchema()
+  .setTopic("topic")
+  .setPartitionIds(java.util.Collections.singletonList(0))
+
+val txnLogValue = new TransactionLogValue()
+  .setProducerId(100)
+  .setProducerEpoch(50.toShort)
+  .setTransactionStatus(CompleteCommit.id)
+  .setTransactionStartTimestampMs(750L)
+  .setTransactionLastUpdateTimestampMs(1000L)
+  .setTransactionTimeoutMs(500)
+  
.setTransactionPartitions(java.util.Collections.singletonList(txnPartitions))
+
+val serialized = MessageUtil.toVersionPrefixedByteBuffer(1, txnLogValue)
+val deserialized = TransactionLog.readTxnRecordValue("transactionId", 
serialized).get
+
+assertEquals(100, deserialized.producerId)
+assertEquals(50, deserialized.producerEpoch)
+assertEquals(CompleteCommit, deserialized.state)
+assertEquals(750L, deserialized.txnStartTimestamp)
+assertEquals(1000L, deserialized.txnLastUpdateTimestamp)
+assertEquals(500, deserialized.txnTimeoutMs)
+
+val actualTxnPartitions = deserialized.topicPartitions
+assertEquals(1, actualTxnPartitions.size)
+assertTrue(actualTxnPartitions.contains(new TopicPartition("topic", 0)))
+  }
+
+  @Test
+  def testDeserializeFutureTransactionLogValue(): Unit = {
+// Copy of TransactionLogValue.PartitionsSchema.SCHEMA_1 with a few
+// additional tagged fields.
+val futurePartitionsSchema = new Schema(
+  new Field("topic", Type.COMPACT_STRING, ""),
+  new Field("partition_ids", new CompactArrayOf(Type.INT32), ""),
+  TaggedFieldsSection.of(
+0, new Field("partition_foo", Type.STRING, ""),
+1, new Field("partition_foo", Type.INT32, "")
+  )
+)
+
+// create TransactionLogValue.PartitionsSchema with 

[GitHub] [kafka] yashmayya commented on pull request #13587: Document the new 'GET /connectors/{name}/offsets' REST API for Connect

2023-04-17 Thread via GitHub


yashmayya commented on PR #13587:
URL: https://github.com/apache/kafka/pull/13587#issuecomment-1511688529

   @mimaison @C0urante could you please take a look? Should I raise a separate 
PR targeting the `3.5` branch as well or can you backport it there once this is 
merged?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya opened a new pull request, #13587: Document the new 'GET /connectors/{name}/offsets' REST API for Connect

2023-04-17 Thread via GitHub


yashmayya opened a new pull request, #13587:
URL: https://github.com/apache/kafka/pull/13587

   - Adds documentation for the new `GET /connectors/{name}/offsets` REST API 
for Connect (see 
[KIP-875](https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect))
   - https://github.com/apache/kafka/pull/13434
   
   https://user-images.githubusercontent.com/23502577/232546534-b42fa9e7-4051-453d-a860-65f3014698c3.png
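
   As a usage illustration (not part of the PR; the worker address and the connector name `my-connector` are made-up examples), the new endpoint can be called with any HTTP client, e.g.:

   ```
   import java.net.URI;
   import java.net.http.HttpClient;
   import java.net.http.HttpRequest;
   import java.net.http.HttpResponse;

   // Example call against a Connect worker's REST port; the response body is a
   // JSON document listing the connector's committed offsets.
   public final class GetConnectorOffsetsExample {
       public static void main(final String[] args) throws Exception {
           final HttpClient client = HttpClient.newHttpClient();
           final HttpRequest request = HttpRequest.newBuilder()
               .uri(URI.create("http://localhost:8083/connectors/my-connector/offsets"))
               .GET()
               .build();
           final HttpResponse<String> response =
               client.send(request, HttpResponse.BodyHandlers.ofString());
           System.out.println(response.statusCode() + ": " + response.body());
       }
   }
   ```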
   
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-14876) Public documentation for new Kafka Connect offset management REST APIs

2023-04-17 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713158#comment-17713158
 ] 

Yash Mayya edited comment on KAFKA-14876 at 4/17/23 4:11 PM:
-

Yep, makes sense, I can raise a PR for that soon.


was (Author: yash.mayya):
Yep, makes sense, I can raise a PR for that tomorrow.

> Public documentation for new Kafka Connect offset management REST APIs
> --
>
> Key: KAFKA-14876
> URL: https://issues.apache.org/jira/browse/KAFKA-14876
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
> Fix For: 3.5.0
>
>
> Add public documentation for the 3 new Kafka Connect offset management REST 
> APIs being introduced in 
> [KIP-875:|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
>  * *GET* /connectors/\{connector}/offsets
>  * *PATCH* /connectors/\{connector}/offsets
>  * *DELETE* /connectors/\{connector}/offsets)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14876) Public documentation for new Kafka Connect offset management REST APIs

2023-04-17 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713183#comment-17713183
 ] 

Yash Mayya commented on KAFKA-14876:


[~ChrisEgerton] I think we should document the new stop API only along with the alter / reset offsets APIs, as it might not make much sense without them. What do you think?

> Public documentation for new Kafka Connect offset management REST APIs
> --
>
> Key: KAFKA-14876
> URL: https://issues.apache.org/jira/browse/KAFKA-14876
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
> Fix For: 3.5.0
>
>
> Add public documentation for the 3 new Kafka Connect offset management REST 
> APIs being introduced in 
> [KIP-875:|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
>  * *GET* /connectors/\{connector}/offsets
>  * *PATCH* /connectors/\{connector}/offsets
>  * *DELETE* /connectors/\{connector}/offsets)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] LinShunKang commented on pull request #12545: KIP-863: Reduce CompletedFetch#parseRecord() memory copy

2023-04-17 Thread via GitHub


LinShunKang commented on PR #12545:
URL: https://github.com/apache/kafka/pull/12545#issuecomment-1511641622

   > LGTM! Thanks for the improvement!
   > 
   > @LinShunKang , I think this improvement can also apply to other types, ex: 
`UUIDDeserializer` is just a wrapper for `StringDeserializer`. And also, the 
number deserializer should also be able to be handled by ByteBuffer directly. 
WDYT?
   
   Thank you for reviewing.
   I have already implemented `deserialize(String, Headers, ByteBuffer)` for the other Deserializers, PTAL : )
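
   For readers of the thread, a minimal sketch of the idea behind the ByteBuffer-based overloads (illustrative only; see the PR for the actual deserializer implementations):

   ```
   import java.nio.ByteBuffer;

   // Sketch: read the value straight from the fetched ByteBuffer instead of
   // first materializing an intermediate byte[] copy.
   public final class IntegerFromByteBufferSketch {

       // byte[]-based path: the caller had to allocate and fill an array first.
       static Integer deserialize(final byte[] data) {
           if (data == null) {
               return null;
           }
           return ByteBuffer.wrap(data).getInt();
       }

       // ByteBuffer-based path: no intermediate byte[] is needed.
       static Integer deserialize(final ByteBuffer data) {
           if (data == null) {
               return null;
           }
           return data.duplicate().getInt(); // duplicate() leaves the caller's position untouched
       }
   }
   ```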


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14304) ZooKeeper to KRaft Migration

2023-04-17 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-14304:
---
Fix Version/s: (was: 3.5.0)
   (was: 3.4.1)

> ZooKeeper to KRaft Migration
> 
>
> Key: KAFKA-14304
> URL: https://issues.apache.org/jira/browse/KAFKA-14304
> Project: Kafka
>  Issue Type: New Feature
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Major
>
> Top-level JIRA for 
> [KIP-866|https://cwiki.apache.org/confluence/display/KAFKA/KIP-866+ZooKeeper+to+KRaft+Migration]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14403) Snapshot failure metrics

2023-04-17 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713168#comment-17713168
 ] 

Mickael Maison commented on KAFKA-14403:


We're now past feature freeze for 3.5.0 so moving to 3.6.0.

> Snapshot failure metrics
> 
>
> Key: KAFKA-14403
> URL: https://issues.apache.org/jira/browse/KAFKA-14403
> Project: Kafka
>  Issue Type: New Feature
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Major
> Fix For: 3.5.0
>
>
> Implement the following metrics:
> Controller:
> kafka.controller:type=KafkaController,name=MetadataSnapshotGenerationErrors
> Incremented anytime the controller fails to generate a snapshot. Reset to 
> zero anytime the controller restarts or a snapshot is successfully generated.
> Broker:
> kafka.server:type=broker-metadata-metrics,name=snapshot-generation-errors
> Incremented anytime the broker fails to generate a snapshot. Reset to zero 
> anytime the broker restarts or a snapshot is successfully generated.
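
A minimal sketch of a counter with the semantics described above, using the 
org.apache.kafka.common.metrics API for the broker-side metric; the class name and 
wiring are illustrative only, not the actual controller/broker code.

{code:java}
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Metrics;

import java.util.concurrent.atomic.AtomicLong;

public class SnapshotErrorMetricSketch {
    private final AtomicLong errors = new AtomicLong();

    public SnapshotErrorMetricSketch(Metrics metrics) {
        MetricName name = metrics.metricName("snapshot-generation-errors",
            "broker-metadata-metrics",
            "Snapshot generation failures since the last successful snapshot");
        // Expose the current counter value as a gauge-style metric.
        metrics.addMetric(name, (Measurable) (config, now) -> errors.get());
    }

    public void onSnapshotFailure() { errors.incrementAndGet(); }

    public void onSnapshotSuccess() { errors.set(0); } // reset to zero, per the description
}
{code}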



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14403) Snapshot failure metrics

2023-04-17 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-14403:
---
Fix Version/s: 3.6.0
   (was: 3.5.0)

> Snapshot failure metrics
> 
>
> Key: KAFKA-14403
> URL: https://issues.apache.org/jira/browse/KAFKA-14403
> Project: Kafka
>  Issue Type: New Feature
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Major
> Fix For: 3.6.0
>
>
> Implement the following metrics:
> Controller:
> kafka.controller:type=KafkaController,name=MetadataSnapshotGenerationErrors
> Incremented anytime the controller fails to generate a snapshot. Reset to 
> zero anytime the controller restarts or a snapshot is successfully generated.
> Broker:
> kafka.server:type=broker-metadata-metrics,name=snapshot-generation-errors
> Incremented anytime the broker fails to generate a snapshot. Reset to zero 
> anytime the broker restarts or a snapshot is successfully generated.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14304) ZooKeeper to KRaft Migration

2023-04-17 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713166#comment-17713166
 ] 

Mickael Maison commented on KAFKA-14304:


It looks like there's still quite a few things to do, so moving it to 3.6.0.

I believe having 3.4.1 as the fixed version is incorrect, so removing it too.

> ZooKeeper to KRaft Migration
> 
>
> Key: KAFKA-14304
> URL: https://issues.apache.org/jira/browse/KAFKA-14304
> Project: Kafka
>  Issue Type: New Feature
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Major
> Fix For: 3.5.0, 3.4.1
>
>
> Top-level JIRA for 
> [KIP-866|https://cwiki.apache.org/confluence/display/KAFKA/KIP-866+ZooKeeper+to+KRaft+Migration]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] ijuma commented on pull request #13579: KAFKA-14904: Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId()

2023-04-17 Thread via GitHub


ijuma commented on PR #13579:
URL: https://github.com/apache/kafka/pull/13579#issuecomment-1511617282

   > It was about 1/40 locally on my laptop.
   
   And now it passes consistently locally?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14876) Public documentation for new Kafka Connect offset management REST APIs

2023-04-17 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-14876:
---
Fix Version/s: 3.5.0

> Public documentation for new Kafka Connect offset management REST APIs
> --
>
> Key: KAFKA-14876
> URL: https://issues.apache.org/jira/browse/KAFKA-14876
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
> Fix For: 3.5.0
>
>
> Add public documentation for the 3 new Kafka Connect offset management REST 
> APIs being introduced in 
> [KIP-875:|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
>  * *GET* /connectors/\{connector}/offsets
>  * *PATCH* /connectors/\{connector}/offsets
>  * *DELETE* /connectors/\{connector}/offsets)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14876) Public documentation for new Kafka Connect offset management REST APIs

2023-04-17 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713158#comment-17713158
 ] 

Yash Mayya commented on KAFKA-14876:


Yep, makes sense, I can raise a PR for that tomorrow.

> Public documentation for new Kafka Connect offset management REST APIs
> --
>
> Key: KAFKA-14876
> URL: https://issues.apache.org/jira/browse/KAFKA-14876
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
>
> Add public documentation for the 3 new Kafka Connect offset management REST 
> APIs being introduced in 
> [KIP-875:|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
>  * *GET* /connectors/\{connector}/offsets
>  * *PATCH* /connectors/\{connector}/offsets
>  * *DELETE* /connectors/\{connector}/offsets)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on a diff in pull request #13579: KAFKA-14904: Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId()

2023-04-17 Thread via GitHub


jolshan commented on code in PR #13579:
URL: https://github.com/apache/kafka/pull/13579#discussion_r1168901227


##
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##
@@ -396,8 +397,14 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
   } else if (txnMetadata.producerEpoch != producerEpoch) {
 Left(Errors.PRODUCER_FENCED)
   } else if (txnMetadata.pendingTransitionInProgress) {
-// return a retriable exception to let the client backoff and retry
-Left(Errors.CONCURRENT_TRANSACTIONS)
+// If we are in the produce path, we want to avoid 
OutOfOrderSequence errors if the added partition is pending.
+// TODO: Part 2 of KIP-890 will always start transactions with 
sequence 0, so we enforce that and avoid this workaround.

Review Comment:
   I can remove the "TODO" parts of the comments. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #13579: KAFKA-14904: Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId()

2023-04-17 Thread via GitHub


jolshan commented on PR #13579:
URL: https://github.com/apache/kafka/pull/13579#issuecomment-1511592266

   It was about 1/40 locally on my laptop.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14840) Handle KRaft snapshots in dual-write mode

2023-04-17 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713142#comment-17713142
 ] 

Mickael Maison commented on KAFKA-14840:


[~mumrah] Is this a blocker for 3.5.0 or should we move this to the next 
release?

> Handle KRaft snapshots in dual-write mode
> -
>
> Key: KAFKA-14840
> URL: https://issues.apache.org/jira/browse/KAFKA-14840
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Affects Versions: 3.4.0
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Blocker
> Fix For: 3.5.0, 3.4.1
>
>
> While the KRaft controller is making writes back to ZK during the migration, 
> we need to handle the case when a snapshot is loaded. This can happen for a 
> number of reasons in KRaft.
> The difficulty here is we will need to compare the loaded snapshot with the 
> entire state in ZK. Most likely, this will be a very expensive operation.
> Without this, dual-write mode cannot safely tolerate a snapshot being loaded, 
> so marking this as a 3.5 blocker.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14876) Public documentation for new Kafka Connect offset management REST APIs

2023-04-17 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713136#comment-17713136
 ] 

Mickael Maison commented on KAFKA-14876:


Since the GET endpoint is in 3.5, can you do a small PR to document that part?

> Public documentation for new Kafka Connect offset management REST APIs
> --
>
> Key: KAFKA-14876
> URL: https://issues.apache.org/jira/browse/KAFKA-14876
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
>
> Add public documentation for the 3 new Kafka Connect offset management REST 
> APIs being introduced in 
> [KIP-875:|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
>  * *GET* /connectors/\{connector}/offsets
>  * *PATCH* /connectors/\{connector}/offsets
>  * *DELETE* /connectors/\{connector}/offsets)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-17 Thread via GitHub


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1168861947


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1294,7 +1309,10 @@ public void handleResponse(AbstractResponse response) {
 reenqueue();
 } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
 error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
-fatalError(error.exception());
+log.info("Abortable authorization error: {}.  Transition the 
producer state to {}", error.message(), State.ABORTABLE_ERROR);
+lastError = error.exception();
+epochBumpRequired = true;

Review Comment:
   hey sorry - I'm in the process of writing a test here 😅 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9

2023-04-17 Thread via GitHub


dajac commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1168675481


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -1283,10 +1289,16 @@ RequestFuture sendOffsetCommitRequest(final 
Map
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import org.apache.kafka.common.errors.InvalidTopicException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Encapsulates the mapping between topic names and ids assuming a 1:1 
relationship between
+ * a name and an id.
+ * 
+ * Note that this class intends to be used for the (reverse) lookup of topic 
IDs/names, but
+ * not to characterize the set of topics which are known by a client. Use the
+ * {@link org.apache.kafka.clients.MetadataCache} for that purpose.
+ */
+public class TopicIdAndNameBiMap {
+private final Map topicIds;
+private final Map topicNames;
+
+/**
+ * A mapping whose universe of topic ids and names is captured from the 
input map. The reverse association
+ * between a topic ID and a topic name is computed by this method. If 
more than one topic name resolves
+ * to the same topic ID, an {@link InvalidTopicException} is 
thrown.
+ */
+public static TopicIdAndNameBiMap fromTopicIds(Map topicIds) 
{
+Map topicNames = new HashMap<>(topicIds.size());
+
+for (Map.Entry e: topicIds.entrySet()) {
+String conflicting = topicNames.putIfAbsent(e.getValue(), 
e.getKey());
+if (conflicting != null) {
+throw new IllegalStateException(
+"Topic " + e.getKey() + " shares the same ID " + 
e.getValue() + " as topic " + conflicting);
+}
+}
+
+return new TopicIdAndNameBiMap(topicIds, topicNames);
+}
+
+/**
+ * A mapping which acts as a wrapper around the input mapping of topic ids 
from/to topic names.
+ * No validation is performed about the consistency of the mapping. This 
method is to be preferred
+ * when the copy of the input maps needs to be avoided.
+ */
+public static TopicIdAndNameBiMap wrap(Map topicIds, 
Map topicNames) {
+return new TopicIdAndNameBiMap(topicIds, topicNames);
+}
+
+/**
+ * Used when no mapping between topic name and id exists.
+ */
+public static TopicIdAndNameBiMap emptyMapping() {
+return fromTopicIds(Collections.emptyMap());
+}
+
+private TopicIdAndNameBiMap(Map topicIds, Map 
topicNames) {
+this.topicIds = Collections.unmodifiableMap(topicIds);
+this.topicNames = Collections.unmodifiableMap(topicNames);
+}
+
+/**
+ * Returns the ID of the topic with the given name, if that association 
exists.
+ */
+public Uuid getTopicIdOrZero(String name) {
+return Optional.ofNullable(topicIds.get(name)).orElse(Uuid.ZERO_UUID);

Review Comment:
   nit: Should we replace `ofNullable` by a simple `if/else` statement? 
Allocating an optional does not seem necessary here.
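   
   For illustration, the if/else variant suggested above could look like this 
(a drop-in for the method in the diff, assuming the same `topicIds` field):
   
   ```java
   public Uuid getTopicIdOrZero(String name) {
       final Uuid id = topicIds.get(name);
       if (id == null) {
           return Uuid.ZERO_UUID;
       }
       return id;
   }
   ```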



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -2707,11 +2742,271 @@ public void testCommitOffsetUnknownMemberId() {
 client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
 coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
-prepareOffsetCommitRequest(singletonMap(t1p, 100L), 
Errors.UNKNOWN_MEMBER_ID);
+prepareOffsetCommitRequest(singletonMap(ti1p, 100L), 
Errors.UNKNOWN_MEMBER_ID);
 assertThrows(CommitFailedException.class, () -> 
coordinator.commitOffsetsSync(singletonMap(t1p,
 new OffsetAndMetadata(100L, "metadata")), 
time.timer(Long.MAX_VALUE)));
 }
 
+@ParameterizedTest
+@ValueSource(booleans = { true, false })
+public void testCommitOffsetUnknownTopicId(boolean commitSync) {
+client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+// Prepare five OffsetCommit requests which return a retriable 
UNKNOWN_TOPIC_ID error.
+// Set the timeout accordingly so that commitOffsetsSync completes on 
the fifth attempt.
+// Note that the timer (MockTime) only ticks when its sleep(long) 
method is invoked.
+// Because the ConsumerNetworkClient does not call sleep() in its 
network-level poll(.), this
+// timer never moves forward once the network client is invoked. If 
there is no avail

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13526: KAFKA-14869: Bump coordinator Value records to flexible versions (KIP-915, Part-2)

2023-04-17 Thread via GitHub


jeffkbkim commented on code in PR #13526:
URL: https://github.com/apache/kafka/pull/13526#discussion_r1168815457


##
group-coordinator/src/main/resources/common/message/GroupMetadataValue.json:
##
@@ -16,8 +16,11 @@
 {
   "type": "data",
   "name": "GroupMetadataValue",
-  "validVersions": "0-3",
-  "flexibleVersions": "none",
+  // Version 4 is the first flexible version.
+  // KIP-915: bumping the version will no longer make this record backward 
compatible.
+  // We suggest to add/remove only tagged fields to maintain backward 
compatibility.
+  "validVersions": "0-4",

Review Comment:
   From the KIP:
   > Any future tagged fields will not require a version bump and older brokers 
can simply ignore the tagged fields they do not understand. Note that 
introducing a new non-tagged field or removing an existing non-tagged field in 
the future will not be backward compatible.
   
   We decided not to move forward with enforcing that the version is never 
bumped; instead, we warn the developers that bumping the version will cause 
backward incompatibility.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison merged pull request #13560: MINOR: Add ClusterTool wrapper scripts and redirection (KIP-906)

2023-04-17 Thread via GitHub


mimaison merged PR #13560:
URL: https://github.com/apache/kafka/pull/13560


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #13559: MINOR: Add ConsumerPerformance tool migration note

2023-04-17 Thread via GitHub


mimaison commented on PR #13559:
URL: https://github.com/apache/kafka/pull/13559#issuecomment-1511403013

   Backported to 3.5 too


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison merged pull request #13559: MINOR: Add ConsumerPerformance tool migration note

2023-04-17 Thread via GitHub


mimaison merged PR #13559:
URL: https://github.com/apache/kafka/pull/13559


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #13586: MINOR: Add StreamsResetter tool migration note

2023-04-17 Thread via GitHub


mimaison commented on PR #13586:
URL: https://github.com/apache/kafka/pull/13586#issuecomment-1511391315

   Backported to 3.5 too.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison merged pull request #13586: MINOR: Add StreamsResetter tool migration note

2023-04-17 Thread via GitHub


mimaison merged PR #13586:
URL: https://github.com/apache/kafka/pull/13586


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #13556: MINOR: Add EndToEndLatency wrapper scripts and redirection (KIP-906)

2023-04-17 Thread via GitHub


mimaison commented on PR #13556:
URL: https://github.com/apache/kafka/pull/13556#issuecomment-1511358901

   Since the initial EndToEndLatency changes are in 3.5, I've backported this 
to 3.5 too.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison merged pull request #13556: MINOR: Add EndToEndLatency wrapper scripts and redirection (KIP-906)

2023-04-17 Thread via GitHub


mimaison merged PR #13556:
URL: https://github.com/apache/kafka/pull/13556


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14084) Support SCRAM when using KRaft mode

2023-04-17 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713089#comment-17713089
 ] 

Ismael Juma commented on KAFKA-14084:
-

I am confused about https://issues.apache.org/jira/browse/KAFKA-14775 though, 
is that still missing or not?

> Support SCRAM when using KRaft mode
> ---
>
> Key: KAFKA-14084
> URL: https://issues.apache.org/jira/browse/KAFKA-14084
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Proven Provenzano
>Priority: Major
>  Labels: kip-500
> Fix For: 3.5.0
>
>
> Support SCRAM when using KRaft mode, as specified in KIP-631



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14084) Support SCRAM when using KRaft mode

2023-04-17 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713087#comment-17713087
 ] 

Ismael Juma commented on KAFKA-14084:
-

The ZK migration work should be tracked as part of the overall migration task. 
If SCRAM works in the "new cluster" case, we should mark this as done and 
include it as a note in the 3.5 release blog post.

> Support SCRAM when using KRaft mode
> ---
>
> Key: KAFKA-14084
> URL: https://issues.apache.org/jira/browse/KAFKA-14084
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Proven Provenzano
>Priority: Major
>  Labels: kip-500
> Fix For: 3.5.0
>
>
> Support SCRAM when using KRaft mode, as specified in KIP-631



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mimaison commented on pull request #13195: Minor: Add JmxTool wrapper scripts and redirection (KIP-906)

2023-04-17 Thread via GitHub


mimaison commented on PR #13195:
URL: https://github.com/apache/kafka/pull/13195#issuecomment-1511325839

   Since the initial JmxTool changes are in 3.5, I've backported this to 3.5 
too.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison merged pull request #13195: Minor: Add JmxTool wrapper scripts and redirection (KIP-906)

2023-04-17 Thread via GitHub


mimaison merged PR #13195:
URL: https://github.com/apache/kafka/pull/13195


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #13195: Minor: Add JmxTool wrapper scripts and redirection (KIP-906)

2023-04-17 Thread via GitHub


dajac commented on PR #13195:
URL: https://github.com/apache/kafka/pull/13195#issuecomment-1511278752

   @mimaison No objections. I do agree that we should avoid breaking things.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

2023-04-17 Thread via GitHub


dajac commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1168638509


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -835,6 +835,7 @@ public void handle(SyncGroupResponse syncResponse,
 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
 log.info("SyncGroup failed: The group began another 
rebalance. Need to re-join the group. " +
  "Sent generation was {}", sentGeneration);
+resetStateAndGeneration("member missed the rebalance", 
true);

Review Comment:
   @philipnee I am trying to convince myself about this change. This basically 
means that whenever a member is late for the sync-group phase, it will abandon 
all its partitions. Here late means that the member sends the sync-group 
request after the next rebalance has started. I wonder how common this is, 
especially in large groups.
   
   My understanding is that all pending sync-group requests are completed when 
the leader sends the assignment. When they are completed, the members with 
partitions to be revoked revoke them and re-join more-or-less immediately 
(because we don't commit offsets in the cooperative mode, I think). This makes 
me think that this will happen regularly in medium to large groups.
   
   Could you elaborate a bit more on the reasoning behind this conservative 
change? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon merged pull request #13583: MINOR: Correct type of record batch CRC field

2023-04-17 Thread via GitHub


showuon merged PR #13583:
URL: https://github.com/apache/kafka/pull/13583


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on a diff in pull request #13584: MINOR: Add log segment unit tests, If the maximum offset beyond index, appen…

2023-04-17 Thread via GitHub


Hangleton commented on code in PR #13584:
URL: https://github.com/apache/kafka/pull/13584#discussion_r1168535982


##
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala:
##
@@ -65,6 +66,21 @@ class LogSegmentTest {
 Utils.delete(logDir)
   }
 
+  /**
+   * If the maximum offset appended to the log segment is beyond the index range, it throws LogSegmentOffsetOverflowException
+   */
+  @Test
+  def testAppendForLogSegmentOffsetOverflowException(): Unit = {
+val seg = createSegment(0)
+val largestOffset: Long = Integer.MAX_VALUE.toLong + 1
+val largestTimestamp: Long = Time.SYSTEM.milliseconds()
+val shallowOffsetOfMaxTimestamp: Long = 0
+val memoryRecords: MemoryRecords = records(0, "hello")
+assertThrows(classOf[LogSegmentOffsetOverflowException], () => {
+  seg.append(largestOffset, largestTimestamp, shallowOffsetOfMaxTimestamp, 
memoryRecords)

Review Comment:
   nit: `largestTimestamp` could be misleading as it could make believe the 
timestamp itself is set to the maximum possible value.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13584: MINOR: Add log segment unit tests, If the maximum offset beyond index, appen…

2023-04-17 Thread via GitHub


divijvaidya commented on code in PR #13584:
URL: https://github.com/apache/kafka/pull/13584#discussion_r1168516608


##
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala:
##
@@ -65,6 +66,21 @@ class LogSegmentTest {
 Utils.delete(logDir)
   }
 
+  /**
+   * If the maximum offset appended to the log segment is beyond the index range, it throws LogSegmentOffsetOverflowException
+   */
+  @Test
+  def testAppendForLogSegmentOffsetOverflowException(): Unit = {
+val seg = createSegment(0)
+val largestOffset: Long = Integer.MAX_VALUE.toLong + 1
+val largestTimestamp: Long = Time.SYSTEM.milliseconds()

Review Comment:
   In the spirit of keeping this test run fast and deterministic, could we hard 
code a value here?



##
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala:
##
@@ -65,6 +66,21 @@ class LogSegmentTest {
 Utils.delete(logDir)
   }
 
+  /**
+   * If the maximum offset appended to the log segment is beyond the index range, it throws LogSegmentOffsetOverflowException
+   */
+  @Test
+  def testAppendForLogSegmentOffsetOverflowException(): Unit = {
+val seg = createSegment(0)
+val largestOffset: Long = Integer.MAX_VALUE.toLong + 1

Review Comment:
   I would suggest to use `@ParameterizedTest` and also test for:
   1. Negative value for largestOffset
   2. Value exceeding the range of integer (existing) 
   
   



##
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala:
##
@@ -65,6 +66,21 @@ class LogSegmentTest {
 Utils.delete(logDir)
   }
 
+  /**
+   * If the maximum offset appended to the log segment is beyond the index range, it throws LogSegmentOffsetOverflowException
+   */
+  @Test
+  def testAppendForLogSegmentOffsetOverflowException(): Unit = {
+val seg = createSegment(0)

Review Comment:
   May I suggest creating a segment with a non-zero index so that we can test 
for the 
https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java#L554
 condition being false. The `relativeOffset` should fit within an integer, but 
the current test case only tests for the situation where we have 0 as the starting 
index.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-7735) StateChangeLogMerger tool can not work due to incorrect topic regular matches

2023-04-17 Thread Federico Valeri (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Federico Valeri updated KAFKA-7735:
---
Fix Version/s: 3.6.0

> StateChangeLogMerger tool can not work due to incorrect topic regular matches
> -
>
> Key: KAFKA-7735
> URL: https://issues.apache.org/jira/browse/KAFKA-7735
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 2.0.0
>Reporter: Fangbin Sun
>Assignee: Federico Valeri
>Priority: Major
> Fix For: 3.6.0
>
>
> When StateChangeLogMerger tool tries to obtain a topic's state-change-log, it 
> returns nothing.
> {code:java}
> bin/kafka-run-class.sh com.cmss.kafka.api.StateChangeLogMerger --logs 
> state-change.log --topic test{code}
> This tool uses a topic partition regex as follows:
> {code:java}
> val topicPartitionRegex = new Regex("\\[(" + Topic.LEGAL_CHARS + "+),( 
> )*([0-9]+)\\]"){code}
> However the state-change-log no longer prints log in the above format. e.g. 
> in 0.10.2.0, it prints some state-change logs by case class TopicAndPartition 
> which overrided as follows:
> {code:java}
> override def toString = "[%s,%d]".format(topic, partition){code}
> In a newer version (e.g. 1.0.0+) it prints most of state-change logs in the 
> form of "partition $topic-$partition", as a workaround one can modify the 
> topic partition regex like:
> {code:java}
> val topicPartitionRegex = new Regex("(partition " + Topic.LEGAL_CHARS + 
> "+)-([0-9]+)"){code}
> and match topic with "matcher.group(1).substring(10)", however some output of 
> state changes might be a little bit redundant.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14584) Move StateChangeLogMerger to tools

2023-04-17 Thread Federico Valeri (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Federico Valeri updated KAFKA-14584:

Fix Version/s: 3.6.0

> Move StateChangeLogMerger to tools
> --
>
> Key: KAFKA-14584
> URL: https://issues.apache.org/jira/browse/KAFKA-14584
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Federico Valeri
>Priority: Major
> Fix For: 3.6.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] Hangleton commented on pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9

2023-04-17 Thread via GitHub


Hangleton commented on PR #13240:
URL: https://github.com/apache/kafka/pull/13240#issuecomment-150416

   > Hey @Hangleton. I just got back to this PR. I made a pass over the files 
in `core` and I left some comments. As a general ask, it would be great if we 
could avoid large refactoring in tests in this PR as they are very 
distracting. I am not against refactoring but I would do them in separate PRs.
   
   Hi David, thanks for the review. Understand about refactoring, I will try to 
see if I can revert some of them if possible.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14881) Update UserScramCredentialRecord for SCRAM ZK to KRaft migration

2023-04-17 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-14881:
---
Fix Version/s: 3.5.0
   (was: 3.6.0)

> Update UserScramCredentialRecord for SCRAM ZK to KRaft migration
> 
>
> Key: KAFKA-14881
> URL: https://issues.apache.org/jira/browse/KAFKA-14881
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Affects Versions: 3.5.0
>Reporter: Proven Provenzano
>Assignee: Proven Provenzano
>Priority: Major
> Fix For: 3.5.0
>
>
> I want to support ZK to KRaft migration.
> ZK stores a storedKey and a serverKey for each SCRAM credential not the 
> saltedPassword.
> The storedKey and serverKey are a crypto hash of some data with the 
> saltedPassword and it is not possible to extract the saltedPassword from them.
> The serverKey and storedKey are enough for SCRAM authentication and 
> saltedPassword is not needed.
> I will update the UserScramCredentialRecord to store serverKey and storedKey 
> instead of saltedPassword and I will update that SCRAM is only supported with 
> a bumped version of IBP_3_5 so that there are no compatibility issues.
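
For reference, a sketch of how a serverKey and storedKey are derived from the 
saltedPassword per RFC 5802, using only standard JCA classes; the password, salt and 
iteration count below are made-up values, not Kafka code.

{code:java}
import javax.crypto.Mac;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class ScramKeysSketch {

    static byte[] hmac(byte[] key, String msg) throws Exception {
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(new SecretKeySpec(key, "HmacSHA256"));
        return mac.doFinal(msg.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws Exception {
        char[] password = "secret".toCharArray();
        byte[] salt = "random-salt".getBytes(StandardCharsets.UTF_8);
        int iterations = 4096;

        // Hi(password, salt, i) from RFC 5802 is PBKDF2 with HMAC-SHA-256.
        SecretKeyFactory skf = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256");
        byte[] saltedPassword =
            skf.generateSecret(new PBEKeySpec(password, salt, iterations, 256)).getEncoded();

        byte[] clientKey = hmac(saltedPassword, "Client Key");
        byte[] storedKey = MessageDigest.getInstance("SHA-256").digest(clientKey);
        byte[] serverKey = hmac(saltedPassword, "Server Key");

        // storedKey and serverKey are one-way derivations: the saltedPassword cannot
        // be recovered from them, which is why storing only these two is safe.
        System.out.printf("storedKey=%d bytes, serverKey=%d bytes%n",
            storedKey.length, serverKey.length);
    }
}
{code}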



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mimaison commented on pull request #13513: KAFKA-14881: Update UserScramCredentialRecord for SCRAM ZK to KRaft migration

2023-04-17 Thread via GitHub


mimaison commented on PR #13513:
URL: https://github.com/apache/kafka/pull/13513#issuecomment-1511108866

   @cmccabe It looks like this would be nice to backport to 3.5. If I 
understand correctly it would allow users enabling SCRAM in 3.5 to update to 
3.6 without issues.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-14881) Update UserScramCredentialRecord for SCRAM ZK to KRaft migration

2023-04-17 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713017#comment-17713017
 ] 

Mickael Maison edited comment on KAFKA-14881 at 4/17/23 10:41 AM:
--

Right, this would be nice to have in 3.5.0. 


was (Author: mimaison):
Right, this would be nice to have in 3.5.0. 

> Update UserScramCredentialRecord for SCRAM ZK to KRaft migration
> 
>
> Key: KAFKA-14881
> URL: https://issues.apache.org/jira/browse/KAFKA-14881
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Affects Versions: 3.5.0
>Reporter: Proven Provenzano
>Assignee: Proven Provenzano
>Priority: Major
> Fix For: 3.6.0
>
>
> I want to support ZK to KRaft migration.
> ZK stores a storedKey and a serverKey for each SCRAM credential not the 
> saltedPassword.
> The storedKey and serverKey are a crypto hash of some data with the 
> saltedPassword and it is not possible to extract the saltedPassword from them.
> The serverKey and storedKey are enough for SCRAM authentication and 
> saltedPassword is not needed.
> I will update the UserScramCredentialRecord to store serverKey and storedKey 
> instead of saltedPassword and I will update that SCRAM is only supported with 
> a bumped version of IBP_3_5 so that there are no compatibility issues.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14881) Update UserScramCredentialRecord for SCRAM ZK to KRaft migration

2023-04-17 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713017#comment-17713017
 ] 

Mickael Maison commented on KAFKA-14881:


Right, this would be nice to have in 3.5.0. 

> Update UserScramCredentialRecord for SCRAM ZK to KRaft migration
> 
>
> Key: KAFKA-14881
> URL: https://issues.apache.org/jira/browse/KAFKA-14881
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Affects Versions: 3.5.0
>Reporter: Proven Provenzano
>Assignee: Proven Provenzano
>Priority: Major
> Fix For: 3.6.0
>
>
> I want to support ZK to KRaft migration.
> ZK stores a storedKey and a serverKey for each SCRAM credential not the 
> saltedPassword.
> The storedKey and serverKey are a crypto hash of some data with the 
> saltedPassword and it is not possible to extract the saltedPassword from them.
> The serverKey and storedKey are enough for SCRAM authentication and 
> saltedPassword is not needed.
> I will update the UserScramCredentialRecord to store serverKey and storedKey 
> instead of saltedPassword and I will update that SCRAM is only supported with 
> a bumped version of IBP_3_5 so that there are no compatibility issues.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9

2023-04-17 Thread via GitHub


dajac commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1168410473


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -427,35 +428,59 @@ class KafkaApis(val requestChannel: RequestChannel,
   requestHelper.sendMaybeThrottle(request, 
offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
   CompletableFuture.completedFuture[Unit](())
 } else {
+  val topicIdAndNames = metadataCache.topicIdAndNames()

Review Comment:
   It looks like `topicIdAndNames` is only used if version >= 9. Should we move 
it into that else branch? Moreover, it seems that we don't need the BiMap anymore 
here. Should we just get the mapping that we need and revert the BiMap thing in 
the `MetadataCache`?



##
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala:
##
@@ -489,24 +489,23 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   private def createOffsetCommitRequest = {
-new requests.OffsetCommitRequest.Builder(
-new OffsetCommitRequestData()
-  .setGroupId(group)
-  .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
-  .setGenerationId(1)
-  .setTopics(Collections.singletonList(
-new OffsetCommitRequestData.OffsetCommitRequestTopic()
-  .setName(topic)
-  .setPartitions(Collections.singletonList(
-new OffsetCommitRequestData.OffsetCommitRequestPartition()
-  .setPartitionIndex(part)
-  .setCommittedOffset(0)
-  
.setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
-  .setCommitTimestamp(OffsetCommitRequest.DEFAULT_TIMESTAMP)
-  .setCommittedMetadata("metadata")
-  )))
-  )
-).build()
+val data = new OffsetCommitRequestData()
+  .setGroupId(group)
+  .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
+  .setGenerationId(1)
+  .setTopics(Collections.singletonList(
+new OffsetCommitRequestData.OffsetCommitRequestTopic()
+  .setName(topic)
+  .setPartitions(Collections.singletonList(
+new OffsetCommitRequestData.OffsetCommitRequestPartition()
+  .setPartitionIndex(part)
+  .setCommittedOffset(0)
+  .setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
+  .setCommitTimestamp(OffsetCommitRequest.DEFAULT_TIMESTAMP)
+  .setCommittedMetadata("metadata")
+  )))
+  )
+new requests.OffsetCommitRequest.Builder(data, true).build()

Review Comment:
   Just to be sure. The addition of `true` is the only real change here, right? 



##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -1264,22 +1270,25 @@ class KafkaApisTest {
 )
   }
 
-  @Test
-  def testHandleOffsetCommitRequest(): Unit = {
-addTopicToMetadataCache("foo", numPartitions = 1)
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+  def testHandleOffsetCommitRequest(version: Short): Unit = {
+val fooId = Uuid.randomUuid()
+addTopicToMetadataCache("foo", numPartitions = 1, topicId = fooId)
 
 val offsetCommitRequest = new OffsetCommitRequestData()
   .setGroupId("group")
   .setMemberId("member")
   .setTopics(List(
 new OffsetCommitRequestData.OffsetCommitRequestTopic()
   .setName("foo")
+  .setTopicId(if (version >= 9) fooId else Uuid.ZERO_UUID)

Review Comment:
   I think that `TopicId` is optional so we could just set it here.



##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -1353,73 +1375,29 @@ class KafkaApisTest {
 
   @Test
   def testHandleOffsetCommitRequestTopicsAndPartitionsValidation(): Unit = {

Review Comment:
   Should this test be parameterized as well? With this change, it seems that 
we don't have any tests exercising the validation with topic names now.



##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -3885,32 +3882,33 @@ class KafkaApisTest {
 
   @Test
   def rejectOffsetCommitRequestWhenStaticMembershipNotSupported(): Unit = {
-val offsetCommitRequest = new OffsetCommitRequest.Builder(
-  new OffsetCommitRequestData()
-.setGroupId("test")
-.setMemberId("test")
-.setGroupInstanceId("instanceId")
-.setGenerationId(100)
-.setTopics(Collections.singletonList(
-  new OffsetCommitRequestData.OffsetCommitRequestTopic()
-.setName("test")
-.setPartitions(Collections.singletonList(
-  new OffsetCommitRequestData.OffsetCommitRequestPartition()
-.setPartitionIndex(0)
-.setCommittedOffset(100)
-.setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
-.setCommittedMetadata("")
-))
-))
-).build()
+

[jira] [Created] (KAFKA-14915) Option to consume multiple partitions that have their data in remote storage for the target offsets.

2023-04-17 Thread Satish Duggana (Jira)
Satish Duggana created KAFKA-14915:
--

 Summary: Option to consume multiple partitions that have their 
data in remote storage for the target offsets.
 Key: KAFKA-14915
 URL: https://issues.apache.org/jira/browse/KAFKA-14915
 Project: Kafka
  Issue Type: Sub-task
Reporter: Satish Duggana






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14915) Option to consume multiple partitions that have their data in remote storage for the target offsets.

2023-04-17 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana reassigned KAFKA-14915:
--

Assignee: Kamal Chandraprakash

> Option to consume multiple partitions that have their data in remote storage 
> for the target offsets.
> 
>
> Key: KAFKA-14915
> URL: https://issues.apache.org/jira/browse/KAFKA-14915
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Satish Duggana
>Assignee: Kamal Chandraprakash
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cadonna commented on a diff in pull request #13529: KAFKA-14133: Migrate topology builder mock in TaskManagerTest to mockito

2023-04-17 Thread via GitHub


cadonna commented on code in PR #13529:
URL: https://github.com/apache/kafka/pull/13529#discussion_r1168432918


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -970,6 +971,7 @@ public void 
shouldHandleMultipleRemovedTasksFromStateUpdater() {
 expectLastCall().anyTimes();
 final TasksRegistry tasks = mock(TasksRegistry.class);
 
when(tasks.removePendingTaskToCloseClean(taskToClose.id())).thenReturn(true);
+when(tasks.removePendingTaskToCloseClean(argThat(taskId -> 
!taskId.equals(taskToClose.id())))).thenReturn(false);

Review Comment:
   I had to add this since Mockito threw an error. The cause of the error was 
that this PR sets the Mockito stubbing to strict. The error was:
   
   ```
   org.mockito.exceptions.misusing.PotentialStubbingProblem: 
   Strict stubbing argument mismatch. Please check:
- this invocation of 'removePendingTaskToCloseClean' method:
   tasksRegistry.removePendingTaskToCloseClean(
   0_3
   );
   -> at 
org.apache.kafka.streams.processor.internals.TaskManager.handleRemovedTasksFromStateUpdater(TaskManager.java:876)
- has following stubbing(s) with different arguments:
   1. tasksRegistry.removePendingTaskToCloseClean(
   0_2
   );
 -> at 
org.apache.kafka.streams.processor.internals.TaskManagerTest.shouldHandleMultipleRemovedTasksFromStateUpdater(TaskManagerTest.java:973)
   Typically, stubbing argument mismatch indicates user mistake when writing 
tests.
   Mockito fails early so that you can debug potential problem easily.
   However, there are legit scenarios when this exception generates false 
negative signal:
 - stubbing the same method multiple times using 'given().will()' or 
'when().then()' API
   Please use 'will().given()' or 'doReturn().when()' API for stubbing.
 - stubbed method is intentionally invoked with different arguments by code 
under test
   Please use default or 'silent' JUnit Rule (equivalent of 
Strictness.LENIENT).
   For more information see javadoc for PotentialStubbingProblem class.
   ``` 
   
   In this case, the code under test calls `removePendingTaskToCloseClean()` 
multiple times with different arguments. That is normal but needs to be 
considered in the unit test.
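   
   As a self-contained sketch of the stubbing pattern (with a hypothetical `Registry` 
interface standing in for `TasksRegistry`): one explicit stub for the expected task id, 
plus a catch-all `argThat` stub so calls with any other id return false instead of 
tripping strict stubbing.
   
   ```java
   import static org.mockito.ArgumentMatchers.argThat;
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.when;
   
   public class StrictStubbingSketch {
   
       // Hypothetical stand-in for TasksRegistry.
       interface Registry {
           boolean removePendingTaskToCloseClean(String taskId);
       }
   
       public static void main(String[] args) {
           Registry tasks = mock(Registry.class);
           when(tasks.removePendingTaskToCloseClean("0_2")).thenReturn(true);
           // Cover every other task id the code under test may probe.
           when(tasks.removePendingTaskToCloseClean(argThat(id -> !id.equals("0_2"))))
               .thenReturn(false);
   
           System.out.println(tasks.removePendingTaskToCloseClean("0_2")); // true
           System.out.println(tasks.removePendingTaskToCloseClean("0_3")); // false
       }
   }
   ```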



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


