[GitHub] [kafka] ivanyu commented on a diff in pull request #13067: KAFKA-14524: Rewrite KafkaMetricsGroup in Java

2023-03-06 Thread via GitHub


ivanyu commented on code in PR #13067:
URL: https://github.com/apache/kafka/pull/13067#discussion_r1127463069


##
server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.metrics;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.Timer;
+import org.apache.kafka.common.utils.Sanitizer;
+
+public class KafkaMetricsGroup {
+    private final Class<?> klass;
+
+    public KafkaMetricsGroup(final Class<?> klass) {
+        this.klass = klass;
+    }
+
+    /**
+     * Creates a new MetricName object for gauges, meters, etc. created for this
+     * metrics group.
+     * @param name Descriptive name of the metric.
+     * @param tags Additional attributes which mBean will have.
+     * @return Sanitized metric name object.
+     */
+    public MetricName metricName(final String name, final Map<String, String> tags) {
+        final String pkg;
+        if (klass.getPackage() == null) {

Review Comment:
   Done
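
   For context, a minimal sketch of what the reworked method could look like after this review pass (the `explicitMetricName` helper and the simple-name cleanup are assumptions modeled on the Scala original, not necessarily the merged code):

   ```java
   public MetricName metricName(final String name, final Map<String, String> tags) {
       // Collapse the null-package branch discussed above into a single ternary.
       final String pkg = klass.getPackage() == null ? "" : klass.getPackage().getName();
       // Strip a trailing '$' that Scala companion objects leave on the class name.
       final String simpleName = klass.getSimpleName().replaceAll("\\$$", "");
       return explicitMetricName(pkg, simpleName, name, tags);  // assumed helper
   }
   ```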






[GitHub] [kafka] ivanyu commented on pull request #13067: KAFKA-14524: Rewrite KafkaMetricsGroup in Java

2023-03-06 Thread via GitHub


ivanyu commented on PR #13067:
URL: https://github.com/apache/kafka/pull/13067#issuecomment-1457689418

   Thank you @satishd! I addressed your last comment.
   
   > Please have incremental commits instead of a single squashed/merged commit. This helps reviewers follow the incremental changes in the PR that address the review comments.
   
   I relied on GitHub being able to show the diff on force-push, but sure, I'll do incremental commits and can squash before merge.





[jira] [Created] (KAFKA-14787) Supports reading ssl certificate files from hdfs

2023-03-06 Thread melin (Jira)
melin created KAFKA-14787:
-

 Summary: Supports reading ssl certificate files from hdfs
 Key: KAFKA-14787
 URL: https://issues.apache.org/jira/browse/KAFKA-14787
 Project: Kafka
  Issue Type: Bug
Reporter: melin
 Attachments: image-2023-03-07-15-27-08-077.png

Spark/Flink run on Hadoop YARN. Support reading SSL certificate files from
HDFS: Paths.get(String) is replaced with Paths.get(URI), which picks up the
registered HadoopFileSystemProvider and enables NIO to resolve HDFS file paths.
!image-2023-03-07-15-27-08-077.png!
https://github.com/damiencarol/jsr203-hadoop
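
Illustrative only: a minimal sketch of the proposed switch from Paths.get(String) to Paths.get(URI), assuming the jsr203-hadoop provider is on the classpath (the host, port, and keystore path below are hypothetical):

```java
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class HdfsKeystoreSketch {
    public static void main(String[] args) throws IOException {
        // Paths.get(String) resolves only against the default (local) file system.
        // Paths.get(URI) consults installed FileSystemProviders, so with the
        // jsr203-hadoop provider registered, an hdfs:// URI maps to a Hadoop path.
        Path keystore = Paths.get(URI.create("hdfs://namenode:8020/certs/kafka.keystore.jks"));
        byte[] bytes = Files.readAllBytes(keystore);
        System.out.println("Read " + bytes.length + " keystore bytes from HDFS");
    }
}
```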





[GitHub] [kafka] vcrfxia commented on a diff in pull request #13340: KAFKA-14491: [15/N] Add integration tests for versioned stores

2023-03-06 Thread via GitHub


vcrfxia commented on code in PR #13340:
URL: https://github.com/apache/kafka/pull/13340#discussion_r1127290672


##
streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java:
##
@@ -0,0 +1,592 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.lessThan;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.query.StateQueryRequest;
+import org.apache.kafka.streams.query.StateQueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder;
+import org.apache.kafka.streams.state.internals.VersionedKeyValueToBytesStoreAdapter;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({IntegrationTest.class})
+public class VersionedKeyValueStoreIntegrationTest {
+
+    private static final String STORE_NAME = "versioned-store";
+    private static final long HISTORY_RETENTION = 3600_000L;
+
+    private String inputStream;
+    private String outputStream;
+    private long baseTimestamp;
+
+    private KafkaStreams kafkaStreams;
+
+    private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    @BeforeClass
+    public static void before() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void after() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void beforeTest() throws InterruptedException {
+        final String uniqueTestName = safeUniqueTestName(getClass(), testName);
+        inputStream = "input-stream-" + uniqueTestName;
+        outputStream = "output-stream-" + uniqueTestName;
+        CLUSTER.createTopic(inputStream);

[GitHub] [kafka] vcrfxia commented on a diff in pull request #13292: KAFKA-14491: [14/N] Set changelog topic configs for versioned stores

2023-03-06 Thread via GitHub


vcrfxia commented on code in PR #13292:
URL: https://github.com/apache/kafka/pull/13292#discussion_r1127274296


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##
@@ -1293,12 +1306,16 @@ private void setRegexMatchedTopicToStateStore() {
 
     private InternalTopicConfig createChangelogTopicConfig(final StateStoreFactory factory,
                                                            final String name) {
-        if (factory.isWindowStore()) {
+        if (factory.isVersionedStore()) {
+            final VersionedChangelogTopicConfig config = new VersionedChangelogTopicConfig(name, factory.logConfig());
+            config.setMinCompactionLagMs(factory.historyRetention());

Review Comment:
   > Do we also need to increase `delete.retention.ms` for this case?
   
   Nope, the changelog topic for versioned stores is only compacted, so `delete.retention.ms` is unused. (In contrast to the windowed changelog topic case, where both compaction and retention-based deletion are enabled.)
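
   To make the distinction concrete, a hedged sketch of the two config shapes described above, using broker topic-config names rather than the Streams-internal classes:

   ```java
   import java.util.HashMap;
   import java.util.Map;

   public class ChangelogConfigSketch {
       // Versioned-store changelog: compaction only, so delete.retention.ms never applies.
       static Map<String, String> versionedChangelogConfigs(long historyRetentionMs) {
           Map<String, String> configs = new HashMap<>();
           configs.put("cleanup.policy", "compact");
           configs.put("min.compaction.lag.ms", String.valueOf(historyRetentionMs));
           return configs;
       }

       // Windowed-store changelog: compaction plus retention-based deletion,
       // which is where delete.retention.ms does come into play.
       static Map<String, String> windowedChangelogConfigs(long retentionMs) {
           Map<String, String> configs = new HashMap<>();
           configs.put("cleanup.policy", "compact,delete");
           configs.put("retention.ms", String.valueOf(retentionMs));
           return configs;
       }
   }
   ```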






[GitHub] [kafka] vcrfxia commented on a diff in pull request #13292: KAFKA-14491: [14/N] Set changelog topic configs for versioned stores

2023-03-06 Thread via GitHub


vcrfxia commented on code in PR #13292:
URL: https://github.com/apache/kafka/pull/13292#discussion_r1127273827


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##
@@ -1293,12 +1306,16 @@ private void setRegexMatchedTopicToStateStore() {
 
     private InternalTopicConfig createChangelogTopicConfig(final StateStoreFactory factory,
                                                            final String name) {
-        if (factory.isWindowStore()) {
+        if (factory.isVersionedStore()) {
+            final VersionedChangelogTopicConfig config = new VersionedChangelogTopicConfig(name, factory.logConfig());
+            config.setMinCompactionLagMs(factory.historyRetention());

Review Comment:
   > Why are we setting this "outside" but not via the constructor? (I see that 
we do the same thing for WindowedChangelogTopicConfig but I am not sure why 
either.)
   
   No reason, AFAICT -- I was just following precedent. Would you prefer 
passing `factory.historyRetention()` into the constructor and setting it there? 
I can refactor `WindowedChangelogTopicConfig` as well if so.






[jira] [Updated] (KAFKA-14371) quorum-state file contains empty/unused clusterId field

2023-03-06 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-14371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-14371:
---
Fix Version/s: (was: 3.5.0)

> quorum-state file contains empty/unused clusterId field
> ---
>
> Key: KAFKA-14371
> URL: https://issues.apache.org/jira/browse/KAFKA-14371
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ron Dagostino
>Priority: Minor
>  Labels: needs-kip
>
> The KRaft controller's quorum-state file 
> `$LOG_DIR/__cluster_metadata-0/quorum-state` contains an empty clusterId 
> value.  This value is never non-empty, and it is never used after it is 
> written and then subsequently read.  This is a cosmetic issue; it would be 
> best if this value did not exist there.  The cluster ID already exists in the 
> `$LOG_DIR/meta.properties` file.
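
For reference, a quorum-state file with the cosmetic empty field looks roughly like this (values are illustrative; the field layout follows QuorumStateData.json version 0):

```json
{
  "clusterId": "",
  "leaderId": 1,
  "leaderEpoch": 2,
  "votedId": -1,
  "appliedOffset": 0,
  "currentVoters": [{"voterId": 1}, {"voterId": 2}, {"voterId": 3}],
  "data_version": 0
}
```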





[jira] [Reopened] (KAFKA-14371) quorum-state file contains empty/unused clusterId field

2023-03-06 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-14371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio reopened KAFKA-14371:

  Assignee: (was: Gantigmaa Selenge)

> quorum-state file contains empty/unused clusterId field
> ---
>
> Key: KAFKA-14371
> URL: https://issues.apache.org/jira/browse/KAFKA-14371
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ron Dagostino
>Priority: Minor
>  Labels: needs-kip
> Fix For: 3.5.0
>
>
> The KRaft controller's quorum-state file 
> `$LOG_DIR/__cluster_metadata-0/quorum-state` contains an empty clusterId 
> value.  This value is never non-empty, and it is never used after it is 
> written and then subsequently read.  This is a cosmetic issue; it would be 
> best if this value did not exist there.  The cluster ID already exists in the 
> `$LOG_DIR/meta.properties` file.





[GitHub] [kafka] jsancio merged pull request #13355: Revert "KAFKA-14371: Remove unused clusterId field from quorum-state file"

2023-03-06 Thread via GitHub


jsancio merged PR #13355:
URL: https://github.com/apache/kafka/pull/13355





[GitHub] [kafka] philipnee commented on a diff in pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

2023-03-06 Thread via GitHub


philipnee commented on code in PR #13303:
URL: https://github.com/apache/kafka/pull/13303#discussion_r1127264709


##
core/src/test/scala/integration/kafka/api/BaseAsyncConsumerTest.scala:
##
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import kafka.utils.TestUtils.waitUntilTrue
+import org.junit.jupiter.api.Test
+
+class BaseAsyncConsumerTest extends AbstractConsumerTest {
+  @Test
+  def testCommitAPI(): Unit = {
+val consumer = createAsyncConsumer()
+val producer = createProducer()
+val numRecords = 1
+val startingTimestamp = System.currentTimeMillis()
+val cb = new CountConsumerCommitCallback
+sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+consumer.commitAsync(cb)
+waitUntilTrue(() => {
+  cb.successCount == 1
+}, "wait until commit is completed successfully", 5000)
+consumer.commitSync();

Review Comment:
   Other integration tests verify the commit with a committed() call. I'll leave this as a TODO.
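
   A hedged sketch of that verification pattern (plain Consumer API; the fixture objects are assumed to come from the surrounding test):

   ```java
   import java.time.Duration;
   import java.util.Map;
   import java.util.Set;
   import org.apache.kafka.clients.consumer.Consumer;
   import org.apache.kafka.clients.consumer.OffsetAndMetadata;
   import org.apache.kafka.common.TopicPartition;

   public class CommittedOffsetCheck {
       // After a commit completes, read the offsets back with committed()
       // and compare them against what was expected.
       static void assertCommitted(Consumer<String, String> consumer,
                                   TopicPartition tp,
                                   long expectedOffset) {
           Map<TopicPartition, OffsetAndMetadata> committed =
               consumer.committed(Set.of(tp), Duration.ofSeconds(5));
           OffsetAndMetadata meta = committed.get(tp);
           if (meta == null || meta.offset() != expectedOffset) {
               throw new AssertionError("expected committed offset " + expectedOffset
                   + " for " + tp + " but got " + meta);
           }
       }
   }
   ```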






[GitHub] [kafka] philipnee commented on a diff in pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

2023-03-06 Thread via GitHub


philipnee commented on code in PR #13303:
URL: https://github.com/apache/kafka/pull/13303#discussion_r1127257845


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java:
##
@@ -16,90 +16,77 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.clients.consumer.internals.events.EventHandler;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
-import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 
-import static java.util.Collections.singleton;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class PrototypeAsyncConsumerTest {
-    private Map<String, Object> properties;
-    private SubscriptionState subscriptionState;
-    private MockTime time;
-    private LogContext logContext;
-    private Metrics metrics;
-    private ClusterResourceListeners clusterResourceListeners;
-    private Optional<String> groupId;
-    private String clientId;
-    private EventHandler eventHandler;
+
+    private Consumer<String, String> consumer;
+    private Map<String, Object> consumerProps = new HashMap<>();
+
+    private final Time time = new MockTime();
 
 @BeforeEach
 public void setup() {
-this.subscriptionState = Mockito.mock(SubscriptionState.class);
-this.eventHandler = Mockito.mock(DefaultEventHandler.class);
-this.logContext = new LogContext();
-this.time = new MockTime();
-this.metrics = new Metrics(time);
-this.groupId = Optional.empty();
-this.clientId = "client-1";
-this.clusterResourceListeners = new ClusterResourceListeners();
-this.properties = new HashMap<>();
-this.properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
-"localhost" +
-":");
-this.properties.put(KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
-this.properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
-this.properties.put(CLIENT_ID_CONFIG, "test-client");
+injectConsumerConfigs();
+}
+
+@AfterEach
+public void cleanup() {
+if (consumer != null) {
+consumer.close(Duration.ZERO);
+}
 }
+
 @Test
-public void testSubscription() {
-this.subscriptionState =
-new SubscriptionState(new LogContext(), 
OffsetResetStrategy.EARLIEST);
-PrototypeAsyncConsumer consumer =
-setupConsumerWithDefault();
-subscriptionState.subscribe(singleton("t1"),
-new NoOpConsumerRebalanceListener());
-assertEquals(1, consumer.subscription().size());
+public void testBackgroundThreadRunning() {
+consumer = newConsumer(time, new StringDeserializer(), new 
StringDeserializer());
 }
 
 @Test
 public void testUnimplementedException() {
-PrototypeAsyncConsumer consumer =
-setupConsumerWithDefault();
+consumer = newConsumer(time, new StringDeserializer(), new 
StringDeserializer());
 assertThrows(KafkaException.class, consumer::assignment, "not 
implemented exception");
 }
 
-public PrototypeAsyncConsumer setupConsumerWithDefault() {
-ConsumerConfig config = new ConsumerConfig(properties);
-return new PrototypeAsyncConsumer<>(
-this.time,
-this.logContext,
-config,
-this.subscriptionState,
-this.eventHandler,
-this.metrics,
-this.clusterResourceListeners,
-this.groupId,
-this.clientId,
-0);
+private ConsumerMetadata createMetadata(SubscriptionState subscription) {
+return new ConsumerMetadata(0, Long.MAX_VALUE, false, false,
+subscription, new LogContext(), 

[GitHub] [kafka] philipnee commented on a diff in pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

2023-03-06 Thread via GitHub


philipnee commented on code in PR #13303:
URL: https://github.com/apache/kafka/pull/13303#discussion_r1127256908


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -226,11 +246,20 @@ public void commitAsync(OffsetCommitCallback callback) {
 
 @Override
 public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
-    final CommitApplicationEvent commitEvent = new CommitApplicationEvent(offsets);
-    commitEvent.future().whenComplete((r, t) -> {
-        callback.onComplete(offsets, new RuntimeException(t));
+    CompletableFuture<Void> future = commit(offsets);
+    future.whenComplete((r, t) -> {
+        if (t != null) {
+            callback.onComplete(offsets, new RuntimeException(t));
+        } else {
+            callback.onComplete(offsets, null);
+        }
     });
+}
+
+private CompletableFuture<Void> commit(Map<TopicPartition, OffsetAndMetadata> offsets) {

Review Comment:
   Thanks. For 1, I think I actually wanted to say commitSync(), sorry about that.
   
   I think they aren't part of the existing interface, right? Currently, neither commitSync nor poll returns a future, and I was wondering if it makes sense for us to do that. Also, it makes the interface feel more modern.
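
   A hedged sketch of the future-based shape being discussed: if commit() returns a CompletableFuture as in the diff, commitSync() can block on that same future (the commit() body below is a placeholder, not the PR's implementation):

   ```java
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;
   import org.apache.kafka.clients.consumer.OffsetAndMetadata;
   import org.apache.kafka.common.KafkaException;
   import org.apache.kafka.common.TopicPartition;

   public class CommitFutureSketch {
       static CompletableFuture<Void> commit(Map<TopicPartition, OffsetAndMetadata> offsets) {
           return CompletableFuture.completedFuture(null); // stand-in for the event-based commit
       }

       static void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
           try {
               commit(offsets).get(); // block until the background thread finishes the commit
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new KafkaException(e);
           } catch (ExecutionException e) {
               throw new KafkaException(e.getCause());
           }
       }
   }
   ```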






[GitHub] [kafka] philipnee commented on a diff in pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

2023-03-06 Thread via GitHub


philipnee commented on code in PR #13303:
URL: https://github.com/apache/kafka/pull/13303#discussion_r1127253922


##
core/src/test/scala/integration/kafka/api/BaseAsyncConsumerTest.scala:
##
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import kafka.utils.TestUtils.waitUntilTrue
+import org.junit.jupiter.api.Test
+
+class BaseAsyncConsumerTest extends AbstractConsumerTest {
+  @Test
+  def testCommitAPI(): Unit = {
+val consumer = createAsyncConsumer()
+val producer = createProducer()
+val numRecords = 1
+val startingTimestamp = System.currentTimeMillis()
+val cb = new CountConsumerCommitCallback
+sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+consumer.commitAsync(cb)
+waitUntilTrue(() => {
+  cb.successCount == 1
+}, "wait until commit is completed successfully", 5000)
+consumer.commitSync();

Review Comment:
   Maybe I'll break it up into two tests there.  It doesn't really make sense 
to call commit twice in a row.






[GitHub] [kafka] philipnee commented on a diff in pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

2023-03-06 Thread via GitHub


philipnee commented on code in PR #13303:
URL: https://github.com/apache/kafka/pull/13303#discussion_r1127253699


##
core/src/test/scala/integration/kafka/api/BaseAsyncConsumerTest.scala:
##
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import kafka.utils.TestUtils.waitUntilTrue
+import org.junit.jupiter.api.Test
+
+class BaseAsyncConsumerTest extends AbstractConsumerTest {
+  @Test
+  def testCommitAPI(): Unit = {
+val consumer = createAsyncConsumer()
+val producer = createProducer()
+val numRecords = 1
+val startingTimestamp = System.currentTimeMillis()
+val cb = new CountConsumerCommitCallback
+sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+consumer.commitAsync(cb)
+waitUntilTrue(() => {
+  cb.successCount == 1
+}, "wait until commit is completed successfully", 5000)
+consumer.commitSync();

Review Comment:
   I think the idea was to test both commitSync and commitAsync.






[GitHub] [kafka] philipnee commented on pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

2023-03-06 Thread via GitHub


philipnee commented on PR #13303:
URL: https://github.com/apache/kafka/pull/13303#issuecomment-1457356989

   re: @guozhangwang - sorry, I think I originally added 2 tests, then reduced to 1. Thanks for catching this.
   
   For testing poll(): the fetcher isn't available yet, but I could try to test poll?
   
   Thanks!





[GitHub] [kafka] philipnee commented on a diff in pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

2023-03-06 Thread via GitHub


philipnee commented on code in PR #13303:
URL: https://github.com/apache/kafka/pull/13303#discussion_r1127252457


##
core/src/test/scala/integration/kafka/api/BaseAsyncConsumerTest.scala:
##
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import kafka.utils.TestUtils.waitUntilTrue
+import org.junit.jupiter.api.Test
+
+class BaseAsyncConsumerTest extends AbstractConsumerTest {
+  @Test
+  def testCommitAPI(): Unit = {
+val consumer = createAsyncConsumer()
+val producer = createProducer()
+val numRecords = 1
+val startingTimestamp = System.currentTimeMillis()
+val cb = new CountConsumerCommitCallback
+sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+consumer.commitAsync(cb)
+waitUntilTrue(() => {
+  cb.successCount == 1
+}, "wait until commit is completed successfully", 5000)
+consumer.commitSync();

Review Comment:
   +1: although I don't know how to verify that, let me look into it. Thanks!






[GitHub] [kafka] philipnee commented on a diff in pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

2023-03-06 Thread via GitHub


philipnee commented on code in PR #13303:
URL: https://github.com/apache/kafka/pull/13303#discussion_r1127251663


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java:
##
@@ -16,90 +16,77 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.clients.consumer.internals.events.EventHandler;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
-import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 
-import static java.util.Collections.singleton;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class PrototypeAsyncConsumerTest {
-    private Map<String, Object> properties;
-    private SubscriptionState subscriptionState;
-    private MockTime time;
-    private LogContext logContext;
-    private Metrics metrics;
-    private ClusterResourceListeners clusterResourceListeners;
-    private Optional<String> groupId;
-    private String clientId;
-    private EventHandler eventHandler;
+
+    private Consumer<String, String> consumer;
+    private Map<String, Object> consumerProps = new HashMap<>();
+
+    private final Time time = new MockTime();
 
 @BeforeEach
 public void setup() {
-this.subscriptionState = Mockito.mock(SubscriptionState.class);
-this.eventHandler = Mockito.mock(DefaultEventHandler.class);
-this.logContext = new LogContext();
-this.time = new MockTime();
-this.metrics = new Metrics(time);
-this.groupId = Optional.empty();
-this.clientId = "client-1";
-this.clusterResourceListeners = new ClusterResourceListeners();
-this.properties = new HashMap<>();
-this.properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
-"localhost" +
-":");
-this.properties.put(KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
-this.properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
-this.properties.put(CLIENT_ID_CONFIG, "test-client");
+injectConsumerConfigs();
+}
+
+@AfterEach
+public void cleanup() {
+if (consumer != null) {
+consumer.close(Duration.ZERO);
+}
 }
+
 @Test
-public void testSubscription() {
-this.subscriptionState =
-new SubscriptionState(new LogContext(), 
OffsetResetStrategy.EARLIEST);
-PrototypeAsyncConsumer consumer =
-setupConsumerWithDefault();
-subscriptionState.subscribe(singleton("t1"),
-new NoOpConsumerRebalanceListener());
-assertEquals(1, consumer.subscription().size());
+public void testBackgroundThreadRunning() {
+consumer = newConsumer(time, new StringDeserializer(), new 
StringDeserializer());

Review Comment:
   Maybe we don't need this after all. I feel that for the purposes of a unit test, we should just test the API and method calls. The integration test in this PR already covers this indirectly (otherwise requests wouldn't come through).






[GitHub] [kafka] philipnee commented on a diff in pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

2023-03-06 Thread via GitHub


philipnee commented on code in PR #13303:
URL: https://github.com/apache/kafka/pull/13303#discussion_r1127250227


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java:
##
@@ -16,90 +16,77 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.clients.consumer.internals.events.EventHandler;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
-import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 
-import static java.util.Collections.singleton;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class PrototypeAsyncConsumerTest {
-    private Map<String, Object> properties;
-    private SubscriptionState subscriptionState;
-    private MockTime time;
-    private LogContext logContext;
-    private Metrics metrics;
-    private ClusterResourceListeners clusterResourceListeners;
-    private Optional<String> groupId;
-    private String clientId;
-    private EventHandler eventHandler;
+
+    private Consumer<String, String> consumer;
+    private Map<String, Object> consumerProps = new HashMap<>();
+
+    private final Time time = new MockTime();
 
 @BeforeEach
 public void setup() {
-this.subscriptionState = Mockito.mock(SubscriptionState.class);
-this.eventHandler = Mockito.mock(DefaultEventHandler.class);
-this.logContext = new LogContext();
-this.time = new MockTime();
-this.metrics = new Metrics(time);
-this.groupId = Optional.empty();
-this.clientId = "client-1";
-this.clusterResourceListeners = new ClusterResourceListeners();
-this.properties = new HashMap<>();
-this.properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
-"localhost" +
-":");
-this.properties.put(KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
-this.properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
-this.properties.put(CLIENT_ID_CONFIG, "test-client");
+injectConsumerConfigs();
+}
+
+@AfterEach
+public void cleanup() {
+if (consumer != null) {
+consumer.close(Duration.ZERO);
+}
 }
+
 @Test
-public void testSubscription() {
-this.subscriptionState =
-new SubscriptionState(new LogContext(), 
OffsetResetStrategy.EARLIEST);
-PrototypeAsyncConsumer consumer =
-setupConsumerWithDefault();
-subscriptionState.subscribe(singleton("t1"),
-new NoOpConsumerRebalanceListener());
-assertEquals(1, consumer.subscription().size());
+public void testBackgroundThreadRunning() {
+consumer = newConsumer(time, new StringDeserializer(), new 
StringDeserializer());

Review Comment:
   I'm thinking about the best way to test that, as we don't have direct access to the background thread from the consumer. I guess we could do it passively by trying to catch the exception, or, actively, I could add a `public State state()` to the eventHandler to allow users to probe the background thread state... I feel we need both, but WDYT? @rajinisivaram @guozhangwang
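
   A hypothetical sketch of that probe; none of these names exist in the PR, they only illustrate exposing background-thread state to tests:

   ```java
   // Hypothetical additions for test observability, not actual PR code.
   public interface EventHandler {
       enum State { STARTING, RUNNING, CLOSED, FAILED }

       State state();  // lets a test (or caller) observe the background thread

       default boolean isRunning() {
           return state() == State.RUNNING;
       }
   }
   ```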






[GitHub] [kafka] mjsax commented on a diff in pull request #13292: KAFKA-14491: [14/N] Set changelog topic configs for versioned stores

2023-03-06 Thread via GitHub


mjsax commented on code in PR #13292:
URL: https://github.com/apache/kafka/pull/13292#discussion_r1127245620


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##
@@ -1293,12 +1306,16 @@ private void setRegexMatchedTopicToStateStore() {
 
     private InternalTopicConfig createChangelogTopicConfig(final StateStoreFactory factory,
                                                            final String name) {
-        if (factory.isWindowStore()) {
+        if (factory.isVersionedStore()) {
+            final VersionedChangelogTopicConfig config = new VersionedChangelogTopicConfig(name, factory.logConfig());
+            config.setMinCompactionLagMs(factory.historyRetention());

Review Comment:
   Do we also need to increase `delete.retention.ms` for this case? Not sure how the two configs "interact".



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##
@@ -1293,12 +1306,16 @@ private void setRegexMatchedTopicToStateStore() {
 
     private InternalTopicConfig createChangelogTopicConfig(final StateStoreFactory factory,
                                                            final String name) {
-        if (factory.isWindowStore()) {
+        if (factory.isVersionedStore()) {
+            final VersionedChangelogTopicConfig config = new VersionedChangelogTopicConfig(name, factory.logConfig());
+            config.setMinCompactionLagMs(factory.historyRetention());

Review Comment:
   Why are we setting this "outside" but not via the constructor? (I see that 
we do the same thing for `WindowedChangelogTopicConfig` but I am not sure why 
either.)
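
   For concreteness, a hedged sketch of the constructor-based alternative floated in this thread (field and parameter names are assumptions modeled on the snippet, not the actual class):

   ```java
   import java.util.Map;

   public class VersionedChangelogTopicConfig {
       private final String name;
       private final Map<String, String> topicConfigs;
       private final long minCompactionLagMs;

       // historyRetention flows in through the constructor instead of a
       // post-construction setMinCompactionLagMs(...) call.
       public VersionedChangelogTopicConfig(final String name,
                                            final Map<String, String> topicConfigs,
                                            final long minCompactionLagMs) {
           this.name = name;
           this.topicConfigs = topicConfigs;
           this.minCompactionLagMs = minCompactionLagMs;
       }
   }
   ```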






[GitHub] [kafka] jsancio commented on a diff in pull request #13355: Revert "KAFKA-14371: Remove unused clusterId field from quorum-state file"

2023-03-06 Thread via GitHub


jsancio commented on code in PR #13355:
URL: https://github.com/apache/kafka/pull/13355#discussion_r1127245225


##
raft/src/main/resources/common/message/QuorumStateData.json:
##
@@ -16,10 +16,10 @@
 {
   "type": "data",
   "name": "QuorumStateData",
-  // Version 1 removes clusterId field.
-  "validVersions": "0-1",
+  "validVersions": "0",
   "flexibleVersions": "0+",
   "fields": [
+{"name": "ClusterId", "type": "string", "versions": "0+"},

Review Comment:
   Yeah. Maybe we can do this in 4.0 but I need to confirm that 4.x won't 
support software downgrade to 3.x.






[GitHub] [kafka] jsancio commented on a diff in pull request #13355: Revert "KAFKA-14371: Remove unused clusterId field from quorum-state file"

2023-03-06 Thread via GitHub


jsancio commented on code in PR #13355:
URL: https://github.com/apache/kafka/pull/13355#discussion_r1127237201


##
raft/src/main/resources/common/message/QuorumStateData.json:
##
@@ -16,10 +16,10 @@
 {
   "type": "data",
   "name": "QuorumStateData",
-  // Version 1 removes clusterId field.
-  "validVersions": "0-1",
+  "validVersions": "0",
   "flexibleVersions": "0+",
   "fields": [
+{"name": "ClusterId", "type": "string", "versions": "0+"},

Review Comment:
   This PR reverts a commit that made it to `trunk` but is not included in any release. We need to revert this before we can release 3.5.
   
   > I am confused, why not just have ClusterId and versions: "0" (no plus sign at the end)?
   
   I think that may be part of the change. We still want to handle the software downgrade, since 3.4 and earlier assume `clusterId` exists. In the associated Jira I suggested creating a KIP for this improvement.






[GitHub] [kafka] showuon commented on a diff in pull request #13355: Revert "KAFKA-14371: Remove unused clusterId field from quorum-state file"

2023-03-06 Thread via GitHub


showuon commented on code in PR #13355:
URL: https://github.com/apache/kafka/pull/13355#discussion_r1127240280


##
raft/src/main/resources/common/message/QuorumStateData.json:
##
@@ -16,10 +16,10 @@
 {
   "type": "data",
   "name": "QuorumStateData",
-  // Version 1 removes clusterId field.
-  "validVersions": "0-1",
+  "validVersions": "0",
   "flexibleVersions": "0+",
   "fields": [
+{"name": "ClusterId", "type": "string", "versions": "0+"},

Review Comment:
   Changing the version to "0" cannot fix this backward-compatibility issue. The version is decided when writing the state file, using the highest supported version. So, once we write the file in a newer version format, the downgrade will fail (since we're trying to remove a field). Agree to revert the change first, and then decide if we want to raise a KIP to fix it.






[GitHub] [kafka] mjsax merged pull request #13274: KAFKA-14491: [13/N] Add versioned store builder and materializer

2023-03-06 Thread via GitHub


mjsax merged PR #13274:
URL: https://github.com/apache/kafka/pull/13274





[GitHub] [kafka] satishd commented on pull request #13067: KAFKA-14524: Rewrite KafkaMetricsGroup in Java

2023-03-06 Thread via GitHub


satishd commented on PR #13067:
URL: https://github.com/apache/kafka/pull/13067#issuecomment-1457307583

   @ivanyu Please have incremental commits instead of a single squashed/merged commit. This helps reviewers follow the incremental changes in the PR that address the review comments.





[GitHub] [kafka] showuon commented on pull request #13102: KAFKA-14371: Remove unused clusterId field from quorum-state file

2023-03-06 Thread via GitHub


showuon commented on PR #13102:
URL: https://github.com/apache/kafka/pull/13102#issuecomment-1457302953

   Nice catch @jsancio !





[GitHub] [kafka] satishd commented on a diff in pull request #13067: KAFKA-14524: Rewrite KafkaMetricsGroup in Java

2023-03-06 Thread via GitHub


satishd commented on code in PR #13067:
URL: https://github.com/apache/kafka/pull/13067#discussion_r1127226556


##
server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.metrics;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.Timer;
+import org.apache.kafka.common.utils.Sanitizer;
+
+public class KafkaMetricsGroup {
+    private final Class<?> klass;
+
+    public KafkaMetricsGroup(final Class<?> klass) {
+        this.klass = klass;
+    }
+
+    /**
+     * Creates a new MetricName object for gauges, meters, etc. created for this
+     * metrics group.
+     * @param name Descriptive name of the metric.
+     * @param tags Additional attributes which mBean will have.
+     * @return Sanitized metric name object.
+     */
+    public MetricName metricName(final String name, final Map<String, String> tags) {
+        final String pkg;
+        if (klass.getPackage() == null) {

Review Comment:
   +1. Have a simpler statement like below.
   ```
   String pkg = klass.getPackage() == null ? "" : klass.getPackage().getName();
   ```






[GitHub] [kafka] satishd commented on a diff in pull request #13067: KAFKA-14524: Rewrite KafkaMetricsGroup in Java

2023-03-06 Thread via GitHub


satishd commented on code in PR #13067:
URL: https://github.com/apache/kafka/pull/13067#discussion_r1127222852


##
server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.metrics;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.Timer;
+import org.apache.kafka.common.utils.Sanitizer;
+
+public class KafkaMetricsGroup {
+private final Class klass;
+
+public KafkaMetricsGroup(final Class klass) {
+this.klass = klass;
+}
+
+/**
+ * Creates a new MetricName object for gauges, meters, etc. created for 
this
+ * metrics group.
+ * @param name Descriptive name of the metric.
+ * @param tags Additional attributes which mBean will have.
+ * @return Sanitized metric name object.
+ */
+public MetricName metricName(final String name, final Map<String, String> tags) {
+final String pkg;
+if (klass.getPackage() == null) {

Review Comment:
   +1 Have a simpler statement like below.
   
   ```
   String pkg = klass.getPackage() == null ? "" : klass.getPackage().getName();
   ```
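   
   For context, a minimal sketch of how the full method could look with that 
simplification (assuming the `explicitMetricName` helper from the rest of the 
diff; illustrative only, not necessarily the committed code):
   ```
   public MetricName metricName(final String name, final Map<String, String> tags) {
       // Fall back to an empty group when the class has no package.
       final String pkg = klass.getPackage() == null ? "" : klass.getPackage().getName();
       // Strip the trailing '$' that Scala companion objects carry.
       final String simpleName = klass.getSimpleName().replaceAll("\\$$", "");
       return explicitMetricName(pkg, simpleName, name, tags);
   }
   ```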



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 opened a new pull request, #13356: MINOR: Exclude ServiceLoaded plugins after scanning instead of hiding resource file

2023-03-06 Thread via GitHub


gharris1727 opened a new pull request, #13356:
URL: https://github.com/apache/kafka/pull/13356

   This will allow classpath ConfigProvider and ConnectRestExtension to appear 
in ServiceLoader results from within plugins, without discovering them as 
isolated plugins.
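   
   For anyone skimming, the mechanism at play is plain `ServiceLoader` lookup. 
A minimal sketch of how a plugin might enumerate `ConfigProvider` 
implementations (illustrative only; not code from this PR):
   ```
   import java.util.ServiceLoader;
   import org.apache.kafka.common.config.provider.ConfigProvider;

   // Illustrative sketch: list the ConfigProvider implementations visible via
   // ServiceLoader; with this change, classpath providers show up in these
   // results without being discovered as isolated plugins.
   public class ServiceLoaderSketch {
       public static void main(String[] args) {
           for (ConfigProvider provider : ServiceLoader.load(ConfigProvider.class)) {
               System.out.println(provider.getClass().getName());
           }
       }
   }
   ```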
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #13116: KAFKA-14351: Controller Mutation Quota for KRaft

2023-03-06 Thread via GitHub


ijuma commented on code in PR #13116:
URL: https://github.com/apache/kafka/pull/13116#discussion_r1127213105


##
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##
@@ -378,6 +378,118 @@ object TestUtils extends Logging {
 props
   }
 
+  def createCombinedControllerConfig(nodeId: Int,

Review Comment:
   `createBrokerConfig` has a ton of parameters, I don't think we should be 
adding more parameters to it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13355: Revert "KAFKA-14371: Remove unused clusterId field from quorum-state file"

2023-03-06 Thread via GitHub


cmccabe commented on code in PR #13355:
URL: https://github.com/apache/kafka/pull/13355#discussion_r1127208396


##
raft/src/main/resources/common/message/QuorumStateData.json:
##
@@ -16,10 +16,10 @@
 {
   "type": "data",
   "name": "QuorumStateData",
-  // Version 1 removes clusterId field.
-  "validVersions": "0-1",
+  "validVersions": "0",
   "flexibleVersions": "0+",
   "fields": [
+{"name": "ClusterId", "type": "string", "versions": "0+"},

Review Comment:
   I am confused, why not just have ClusterId and `versions: "0"` (no plus sign 
at the end)?
   
   You can't change the past, but you certainly can create a new version 
without this field if you really want.
   
   I guess that also raises the question of how you determine the version. What 
is the strategy there?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #13116: KAFKA-14351: Controller Mutation Quota for KRaft

2023-03-06 Thread via GitHub


cmccabe commented on PR #13116:
URL: https://github.com/apache/kafka/pull/13116#issuecomment-1457277048

   Thanks for the PR, @rondagostino . You've been very patient here and I'm 
sorry that the review wasn't quicker. We did a major pivot from implementing 
the quota in `ControllerApis` like I originally wanted, to implementing it in 
`QuorumController` like you originally wanted. And I think the QC way is the 
way it has to be, for all the reasons discussed above. So you were right all 
along :)
   
   I spent a few hours looking through this code today to see if there were any 
obvious ways to improve the performance characteristics. The main thing I worry 
about is the impact of locking. The locking in 
`StrictControllerMutationQuota.record` is certainly frustrating, since I 
could see it colliding with metrics collection. There are several small locks 
that might be contended (sensor locks, KafkaMetric locks). None of them seems 
to be held for a very long time, though. So overall I cannot find any easy way 
to avoid this worry. We will have to benchmark, I guess.
   
   I left some minor comments that probably you can do in a day or two. I think 
we're pretty close now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on a diff in pull request #12813: KAFKA-14317: ProduceRequest timeouts are logged as network exceptions

2023-03-06 Thread via GitHub


kirktrue commented on code in PR #12813:
URL: https://github.com/apache/kafka/pull/12813#discussion_r1127204081


##
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##
@@ -324,11 +324,14 @@ public void disconnect(String nodeId) {
 log.info("Client requested disconnect from node {}", nodeId);
 selector.close(nodeId);
 long now = time.milliseconds();
-cancelInFlightRequests(nodeId, now, abortedSends);
+cancelInFlightRequests(nodeId, now, abortedSends, false);
 connectionStates.disconnected(nodeId, now);
 }
 
-private void cancelInFlightRequests(String nodeId, long now, Collection<ClientResponse> responses) {
+private void cancelInFlightRequests(String nodeId,

Review Comment:
   I didn't want to introduce any inconsistency in this change as the rest of 
the code doesn't follow that pattern.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13116: KAFKA-14351: Controller Mutation Quota for KRaft

2023-03-06 Thread via GitHub


cmccabe commented on code in PR #13116:
URL: https://github.com/apache/kafka/pull/13116#discussion_r1127197623


##
metadata/src/main/java/org/apache/kafka/controller/ControllerRequestContext.java:
##
@@ -42,19 +45,39 @@ public static OptionalLong requestTimeoutMsToDeadlineNs(
 private final OptionalLong deadlineNs;
 private final RequestHeaderData requestHeader;
 
+private final Optional<Consumer<Double>> requestedPartitionCountRecorder;
+

Review Comment:
   Yes, this is messy. I have a lot of ideas for improving this, but most of 
them would require substantial refactoring.
   
   For now, let's just get rid of the obvious bad stuff.
   * Using a double for number of partitions is a bit silly
   * "Recorder" is also a bit of a misnomer -- we are doing more than recording 
the number of partitions.
   * The Optional isn't needed -- just pass a Consumer that does nothing if you 
don't want to do anything.
   
   So maybe change it to something like `Consumer<Integer> applyPartitionChangeQuota`?
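   
   A self-contained sketch of that suggestion (names are illustrative, not the 
actual `ControllerRequestContext` code):
   ```
   import java.util.function.Consumer;

   // Illustrative sketch: pass the quota check as a plain Consumer<Integer> and
   // use a no-op instance when no quota should be applied, instead of wrapping
   // the callback in an Optional.
   final class PartitionQuotaSketch {
       static final Consumer<Integer> NO_OP = count -> { };

       private final Consumer<Integer> applyPartitionChangeQuota;

       PartitionQuotaSketch(final Consumer<Integer> applyPartitionChangeQuota) {
           this.applyPartitionChangeQuota = applyPartitionChangeQuota;
       }

       void onPartitionChange(final int requestedPartitions) {
           // Record (and possibly throttle on) the requested partition count.
           applyPartitionChangeQuota.accept(requestedPartitions);
       }
   }
   ```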



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13116: KAFKA-14351: Controller Mutation Quota for KRaft

2023-03-06 Thread via GitHub


cmccabe commented on code in PR #13116:
URL: https://github.com/apache/kafka/pull/13116#discussion_r1127191979


##
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##
@@ -378,6 +378,118 @@ object TestUtils extends Logging {
 props
   }
 
+  def createCombinedControllerConfig(nodeId: Int,
+ enableControlledShutdown: Boolean = true,
+ enableDeleteTopic: Boolean = true,
+ port: Int = RandomPort,
+ interBrokerSecurityProtocol: 
Option[SecurityProtocol] = None,
+ trustStoreFile: Option[File] = None,
+ saslProperties: Option[Properties] = None,
+ enablePlaintext: Boolean = true,
+ enableSaslPlaintext: Boolean = false,
+ saslPlaintextPort: Int = RandomPort,
+ enableSsl: Boolean = false,
+ sslPort: Int = RandomPort,
+ enableSaslSsl: Boolean = false,
+ saslSslPort: Int = RandomPort,
+ rack: Option[String] = None,
+ logDirCount: Int = 1,
+ enableToken: Boolean = false,
+ numPartitions: Int = 1,
+ defaultReplicationFactor: Short = 1,
+ enableFetchFromFollower: Boolean = 
false): Properties = {
+val retval = createBrokerConfig(nodeId,
+  zkConnect = null,
+  enableControlledShutdown,
+  enableDeleteTopic,
+  port,
+  interBrokerSecurityProtocol,
+  trustStoreFile,
+  saslProperties,
+  enablePlaintext,
+  enableSaslPlaintext,
+  saslPlaintextPort,
+  enableSsl,
+  sslPort,
+  enableSaslSsl,
+  saslSslPort,
+  rack,
+  logDirCount,
+  enableToken,
+  numPartitions,
+  defaultReplicationFactor,
+  enableFetchFromFollower,
+)
+retval.put(KafkaConfig.ProcessRolesProp, "broker,controller")
+retval.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+retval.put(KafkaConfig.ListenersProp, 
s"${retval.get(KafkaConfig.ListenersProp)},CONTROLLER://localhost:0")
+retval.put(KafkaConfig.QuorumVotersProp, s"${nodeId}@localhost:0")
+retval
+  }
+
+  def createIsolatedControllerConfig(nodeId: Int,

Review Comment:
   Same question as above, why not just a boolean on `createBrokerConfig`?
   
   `combinedMode: Boolean = false` or similar



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13116: KAFKA-14351: Controller Mutation Quota for KRaft

2023-03-06 Thread via GitHub


cmccabe commented on code in PR #13116:
URL: https://github.com/apache/kafka/pull/13116#discussion_r1127191300


##
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##
@@ -378,6 +378,118 @@ object TestUtils extends Logging {
 props
   }
 
+  def createCombinedControllerConfig(nodeId: Int,

Review Comment:
   Can't we just add a boolean to `TestUtils.createBrokerConfig`? Two whole new 
functions seems like overkill.
   
   Also, while you're changing this code, it would be good to use the standard 
indentation style. Having the parameter list be indented halfway through the 
page really hurts the eyes. These days we do:
   
   ```
   def myVeryLongFunctionName(
 foo: Foo,
 bar: Bar,
   ...
 quux: Quux
   ): ReturnValueType = {
 ...
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

2023-03-06 Thread via GitHub


guozhangwang commented on code in PR #13303:
URL: https://github.com/apache/kafka/pull/13303#discussion_r1127185285


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -213,7 +215,7 @@ public UnsentRequest(
 Objects.requireNonNull(requestBuilder);
 this.requestBuilder = requestBuilder;
 this.node = node;
-this.callback = new FutureCompletionHandler();

Review Comment:
   nit: just call the member field `handler` as well?



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java:
##
@@ -16,90 +16,77 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.clients.consumer.internals.events.EventHandler;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
-import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 
-import static java.util.Collections.singleton;
-import static 
org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class PrototypeAsyncConsumerTest {
-private Map<String, Object> properties;
-private SubscriptionState subscriptionState;
-private MockTime time;
-private LogContext logContext;
-private Metrics metrics;
-private ClusterResourceListeners clusterResourceListeners;
-private Optional<String> groupId;
-private String clientId;
-private EventHandler eventHandler;
+
+private Consumer<String, String> consumer;
+private Map<String, Object> consumerProps = new HashMap<>();
+
+private final Time time = new MockTime();
 
 @BeforeEach
 public void setup() {
-this.subscriptionState = Mockito.mock(SubscriptionState.class);
-this.eventHandler = Mockito.mock(DefaultEventHandler.class);
-this.logContext = new LogContext();
-this.time = new MockTime();
-this.metrics = new Metrics(time);
-this.groupId = Optional.empty();
-this.clientId = "client-1";
-this.clusterResourceListeners = new ClusterResourceListeners();
-this.properties = new HashMap<>();
-this.properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
-"localhost" +
-":");
-this.properties.put(KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
-this.properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
-this.properties.put(CLIENT_ID_CONFIG, "test-client");
+injectConsumerConfigs();
+}
+
+@AfterEach
+public void cleanup() {
+if (consumer != null) {
+consumer.close(Duration.ZERO);
+}
 }
+
 @Test
-public void testSubscription() {
-this.subscriptionState =
-new SubscriptionState(new LogContext(), 
OffsetResetStrategy.EARLIEST);
-PrototypeAsyncConsumer<String, String> consumer =
-setupConsumerWithDefault();
-subscriptionState.subscribe(singleton("t1"),
-new NoOpConsumerRebalanceListener());
-assertEquals(1, consumer.subscription().size());
+public void testBackgroundThreadRunning() {
+consumer = newConsumer(time, new StringDeserializer(), new 
StringDeserializer());

Review Comment:
   +1



##
core/src/test/scala/integration/kafka/api/BaseAsyncConsumerTest.scala:
##
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * 

[GitHub] [kafka] cmccabe commented on a diff in pull request #13116: KAFKA-14351: Controller Mutation Quota for KRaft

2023-03-06 Thread via GitHub


cmccabe commented on code in PR #13116:
URL: https://github.com/apache/kafka/pull/13116#discussion_r1127186242


##
core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala:
##
@@ -479,6 +489,42 @@ class DynamicBrokerConfigTest {
 assertEquals("User:admin", authorizer.superUsers)
   }
 
+  @Test
+  def testCombinedControllerAuthorizerConfig(): Unit = {
+val props = TestUtils.createCombinedControllerConfig(0, port = 9092)
+val oldConfig = KafkaConfig.fromProps(props)
+oldConfig.dynamicConfig.initialize(None)
+
+val controllerServer: ControllerServer = 
mock(classOf[kafka.server.ControllerServer])
+
+val authorizer = new TestAuthorizer
+when(controllerServer.config).thenReturn(oldConfig)
+when(controllerServer.authorizer).thenReturn(Some(authorizer))
+// We are only testing authorizer reconfiguration, ignore any exceptions 
due to incomplete mock
+assertThrows(classOf[Throwable], () => 
controllerServer.config.dynamicConfig.addReconfigurables(controllerServer))

Review Comment:
   I sympathize with not wanting to debug the mocking mess here. There are just 
so many functions that get called, and having to do thenReturn for all of them 
is a huge pain. Still, just putting an `assertThrows(classOf[Throwable], ...)` 
seems like it could hide errors over time.
   
   Should we just add the authorizer as a reconfigurable directly? Or is that 
too hard to do here?



##
core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala:
##
@@ -479,6 +489,42 @@ class DynamicBrokerConfigTest {
 assertEquals("User:admin", authorizer.superUsers)
   }
 
+  @Test
+  def testCombinedControllerAuthorizerConfig(): Unit = {
+val props = TestUtils.createCombinedControllerConfig(0, port = 9092)
+val oldConfig = KafkaConfig.fromProps(props)
+oldConfig.dynamicConfig.initialize(None)
+
+val controllerServer: ControllerServer = 
mock(classOf[kafka.server.ControllerServer])
+
+val authorizer = new TestAuthorizer
+when(controllerServer.config).thenReturn(oldConfig)
+when(controllerServer.authorizer).thenReturn(Some(authorizer))
+// We are only testing authorizer reconfiguration, ignore any exceptions 
due to incomplete mock
+assertThrows(classOf[Throwable], () => 
controllerServer.config.dynamicConfig.addReconfigurables(controllerServer))
+props.put("super.users", "User:admin")
+controllerServer.config.dynamicConfig.updateBrokerConfig(0, props)
+assertEquals("User:admin", authorizer.superUsers)
+  }
+
+  @Test
+  def testIsolatedControllerAuthorizerConfig(): Unit = {
+val props = TestUtils.createIsolatedControllerConfig(0, port = 9092)
+val oldConfig = KafkaConfig.fromProps(props)
+oldConfig.dynamicConfig.initialize(None)
+
+val controllerServer: ControllerServer = 
mock(classOf[kafka.server.ControllerServer])
+
+val authorizer = new TestAuthorizer
+when(controllerServer.config).thenReturn(oldConfig)
+when(controllerServer.authorizer).thenReturn(Some(authorizer))
+// We are only testing authorizer reconfiguration, ignore any exceptions 
due to incomplete mock
+assertThrows(classOf[Throwable], () => 
controllerServer.config.dynamicConfig.addReconfigurables(controllerServer))

Review Comment:
   Same question as above



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13116: KAFKA-14351: Controller Mutation Quota for KRaft

2023-03-06 Thread via GitHub


cmccabe commented on code in PR #13116:
URL: https://github.com/apache/kafka/pull/13116#discussion_r1127182652


##
core/src/main/scala/kafka/server/metadata/DynamicClientQuotaPublisher.scala:
##
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.server.KafkaConfig
+import kafka.utils.Logging
+import org.apache.kafka.image.loader.{LogDeltaManifest, SnapshotManifest}
+import org.apache.kafka.image.{MetadataDelta, MetadataImage}
+import org.apache.kafka.server.fault.FaultHandler
+
+
+class DynamicClientQuotaPublisher(
+  conf: KafkaConfig,
+  faultHandler: FaultHandler,
+  nodeType: String,
+  clientQuotaMetadataManager: ClientQuotaMetadataManager,
+) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {

Review Comment:
   I didn't think about this previously, but it makes sense to include 
`controller` or `broker` in the name that is returned here. Good idea.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14778) Kafka Streams 2.7.1 to 3.3.1 rolling upgrade with static membership triggers a rebalance

2023-03-06 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697181#comment-17697181
 ] 

Matthias J. Sax commented on KAFKA-14778:
-

Thanks for reporting this. -- I believe the issue is as follows:
 * on restart, the consumer sends a join-group request (with the latest 
subscription version)
 * the broker returns the buffered assignment without a rebalance (as it should)
 * KS inspects the assignment and observes the lower subscription version -- now 
things go "wrong"
 ** KS thinks the assignment is invalid (because on version probing the leader 
should have sent an empty assignment, only encoding its version number)
 ** KS triggers a new rebalance, sending its subscription with the lower 
subscription version (as it assumes the leader could not decode the first 
subscription it sent)

It seems the fix might be to check whether static membership is enabled and 
whether the received assignment is empty. If static membership is enabled and 
the assignment is not empty, it's not necessary to trigger a new rebalance.
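
A rough sketch of that check (hypothetical names; this is not the actual 
StreamsPartitionAssignor code):
{code:java}
// Hypothetical sketch of the proposed check; names are illustrative only.
boolean shouldTriggerFollowUpRebalance(final boolean staticMembershipEnabled,
                                       final boolean receivedAssignmentIsEmpty,
                                       final int receivedAssignmentVersion,
                                       final int sentSubscriptionVersion) {
    if (receivedAssignmentVersion >= sentSubscriptionVersion) {
        return false; // no version probing in progress
    }
    // With static membership, a non-empty assignment means the broker returned
    // the buffered assignment without a real rebalance, so no follow-up
    // rebalance is needed.
    if (staticMembershipEnabled && !receivedAssignmentIsEmpty) {
        return false;
    }
    return true;
}
{code}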

One drawback is that all KS instances would stay on the old assignment version; 
thus it might actually be desirable to have a single rebalance that allows all 
instances to switch to the new subscription/assignment version. This single 
rebalance should only happen _after_ all instances got updated. However, it's 
unclear how we could trigger such a rebalance, and it's also an open question 
whether this rebalance is really desired.

> Kafka Streams 2.7.1 to 3.3.1 rolling upgrade with static membership triggers 
> a rebalance
> 
>
> Key: KAFKA-14778
> URL: https://issues.apache.org/jira/browse/KAFKA-14778
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.1, 3.3.1
>Reporter: Vinoth Rengarajan
>Priority: Major
>
> Trying to upgrade Kaka Streams application from 2.7.1 to 3.3.1 with static 
> membership but it triggers a rebalance
> Brokers are running on Kafka 2.7.1. Enabled the static membership in the 
> application. Below are the configs {*}(Stream Config & Consumer Config){*}.
> Followed below steps to upgrade
>  * Brokers are running on Kafka 2.7.1(tried with 3.3.1 version then also 
> rebalance happens).
>  * Application is running with 2.7.1 Kafka streams libraries.
>  * Deployed the latest version of the application with 3.3.1 Kafka streams 
> libraries, and configured the *upgrade.from* property to 2.7 (based on the 
> upgrade documentation available here 
> [https://kafka.apache.org/33/documentation/streams/upgrade-guide]).
>  * Doing a rolling bounce with the latest changes, rebalance is being 
> triggered on other instances in the cluster.
> Below are logs on the instance which is being bounced, forcing a rebalance on 
> others. 
> *Logs:*
>  
> {code:java}
> INFO  2023-02-27 09:52:16.805 | streams.KafkaStreams stream-client 
> [kafka_upgrade.Kafka_Upgrade_Test] State transition from CREATED to 
> REBALANCING
> INFO  2023-02-27 09:52:16.946 | internals.ConsumerCoordinator [Consumer 
> instanceId=kafka_upgrade.Kafka_Upgrade_Test-4, 
> clientId=kafka_upgrade.Kafka_Upgrade_Test-StreamThread-4-consumer, 
> groupId=kafka_upgrade.Kafka_Upgrade_Test] Notifying assignor about the new 
> Assignment(partitions=[kafka_upgrade.Kafka_Upgrade_Test-version-updates-11, 
> kafka_upgrade.Kafka_Upgrade_Test-version-updates-23], userDataSize=56)
> INFO  2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor 
> stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-3-consumer] Sent 
> a version 11 subscription and got version 8 assignment back (successful 
> version probing). Downgrade subscription metadata to commonly supported 
> version 8 and trigger new rebalance.
> INFO  2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor 
> stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-2-consumer] Sent 
> a version 11 subscription and got version 8 assignment back (successful 
> version probing). Downgrade subscription metadata to commonly supported 
> version 8 and trigger new rebalance.
> INFO  2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor 
> stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-4-consumer] Sent 
> a version 11 subscription and got version 8 assignment back (successful 
> version probing). Downgrade subscription metadata to commonly supported 
> version 8 and trigger new rebalance.
> INFO  2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor 
> stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-1-consumer] Sent 
> a version 11 subscription and got version 8 assignment back (successful 
> version probing). Downgrade subscription metadata to commonly supported 
> version 8 and trigger new rebalance.
> INFO  2023-02-27 09:52:16.947 | 

[GitHub] [kafka] cmccabe commented on a diff in pull request #13116: KAFKA-14351: Controller Mutation Quota for KRaft

2023-03-06 Thread via GitHub


cmccabe commented on code in PR #13116:
URL: https://github.com/apache/kafka/pull/13116#discussion_r1127176224


##
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##
@@ -730,6 +744,45 @@ class DynamicThreadPool(server: KafkaBroker) extends 
BrokerReconfigurable {
 }
   }
 
+  def getValue(config: KafkaConfig, name: String): Int = {
+name match {
+  case KafkaConfig.NumIoThreadsProp => config.numIoThreads
+  case KafkaConfig.NumReplicaFetchersProp => config.numReplicaFetchers
+  case KafkaConfig.NumRecoveryThreadsPerDataDirProp => 
config.numRecoveryThreadsPerDataDir
+  case KafkaConfig.BackgroundThreadsProp => config.backgroundThreads
+  case n => throw new IllegalStateException(s"Unexpected config $n")
+}
+  }
+}
+
+class ControllerDynamicThreadPool(controller: ControllerServer) extends 
BrokerReconfigurable {
+
+  override def reconfigurableConfigs: Set[String] = {
+DynamicThreadPool.ReconfigurableConfigs // common configs

Review Comment:
   This is a small thing but, we probably shouldn't return 
`num.replica.fetchers`, `num.recovery.threads.per.data.dir`, and 
`background.threads` here, since they're not actually reconfigurable on the 
controller.
   
   We could still share most of the code here if we pass in the 
`ReconfigurableConfigs` `Set[String]` to 
`DynamicThreadPool.validateReconfiguration`, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13116: KAFKA-14351: Controller Mutation Quota for KRaft

2023-03-06 Thread via GitHub


cmccabe commented on code in PR #13116:
URL: https://github.com/apache/kafka/pull/13116#discussion_r1127176972


##
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##
@@ -911,7 +954,22 @@ object DynamicListenerConfig {
   )
 }
 
-class DynamicClientQuotaCallback(server: KafkaBroker) extends Reconfigurable {
+trait DynamicClientQuotaCallbackServer {

Review Comment:
   We don't really need a trait for this, do we? It's just two objects that we 
can pass directly to the constructor of `DynamicClientQuotaCallback`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13116: KAFKA-14351: Controller Mutation Quota for KRaft

2023-03-06 Thread via GitHub


cmccabe commented on code in PR #13116:
URL: https://github.com/apache/kafka/pull/13116#discussion_r1127179616


##
core/src/test/java/kafka/test/MockController.java:
##
@@ -181,10 +194,16 @@ public CompletableFuture<Void> unregisterBroker(
 static class MockTopic {
 private final String name;
 private final Uuid id;
+private final int numPartitions;
 
 MockTopic(String name, Uuid id) {
+this(name, id, 10); // hard-code the number of partitions if left 
unspecified

Review Comment:
   I guess this is a nitpick but `num.partitions` defaults to 1, so having a 
default of 10 partitions is a bit surprising here. Is there a reason to use 10 
instead of 1?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13116: KAFKA-14351: Controller Mutation Quota for KRaft

2023-03-06 Thread via GitHub


cmccabe commented on code in PR #13116:
URL: https://github.com/apache/kafka/pull/13116#discussion_r1127176224


##
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##
@@ -730,6 +744,45 @@ class DynamicThreadPool(server: KafkaBroker) extends 
BrokerReconfigurable {
 }
   }
 
+  def getValue(config: KafkaConfig, name: String): Int = {
+name match {
+  case KafkaConfig.NumIoThreadsProp => config.numIoThreads
+  case KafkaConfig.NumReplicaFetchersProp => config.numReplicaFetchers
+  case KafkaConfig.NumRecoveryThreadsPerDataDirProp => 
config.numRecoveryThreadsPerDataDir
+  case KafkaConfig.BackgroundThreadsProp => config.backgroundThreads
+  case n => throw new IllegalStateException(s"Unexpected config $n")
+}
+  }
+}
+
+class ControllerDynamicThreadPool(controller: ControllerServer) extends 
BrokerReconfigurable {
+
+  override def reconfigurableConfigs: Set[String] = {
+DynamicThreadPool.ReconfigurableConfigs // common configs

Review Comment:
   We probably shouldn't return `num.replica.fetchers`, 
`num.recovery.threads.per.data.dir`, and `background.threads` here, since 
they're not actually reconfigurable on the controller.
   
   I don't think we have to fix that now, but I will file a JIRA to clean this 
up.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14722) Make BooleanSerde public

2023-03-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14722:

Description: 
KIP-907: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-907%3A+Add+Boolean+Serde+to+public+interface]

 

We introduce a "BooleanSerde" via [https://github.com/apache/kafka/pull/13249] 
as internal class. We could make it public.

  was:We introduce a "BooleanSerde" via 
[https://github.com/apache/kafka/pull/13249] as internal class. We could make 
it public.
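
For illustration, a boolean serde along these lines can be built from the 
public `Serdes.serdeFrom` factory (sketch only; not the internal implementation 
from the PR):
{code:java}
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

// Sketch only: a boolean serde built from lambdas, encoding true/false as a
// single byte; nulls pass through as null.
public class BooleanSerdeSketch {
    public static Serde<Boolean> booleanSerde() {
        return Serdes.serdeFrom(
            (topic, data) -> data == null ? null : new byte[] {(byte) (data ? 1 : 0)},
            (topic, bytes) -> bytes == null || bytes.length == 0 ? null : bytes[0] != 0);
    }
}
{code}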


> Make BooleanSerde public
> 
>
> Key: KAFKA-14722
> URL: https://issues.apache.org/jira/browse/KAFKA-14722
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Spacrocket
>Priority: Minor
>  Labels: beginner, kip, newbie
>
> KIP-907: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-907%3A+Add+Boolean+Serde+to+public+interface]
>  
> We introduce a "BooleanSerde" via 
> [https://github.com/apache/kafka/pull/13249] as internal class. We could make 
> it public.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14722) Make BooleanSerde public

2023-03-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14722:

Labels: beginner kip newbie  (was: beginner need-kip newbie)

> Make BooleanSerde public
> 
>
> Key: KAFKA-14722
> URL: https://issues.apache.org/jira/browse/KAFKA-14722
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Spacrocket
>Priority: Minor
>  Labels: beginner, kip, newbie
>
> We introduce a "BooleanSerde" via 
> [https://github.com/apache/kafka/pull/13249] as internal class. We could make 
> it public.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-12446) Define KGroupedTable#aggregate subtractor + adder order of execution

2023-03-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12446:

Description: 
KIP-904: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-904%3A+Kafka+Streams+-+Guarantee+subtractor+is+called+before+adder+if+key+has+not+changed]
 

 

Currently, when an update is processed by KGroupedTable#aggregate, the 
subtractor is called first, then the adder. But per the docs the order of 
execution is not defined (ie. could change in future releases).

[https://kafka.apache.org/26/documentation/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-aggregating]
{quote}When subsequent non-null values are received for a key (e.g., UPDATE), 
then (1) the subtractor is called with the old value as stored in the table and 
(2) the adder is called with the new value of the input record that was just 
received. The order of execution for the subtractor and adder is not defined.
{quote}
This ticket proposes making the current order of execution part of the public 
contract.

That would allow Kafka Streams DSL users the freedom to use aggregates such as: 
{code:java}
aggregate(
  HashMap::new,
  (aggKey, newValue, aggValue) -> { aggValue.put(newValue.getKey(), newValue.getValue()); return aggValue; }, // adder
  (aggKey, oldValue, aggValue) -> { aggValue.remove(oldValue.getKey()); return aggValue; } // subtractor
){code}
and handle updates where key remains the same but value changes.

The Kafka Music Example at

[https://github.com/confluentinc/kafka-streams-examples/blob/6.0.1-post/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java#L345]

relies on the subtractor being called first.

 

See discussion at 
[https://github.com/confluentinc/kafka-streams-examples/issues/380]

See also the more general point made at 
[https://stackoverflow.com/questions/65888756/clarify-the-order-of-execution-for-the-subtractor-and-adder-is-not-defined]
 
{quote}If the adder and subtractor are non-commutative operations and the order 
in which they are executed can vary, you can end up with different results 
depending on the order of execution of adder and subtractor. An example of a 
useful non-commutative operation would be something like if we’re aggregating 
records into a Set:
{quote}
{code:java}
.aggregate[Set[Animal]](Set.empty)(
 adder = (zooKey, animalValue, setOfAnimals) => setOfAnimals + animalValue,
 subtractor = (zooKey, animalValue, setOfAnimals) => setOfAnimals - animalValue
)
{code}
{quote}In this example, for duplicated events, if the adder is called before 
the subtractor you would end up removing the value entirely from the set (which 
would be problematic for most use-cases I imagine).
{quote}
As [~mjsax] notes on 
[https://github.com/confluentinc/kafka-streams-examples/issues/380]

 
{quote}the implementation used the same order since 0.10.0 release and it was 
never changed
{quote}
so making this behavior part of the standard amounts to making official what 
has already been stable for a long time.

Cost:
 *  Limits your options for the future. If you ever needed Kafka Streams to 
change the order of execution (or make that order indeterminate instead of its 
current hard coded order), you would have to make that a breaking change.

Benefit:
 * Encourages wider use of the KGroupedTable#aggregate method (current lack of 
a defined order prevents using aggregate with non-commutative adder/subtractor 
functions)
 * Simplifies reasoning about how to use KGroupedTable#aggregate (knowing that 
a given order can be relied upon makes the method itself easier to understand)
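
For reference, a compilable sketch of the pattern above (topic name and types 
are hypothetical):
{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

// Hypothetical sketch: group a table by its value and maintain a map per group,
// relying on the subtractor-before-adder order discussed in this ticket.
public class GroupedTableAggregateSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> input = builder.table("input-topic");
        KTable<String, Map<String, String>> aggregated = input
            .groupBy((key, value) -> KeyValue.pair(value, key))
            .aggregate(
                HashMap::new,
                (aggKey, newValue, aggValue) -> { aggValue.put(newValue, newValue); return aggValue; }, // adder
                (aggKey, oldValue, aggValue) -> { aggValue.remove(oldValue); return aggValue; } // subtractor
            );
    }
}
{code}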

 

 

 

  was:
Currently, when an update is processed by KGroupedTable#aggregate, the 
subtractor is called first, then the adder. But per the docs the order of 
execution is not defined (ie. could change in future releases).

[https://kafka.apache.org/26/documentation/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-aggregating]
{quote}When subsequent non-null values are received for a key (e.g., UPDATE), 
then (1) the subtractor is called with the old value as stored in the table and 
(2) the adder is called with the new value of the input record that was just 
received. The order of execution for the subtractor and adder is not defined.
{quote}
This ticket proposes making the current order of execution part of the public 
contract.

That would allow Kafka Streams DSL users the freedom to use aggregates such as: 
{code:java}
aggregate(
  HashMap::new,
  (aggKey, newValue, aggValue) -> { aggValue.put(newValue.getKey(), 
newValue.getValue() }, // adder
  (aggKey, oldValue, aggValue) -> { aggValue.remove(newValue.getKey() } // 
subtractor
){code}
and handle updates where key remains the same but value changes.

The Kafka Music Example at


[jira] [Updated] (KAFKA-12446) Define KGroupedTable#aggregate subtractor + adder order of execution

2023-03-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12446:

Labels: kip  (was: )

> Define KGroupedTable#aggregate subtractor + adder order of execution
> 
>
> Key: KAFKA-12446
> URL: https://issues.apache.org/jira/browse/KAFKA-12446
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ben Ellis
>Assignee: Ben Ellis
>Priority: Minor
>  Labels: kip
>
> Currently, when an update is processed by KGroupedTable#aggregate, the 
> subtractor is called first, then the adder. But per the docs the order of 
> execution is not defined (ie. could change in future releases).
> [https://kafka.apache.org/26/documentation/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-aggregating]
> {quote}When subsequent non-null values are received for a key (e.g., UPDATE), 
> then (1) the subtractor is called with the old value as stored in the table 
> and (2) the adder is called with the new value of the input record that was 
> just received. The order of execution for the subtractor and adder is not 
> defined.
> {quote}
> This ticket proposes making the current order of execution part of the public 
> contract.
> That would allow Kafka Streams DSL users the freedom to use aggregates such 
> as: 
> {code:java}
> aggregate(
>   HashMap::new,
>   (aggKey, newValue, aggValue) -> { aggValue.put(newValue.getKey(), newValue.getValue()); return aggValue; }, // adder
>   (aggKey, oldValue, aggValue) -> { aggValue.remove(oldValue.getKey()); return aggValue; } // subtractor
> ){code}
> and handle updates where key remains the same but value changes.
> The Kafka Music Example at
> [https://github.com/confluentinc/kafka-streams-examples/blob/6.0.1-post/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java#L345]
> relies on the subtractor being called first.
>  
> See discussion at 
> [https://github.com/confluentinc/kafka-streams-examples/issues/380]
> See also the more general point made at 
> [https://stackoverflow.com/questions/65888756/clarify-the-order-of-execution-for-the-subtractor-and-adder-is-not-defined]
>  
> {quote}If the adder and subtractor are non-commutative operations and the 
> order in which they are executed can vary, you can end up with different 
> results depending on the order of execution of adder and subtractor. An 
> example of a useful non-commutative operation would be something like if 
> we’re aggregating records into a Set:
> {quote}
> {code:java}
> .aggregate[Set[Animal]](Set.empty)(
>  adder = (zooKey, animalValue, setOfAnimals) => setOfAnimals + animalValue,
>  subtractor = (zooKey, animalValue, setOfAnimals) => setOfAnimals - 
> animalValue
> )
> {code}
> {quote}In this example, for duplicated events, if the adder is called before 
> the subtractor you would end up removing the value entirely from the set 
> (which would be problematic for most use-cases I imagine).
> {quote}
> As [~mjsax] notes on 
> [https://github.com/confluentinc/kafka-streams-examples/issues/380]
>  
> {quote}the implementation used the same order since 0.10.0 release and it was 
> never changed
> {quote}
> so making this behavior part of the standard amounts to making official what 
> has already been stable for a long time.
> Cost:
>  *  Limits your options for the future. If you ever needed Kafka Streams to 
> change the order of execution (or make that order indeterminate instead of 
> its current hard coded order), you would have to make that a breaking change.
> Benefit:
>  * Encourages wider use of the KGroupedTable#aggregate method (current lack 
> of a defined order prevents using aggregate with non-commutative 
> adder/subtractor functions)
>  * Simplifies reasoning about how to use KGroupedTable#aggregate (knowing 
> that a given order can be relied upon makes the method itself easier to 
> understand)
>  
>  
> 
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13882) Dockerfile for previewing website

2023-03-06 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697175#comment-17697175
 ] 

ASF GitHub Bot commented on KAFKA-13882:


mjsax commented on PR #410:
URL: https://github.com/apache/kafka-site/pull/410#issuecomment-1457209098

   Ah sorry. Seems I misunderstood your comment @mimaison. 

> Dockerfile for previewing website
> -
>
> Key: KAFKA-13882
> URL: https://issues.apache.org/jira/browse/KAFKA-13882
> Project: Kafka
>  Issue Type: Task
>  Components: docs, website
>Reporter: Tom Bentley
>Assignee: Lim Qing Wei
>Priority: Trivial
>  Labels: newbie
>
> Previewing changes to the website/documentation is rather difficult because 
> you either have to [hack with the 
> HTML|https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes#ContributingWebsiteDocumentationChanges-KafkaWebsiteRepository]
>  or [install 
> httpd|https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server].
>  This is a barrier to contribution.
> Having a Dockerfile for previewing the Kafka website (i.e. with httpd 
> properly set up) would make it easier for people to contribute website/docs 
> changes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14748) Relax non-null FK left-join requirement

2023-03-06 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697167#comment-17697167
 ] 

Guozhang Wang commented on KAFKA-14748:
---

Cool! We are on the same page for the first question then.

As for the second question: I think you're right, people can still get the old 
behavior if they filter out the extracted null key records beforehand.

> Relax non-null FK left-join requirement
> ---
>
> Key: KAFKA-14748
> URL: https://issues.apache.org/jira/browse/KAFKA-14748
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> Kafka Streams enforces a strict non-null-key policy in the DSL across all 
> key-dependent operations (like aggregations and joins).
> This also applies to FK-joins, in particular to the ForeignKeyExtractor. If 
> it returns `null`, it's treated as invalid. For left-joins, it might make 
> sense to still accept a `null`, and add the left-hand record with an empty 
> right-hand-side to the result.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14371) quorum-state file contains empty/unused clusterId field

2023-03-06 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-14371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697165#comment-17697165
 ] 

José Armando García Sancio commented on KAFKA-14371:


Even though the KRaft logic doesn't read this value, it is read by the 
deserialization code. We need to come up with a strategy for allowing the user 
to downgrade after upgrading to 3.5, assuming this is implemented in 3.5.

> quorum-state file contains empty/unused clusterId field
> ---
>
> Key: KAFKA-14371
> URL: https://issues.apache.org/jira/browse/KAFKA-14371
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ron Dagostino
>Assignee: Gantigmaa Selenge
>Priority: Minor
>  Labels: needs-kip
> Fix For: 3.5.0
>
>
> The KRaft controller's quorum-state file 
> `$LOG_DIR/__cluster_metadata-0/quorum-state` contains an empty clusterId 
> value.  This value is never non-empty, and it is never used after it is 
> written and then subsequently read.  This is a cosmetic issue; it would be 
> best if this value did not exist there.  The cluster ID already exists in the 
> `$LOG_DIR/meta.properties` file.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14371) quorum-state file contains empty/unused clusterId field

2023-03-06 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-14371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-14371:
---
Labels: needs-kip  (was: )

> quorum-state file contains empty/unused clusterId field
> ---
>
> Key: KAFKA-14371
> URL: https://issues.apache.org/jira/browse/KAFKA-14371
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ron Dagostino
>Assignee: Gantigmaa Selenge
>Priority: Minor
>  Labels: needs-kip
> Fix For: 3.5.0
>
>
> The KRaft controller's quorum-state file 
> `$LOG_DIR/__cluster_metadata-0/quorum-state` contains an empty clusterId 
> value.  This value is never non-empty, and it is never used after it is 
> written and then subsequently read.  This is a cosmetic issue; it would be 
> best if this value did not exist there.  The cluster ID already exists in the 
> `$LOG_DIR/meta.properties` file.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jsancio commented on pull request #13355: Revert "KAFKA-14371: Remove unused clusterId field from quorum-state file"

2023-03-06 Thread via GitHub


jsancio commented on PR #13355:
URL: https://github.com/apache/kafka/pull/13355#issuecomment-1457165155

   This PR reverts https://github.com/apache/kafka/pull/13102
   
   Reason for the revert is documented here: 
https://github.com/apache/kafka/pull/13102#issuecomment-1457138403


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio opened a new pull request, #13355: Revert "KAFKA-14371: Remove unused clusterId field from quorum-state file"

2023-03-06 Thread via GitHub


jsancio opened a new pull request, #13355:
URL: https://github.com/apache/kafka/pull/13355

   This reverts commit 0927049a617fa2937a211aab895f6590403130fb.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on pull request #13102: KAFKA-14371: Remove unused clusterId field from quorum-state file

2023-03-06 Thread via GitHub


jsancio commented on PR #13102:
URL: https://github.com/apache/kafka/pull/13102#issuecomment-1457138403

   I was thinking about this change today and realized that this change is not 
backwards compatible: if the user upgrades the cluster and then downgrades it, 
the cluster will stop working. For example, I got the following error when the 
`clusterId` field is removed:
   ```
   [2023-03-06 14:31:09,673] ERROR [SharedServer id=1] Got exception while 
starting SharedServer (kafka.server.SharedServer)
   java.lang.RuntimeException: QuorumStateData: unable to locate field 
'clusterId', which is mandatory in version 1
   at 
org.apache.kafka.raft.generated.QuorumStateDataJsonConverter.read(QuorumStateDataJsonConverter.java:40)
   at 
org.apache.kafka.raft.FileBasedStateStore.readStateFromFile(FileBasedStateStore.java:94)
   at 
org.apache.kafka.raft.FileBasedStateStore.readElectionState(FileBasedStateStore.java:110)
   at org.apache.kafka.raft.QuorumState.initialize(QuorumState.java:116)
   at 
org.apache.kafka.raft.KafkaRaftClient.initialize(KafkaRaftClient.java:387)
   at kafka.raft.KafkaRaftManager.buildRaftClient(RaftManager.scala:256)
   at kafka.raft.KafkaRaftManager.(RaftManager.scala:179)
   at kafka.server.SharedServer.start(SharedServer.scala:245)
   at 
kafka.server.SharedServer.startForController(SharedServer.scala:137)
   at kafka.server.ControllerServer.startup(ControllerServer.scala:332)
   at 
kafka.server.KafkaRaftServer.$anonfun$startup$1(KafkaRaftServer.scala:134)
   at 
kafka.server.KafkaRaftServer.$anonfun$startup$1$adapted(KafkaRaftServer.scala:134)
   at scala.Option.foreach(Option.scala:437)
   at kafka.server.KafkaRaftServer.startup(KafkaRaftServer.scala:134)
   at kafka.Kafka$.main(Kafka.scala:114)
   at kafka.Kafka.main(Kafka.scala)
   [2023-03-06 14:31:09,675] INFO Waiting for controller quorum voters future 
(kafka.server.ControllerServer)
   [2023-03-06 14:31:09,676] INFO Finished waiting for controller quorum voters 
future (kafka.server.ControllerServer)
   [2023-03-06 14:31:09,677] ERROR [ControllerServer id=1] Fatal error during 
controller startup. Prepare to shutdown (kafka.server.ControllerServer)
   java.lang.NullPointerException: Cannot invoke 
"kafka.raft.KafkaRaftManager.apiVersions()" because the return value of 
"kafka.server.SharedServer.raftManager()" is null
   at kafka.server.ControllerServer.startup(ControllerServer.scala:349)
   at 
kafka.server.KafkaRaftServer.$anonfun$startup$1(KafkaRaftServer.scala:134)
   at 
kafka.server.KafkaRaftServer.$anonfun$startup$1$adapted(KafkaRaftServer.scala:134)
   at scala.Option.foreach(Option.scala:437)
   at kafka.server.KafkaRaftServer.startup(KafkaRaftServer.scala:134)
   at kafka.Kafka$.main(Kafka.scala:114)
   at kafka.Kafka.main(Kafka.scala)
   [2023-03-06 14:31:09,678] INFO [ControllerServer id=1] shutting down 
(kafka.server.ControllerServer)
   [2023-03-06 14:31:09,680] INFO [SocketServer listenerType=CONTROLLER, 
nodeId=1] Stopping socket server request processors (kafka.network.SocketServer)
   [2023-03-06 14:31:09,683] INFO [SocketServer listenerType=CONTROLLER, 
nodeId=1] Stopped socket server request processors (kafka.network.SocketServer)
   [2023-03-06 14:31:09,684] INFO [SocketServer listenerType=CONTROLLER, 
nodeId=1] Shutting down socket server (kafka.network.SocketServer)
   [2023-03-06 14:31:09,698] INFO [SocketServer listenerType=CONTROLLER, 
nodeId=1] Shutdown completed (kafka.network.SocketServer)
   [2023-03-06 14:31:09,699] ERROR Exiting Kafka due to fatal exception during 
startup. (kafka.Kafka$)
   java.lang.NullPointerException: Cannot invoke 
"kafka.raft.KafkaRaftManager.apiVersions()" because the return value of 
"kafka.server.SharedServer.raftManager()" is null
   at kafka.server.ControllerServer.startup(ControllerServer.scala:349)
   at 
kafka.server.KafkaRaftServer.$anonfun$startup$1(KafkaRaftServer.scala:134)
   at 
kafka.server.KafkaRaftServer.$anonfun$startup$1$adapted(KafkaRaftServer.scala:134)
   at scala.Option.foreach(Option.scala:437)
   at kafka.server.KafkaRaftServer.startup(KafkaRaftServer.scala:134)
   at kafka.Kafka$.main(Kafka.scala:114)
   at kafka.Kafka.main(Kafka.scala)
   ```
   
   I am going to revert this change.
   
   To make a similar change in the future we need a KIP since it requires 
changes to the on-disk format of a file.
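   
   For reference, the quorum-state file as currently written looks roughly 
like this (content is illustrative; the authoritative schema is 
`QuorumStateData`):
   
   ```
   {
     "clusterId": "",
     "leaderId": 1,
     "leaderEpoch": 2,
     "votedId": 1,
     "appliedOffset": 0,
     "currentVoters": [{"voterId": 1}],
     "data_version": 0
   }
   ```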
   
   cc @tinaselenge @showuon 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13882) Dockerfile for previewing website

2023-03-06 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697160#comment-17697160
 ] 

ASF GitHub Bot commented on KAFKA-13882:


qingwei91 commented on PR #410:
URL: https://github.com/apache/kafka-site/pull/410#issuecomment-1457134844

   Hi @mimaison, another way I can think of is to avoid using .htaccess in the 
Docker image and inject all the required config into httpd.conf instead:
   
   
https://github.com/apache/kafka-site/pull/410/commits/4d7397f17d10e64d3fadf4e8722e492bdf131fba
   
   I am not sure this is better, since we now have duplicated config.
   
   I also wonder whether .htaccess is actually used in the real deployment, as 
using it requires enabling overrides in httpd.conf.
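   
   For context, `.htaccess` rules only take effect when the server config 
permits overrides, which may be why the rules behave differently here than in 
the real deployment. An illustrative httpd.conf fragment (path assumed from 
the httpd Docker image, not taken from the PR):
   
   ```
   # .htaccess files are only honored when AllowOverride permits it; without
   # a stanza like this, the rewrite rules must live in httpd.conf itself.
   <Directory "/usr/local/apache2/htdocs">
       AllowOverride All
   </Directory>
   ```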




> Dockerfile for previewing website
> -
>
> Key: KAFKA-13882
> URL: https://issues.apache.org/jira/browse/KAFKA-13882
> Project: Kafka
>  Issue Type: Task
>  Components: docs, website
>Reporter: Tom Bentley
>Assignee: Lim Qing Wei
>Priority: Trivial
>  Labels: newbie
>
> Previewing changes to the website/documentation is rather difficult because 
> you either have to [hack with the 
> HTML|https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes#ContributingWebsiteDocumentationChanges-KafkaWebsiteRepository]
>  or [install 
> httpd|https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server].
>  This is a barrier to contribution.
> Having a Dockerfile for previewing the Kafka website (i.e. with httpd 
> properly set up) would make it easier for people to contribute website/docs 
> changes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14786) Implement connector offset write/reset internal logic

2023-03-06 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-14786:
-

 Summary: Implement connector offset write/reset internal logic
 Key: KAFKA-14786
 URL: https://issues.apache.org/jira/browse/KAFKA-14786
 Project: Kafka
  Issue Type: Sub-task
  Components: KafkaConnect
Reporter: Chris Egerton


Implement the internal logic necessary for altering/resetting the offsets of 
connectors, [described in 
KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Endpointsbehavior].

This should not include any changes to public interface except the introduction 
of the new {{SourceConnector::alterOffsets}} and 
{{SinkConnector::alterOffsets}} methods (i.e., it should not expose or test any 
new REST endpoints).

Ideally, we'll separate this from KAFKA-14368, KAFKA-14784, and KAFKA-14785 by 
making all changes here target the internal Connect {{Herder}} interface, and 
have the changes for the other three rely on those new {{Herder}} methods.
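   
   For reference, a sketch of the sink-side hook (signature paraphrased from 
KIP-875, not from merged code; the boilerplate below exists only to make the 
sketch compile):
   
   ```
   import java.util.List;
   import java.util.Map;
   import org.apache.kafka.common.TopicPartition;
   import org.apache.kafka.common.config.ConfigDef;
   import org.apache.kafka.connect.connector.Task;
   import org.apache.kafka.connect.sink.SinkConnector;
   
   // Illustrative hook per KIP-875 (compiles only against a Kafka version
   // that ships alterOffsets). The SourceConnector variant takes
   // Map<Map<String, ?>, Map<String, ?>> offsets instead of consumer offsets.
   public class ExampleSinkConnector extends SinkConnector {
       @Override
       public boolean alterOffsets(Map<String, String> connectorConfig,
                                   Map<TopicPartition, Long> offsets) {
           // Accept the request only if every offset is a reset (null) or
           // a non-negative position.
           return offsets.values().stream().allMatch(o -> o == null || o >= 0);
       }
   
       // Boilerplate required by the Connector base class:
       @Override public String version() { return "0.0.1"; }
       @Override public void start(Map<String, String> props) { }
       @Override public void stop() { }
       @Override public Class<? extends Task> taskClass() { throw new UnsupportedOperationException(); }
       @Override public List<Map<String, String>> taskConfigs(int maxTasks) { throw new UnsupportedOperationException(); }
       @Override public ConfigDef config() { return new ConfigDef(); }
   }
   ```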



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14784) Implement connector offset reset REST API

2023-03-06 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-14784:
-

 Summary: Implement connector offset reset REST API
 Key: KAFKA-14784
 URL: https://issues.apache.org/jira/browse/KAFKA-14784
 Project: Kafka
  Issue Type: Sub-task
  Components: KafkaConnect
Reporter: Chris Egerton


Implement the {{DELETE /connectors/name/offsets}} endpoint [described in 
KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Resettingoffsets].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14785) Implement connector offset read REST API

2023-03-06 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-14785:
-

 Summary: Implement connector offset read REST API
 Key: KAFKA-14785
 URL: https://issues.apache.org/jira/browse/KAFKA-14785
 Project: Kafka
  Issue Type: Sub-task
  Components: KafkaConnect
Reporter: Chris Egerton


Implement the {{GET /connector/name/offsets}} endpoint [described in 
KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Readingoffsets].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14368) Implement connector offset write REST API

2023-03-06 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14368:
--
Component/s: KafkaConnect

> Implement connector offset write REST API
> -
>
> Key: KAFKA-14368
> URL: https://issues.apache.org/jira/browse/KAFKA-14368
> Project: Kafka
>  Issue Type: Sub-task
>  Components: KafkaConnect
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
>
> Implement the {{PATCH /connectors/name/offsets}} endpoint [described in 
> KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Alteringoffsets].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14368) Implement connector offset write REST API

2023-03-06 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14368:
--
Summary: Implement connector offset write REST API  (was: Add an offset 
write REST API to Kafka Connect)

> Implement connector offset write REST API
> -
>
> Key: KAFKA-14368
> URL: https://issues.apache.org/jira/browse/KAFKA-14368
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
>
> Implement the {{PATCH /connectors/name/offsets}} endpoint [described in 
> KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Alteringoffsets].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14368) Add an offset write REST API to Kafka Connect

2023-03-06 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697157#comment-17697157
 ] 

Chris Egerton commented on KAFKA-14368:
---

[~yash.mayya] Given the updates to KIP-875, I've converted this into a subtask 
that encapsulates the endpoint for manually writing new offsets for a 
connector. Are you still interested in working on this, or should I unassign 
the ticket (and possibly begin working on it depending on time and interest 
from others)?

> Add an offset write REST API to Kafka Connect
> -
>
> Key: KAFKA-14368
> URL: https://issues.apache.org/jira/browse/KAFKA-14368
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
>  Labels: kip-required
>
> Implement the {{PATCH /connectors/name/offsets}} endpoint [described in 
> KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Alteringoffsets].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14368) Add an offset write REST API to Kafka Connect

2023-03-06 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14368:
--
Labels:   (was: kip-required)

> Add an offset write REST API to Kafka Connect
> -
>
> Key: KAFKA-14368
> URL: https://issues.apache.org/jira/browse/KAFKA-14368
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
>
> Implement the {{PATCH /connectors/name/offsets}} endpoint [described in 
> KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Alteringoffsets].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14368) Add an offset write REST API to Kafka Connect

2023-03-06 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14368:
--
Description: Implement the {{PATCH /connectors/name/offsets}} endpoint 
[described in 
KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Alteringoffsets].
  (was: 
[KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
 for https://issues.apache.org/jira/browse/KAFKA-4107 proposes to add an offset 
reset API which will allow resetting the offsets for source and sink connectors 
so that they can consume from the beginning of the stream. However, an offset 
API to write arbitrary offsets would also be useful for certain connectors in 
order to go back in time but not to the beginning, or to skip some problematic 
record and move forward. Based on the discussion thread for KIP-875 
[here|https://lists.apache.org/thread/m5bklnh5w4mwr9nbzrmfk0pftpxfjd02], it was 
determined that this could be done through a follow-up KIP if/when KIP-875 is 
adopted.)

> Add an offset write REST API to Kafka Connect
> -
>
> Key: KAFKA-14368
> URL: https://issues.apache.org/jira/browse/KAFKA-14368
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
>  Labels: kip-required
>
> Implement the {{PATCH /connectors/name/offsets}} endpoint [described in 
> KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Alteringoffsets].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14783) Implement new STOPPED state for connectors

2023-03-06 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-14783:
-

 Summary: Implement new STOPPED state for connectors
 Key: KAFKA-14783
 URL: https://issues.apache.org/jira/browse/KAFKA-14783
 Project: Kafka
  Issue Type: Task
  Components: KafkaConnect
Reporter: Chris Egerton
Assignee: Chris Egerton


Implement the {{STOPPED}} state [described in 
KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Newtargetstate:STOPPED].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14783) Implement new STOPPED state for connectors

2023-03-06 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14783:
--
Parent: KAFKA-4107
Issue Type: Sub-task  (was: Task)

> Implement new STOPPED state for connectors
> --
>
> Key: KAFKA-14783
> URL: https://issues.apache.org/jira/browse/KAFKA-14783
> Project: Kafka
>  Issue Type: Sub-task
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> Implement the {{STOPPED}} state [described in 
> KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Newtargetstate:STOPPED].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14368) Add an offset write REST API to Kafka Connect

2023-03-06 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14368:
--
Parent: KAFKA-4107
Issue Type: Sub-task  (was: New Feature)

> Add an offset write REST API to Kafka Connect
> -
>
> Key: KAFKA-14368
> URL: https://issues.apache.org/jira/browse/KAFKA-14368
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
>  Labels: kip-required
>
> [KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
>  for https://issues.apache.org/jira/browse/KAFKA-4107 proposes to add an 
> offset reset API which will allow resetting the offsets for source and sink 
> connectors so that they can consume from the beginning of the stream. 
> However, an offset API to write arbitrary offsets would also be useful for 
> certain connectors in order to go back in time but not to the beginning, or 
> to skip some problematic record and move forward. Based on the discussion 
> thread for KIP-875 
> [here|https://lists.apache.org/thread/m5bklnh5w4mwr9nbzrmfk0pftpxfjd02], it 
> was determined that this could be done through a follow-up KIP if/when 
> KIP-875 is adopted.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] philipnee opened a new pull request, #13354: KAFKA-14753: Improve kafka producer example

2023-03-06 Thread via GitHub


philipnee opened a new pull request, #13354:
URL: https://github.com/apache/kafka/pull/13354

   The current Kafka producer example needs a bit of improvement; in particular:
   1. Documentation is lacking
   2. The producer is not closed correctly (see the sketch below)
   3. The code can be cleaner
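   
   A minimal sketch of the closing fix in item 2 (illustrative, not the PR's 
actual code): try-with-resources guarantees `close()` runs, flushing buffered 
records and releasing network resources even on error paths.
   
   ```
   import java.util.Properties;
   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.Producer;
   import org.apache.kafka.clients.producer.ProducerRecord;
   import org.apache.kafka.common.serialization.StringSerializer;
   
   public class ProducerCloseSketch {
       public static void main(String[] args) {
           Properties props = new Properties();
           props.put("bootstrap.servers", "localhost:9092");
           props.put("key.serializer", StringSerializer.class.getName());
           props.put("value.serializer", StringSerializer.class.getName());
   
           // The producer is closed even if send() throws.
           try (Producer<String, String> producer = new KafkaProducer<>(props)) {
               producer.send(new ProducerRecord<>("my-topic", "key", "value"));
           }
       }
   }
   ```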


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on pull request #13353: KAFKA-14752: Improving the existing consumer examples

2023-03-06 Thread via GitHub


philipnee commented on PR #13353:
URL: https://github.com/apache/kafka/pull/13353#issuecomment-1457033568

   Here is a snippet of the log:
   ```
   Subscribe to:topic1
   ...
   Assigning partitions:[topic1-0]
   ...
   DemoConsumer finished reading 1 messages
   Revoking partitions:[topic1-0]
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee opened a new pull request, #13353: KAFKA-14752: Improving the existing consumer examples

2023-03-06 Thread via GitHub


philipnee opened a new pull request, #13353:
URL: https://github.com/apache/kafka/pull/13353

   The changes include:
   1. Documentation changes
   2. Closing mechanism
   3. Adding a rebalance listener (to print partition changes when rebalance is 
happening)
   4. Fixing a few errors
   cc @showuon 
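   
   A minimal sketch of item 3, the rebalance listener (illustrative, not the 
PR's actual code); it would produce output like the "Assigning 
partitions"/"Revoking partitions" log snippet quoted above:
   
   ```
   import java.time.Duration;
   import java.util.Collection;
   import java.util.Collections;
   import java.util.Properties;
   import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.common.TopicPartition;
   
   public class RebalanceListenerSketch {
       public static void main(String[] args) {
           Properties props = new Properties();
           props.put("bootstrap.servers", "localhost:9092");
           props.put("group.id", "demo-group");
           props.put("key.deserializer",
               "org.apache.kafka.common.serialization.StringDeserializer");
           props.put("value.deserializer",
               "org.apache.kafka.common.serialization.StringDeserializer");
   
           try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
               // The listener prints partition movements during rebalances.
               consumer.subscribe(Collections.singletonList("topic1"),
                   new ConsumerRebalanceListener() {
                       @Override
                       public void onPartitionsAssigned(Collection<TopicPartition> parts) {
                           System.out.println("Assigning partitions:" + parts);
                       }
   
                       @Override
                       public void onPartitionsRevoked(Collection<TopicPartition> parts) {
                           System.out.println("Revoking partitions:" + parts);
                       }
                   });
               consumer.poll(Duration.ofSeconds(1));
           }
       }
   }
   ```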


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

2023-03-06 Thread via GitHub


philipnee commented on PR #13303:
URL: https://github.com/apache/kafka/pull/13303#issuecomment-1457022169

   Thanks for reviewing this, @rajinisivaram - fixes are on the way.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

2023-03-06 Thread via GitHub


philipnee commented on code in PR #13303:
URL: https://github.com/apache/kafka/pull/13303#discussion_r1127046532


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -83,12 +92,23 @@
 private final Metrics metrics;
 private final long defaultApiTimeoutMs;
 
+public PrototypeAsyncConsumer(Properties properties,
+ Deserializer keyDeserializer,
+ Deserializer valueDeserializer) {
+this(Utils.propsToMap(properties), keyDeserializer, valueDeserializer);
+}
+
+public PrototypeAsyncConsumer(final Map configs,
+  final Deserializer keyDeser,
+  final Deserializer valDeser) {
+this(new ConsumerConfig(appendDeserializerToConfig(configs, keyDeser, 
valDeser)), keyDeser,
+valDeser);
+}
 @SuppressWarnings("unchecked")
-public PrototypeAsyncConsumer(final Time time,
-  final ConsumerConfig config,
+public PrototypeAsyncConsumer(final ConsumerConfig config,
   final Deserializer keyDeserializer,
   final Deserializer valueDeserializer) {
-this.time = time;
+this.time = Time.SYSTEM;

Review Comment:
   Thanks for spotting this. I think the unit-test consumer is meant to be the 
one below this, so I should remove `time` and make the constructor parameter 
list the same as the current one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

2023-03-06 Thread via GitHub


philipnee commented on code in PR #13303:
URL: https://github.com/apache/kafka/pull/13303#discussion_r1127043244


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -226,11 +246,20 @@ public void commitAsync(OffsetCommitCallback callback) {
 
 @Override
 public void commitAsync(Map offsets, 
OffsetCommitCallback callback) {
-final CommitApplicationEvent commitEvent = new 
CommitApplicationEvent(offsets);
-commitEvent.future().whenComplete((r, t) -> {
-callback.onComplete(offsets, new RuntimeException(t));
+CompletableFuture future = commit(offsets);
+future.whenComplete((r, t) -> {
+if (t != null) {
+callback.onComplete(offsets, new RuntimeException(t));

Review Comment:
   You are right, it should've been a KafkaException.
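   
   For reference, the corrected completion would look something like this 
(sketch, assuming the fix only swaps the wrapper type to 
`org.apache.kafka.common.KafkaException`):
   
   ```
   future.whenComplete((r, t) -> {
       if (t != null) {
           // Wrap the failure in a KafkaException, matching the exception
           // contract of the existing KafkaConsumer callbacks.
           callback.onComplete(offsets, new KafkaException(t));
       } else {
           callback.onComplete(offsets, null);
       }
   });
   ```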



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rajinisivaram commented on a diff in pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

2023-03-06 Thread via GitHub


rajinisivaram commented on code in PR #13303:
URL: https://github.com/apache/kafka/pull/13303#discussion_r1126983650


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -83,12 +92,23 @@
 private final Metrics metrics;
 private final long defaultApiTimeoutMs;
 
+public PrototypeAsyncConsumer(Properties properties,
+ Deserializer keyDeserializer,
+ Deserializer valueDeserializer) {
+this(Utils.propsToMap(properties), keyDeserializer, valueDeserializer);
+}
+
+public PrototypeAsyncConsumer(final Map configs,
+  final Deserializer keyDeser,
+  final Deserializer valDeser) {
+this(new ConsumerConfig(appendDeserializerToConfig(configs, keyDeser, 
valDeser)), keyDeser,
+valDeser);
+}
 @SuppressWarnings("unchecked")
-public PrototypeAsyncConsumer(final Time time,
-  final ConsumerConfig config,
+public PrototypeAsyncConsumer(final ConsumerConfig config,
   final Deserializer keyDeserializer,
   final Deserializer valueDeserializer) {
-this.time = time;
+this.time = Time.SYSTEM;

Review Comment:
   Why did we remove `time`? Could be useful for unit testing?



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java:
##
@@ -16,90 +16,77 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.clients.consumer.internals.events.EventHandler;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
-import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 
-import static java.util.Collections.singleton;
-import static 
org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class PrototypeAsyncConsumerTest {
-private Map properties;
-private SubscriptionState subscriptionState;
-private MockTime time;
-private LogContext logContext;
-private Metrics metrics;
-private ClusterResourceListeners clusterResourceListeners;
-private Optional groupId;
-private String clientId;
-private EventHandler eventHandler;
+
+private Consumer consumer;
+private Map consumerProps = new HashMap<>();
+
+private final Time time = new MockTime();
 
 @BeforeEach
 public void setup() {
-this.subscriptionState = Mockito.mock(SubscriptionState.class);
-this.eventHandler = Mockito.mock(DefaultEventHandler.class);
-this.logContext = new LogContext();
-this.time = new MockTime();
-this.metrics = new Metrics(time);
-this.groupId = Optional.empty();
-this.clientId = "client-1";
-this.clusterResourceListeners = new ClusterResourceListeners();
-this.properties = new HashMap<>();
-this.properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
-"localhost" +
-":");
-this.properties.put(KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
-this.properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
-this.properties.put(CLIENT_ID_CONFIG, "test-client");
+injectConsumerConfigs();
+}
+
+@AfterEach
+public void cleanup() {
+if (consumer != null) {
+consumer.close(Duration.ZERO);
+}
 }
+
 @Test
-public void testSubscription() {
-this.subscriptionState =
-new SubscriptionState(new LogContext(), 
OffsetResetStrategy.EARLIEST);
-PrototypeAsyncConsumer consumer =
-setupConsumerWithDefault();
-

[GitHub] [kafka] jolshan commented on a diff in pull request #13323: KAFKA-14617; Add ReplicaState to FetchRequest.

2023-03-06 Thread via GitHub


jolshan commented on code in PR #13323:
URL: https://github.com/apache/kafka/pull/13323#discussion_r1126944890


##
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##
@@ -337,8 +362,27 @@ public AbstractResponse getErrorResponse(int 
throttleTimeMs, Throwable e) {
 .setResponses(topicResponseList));
 }
 
+public String clusterId() {
+return data.clusterId();
+}
+
+public List topics() {
+return data.topics();
+}
+
+public int maxWaitMs() {
+return data.maxWaitMs();
+}

Review Comment:
   The data object is public, so we may just be able to use that.
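   
   That is, callers could read fields off the public data object directly 
instead of going through new delegating accessors, e.g. (sketch):
   
   ```
   FetchRequestData data = fetchRequest.data();
   String clusterId = data.clusterId();
   int maxWaitMs = data.maxWaitMs();
   List<FetchRequestData.FetchTopic> topics = data.topics();
   ```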



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #13352: Add support of topic ids for the OffsetFetch API from version 9.

2023-03-06 Thread via GitHub


jolshan commented on PR #13352:
URL: https://github.com/apache/kafka/pull/13352#issuecomment-1456837604

   Let me know if you have any topic ID questions @Hangleton 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13329: KAFKA-14462; [2/N] Add ConsumerGroupHeartbeart to GroupCoordinator interface

2023-03-06 Thread via GitHub


jolshan commented on code in PR #13329:
URL: https://github.com/apache/kafka/pull/13329#discussion_r1126927374


##
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##
@@ -62,6 +63,15 @@ private[group] class GroupCoordinatorAdapter(
   private val time: Time
 ) extends org.apache.kafka.coordinator.group.GroupCoordinator {
 
+  override def consumerGroupHeartbeat(
+context: RequestContext,
+request: ConsumerGroupHeartbeatRequestData
+  ): CompletableFuture[ConsumerGroupHeartbeatResponseData] = {
+FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+  s"The old group coordinator does not support 
${ApiKeys.CONSUMER_GROUP_HEARTBEAT.name} API."

Review Comment:
   Ah ok. I see. Makes sense and thanks for clarifying.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe merged pull request #13331: MINOR: fix fault handling in ControllerServer and KafkaServer

2023-03-06 Thread via GitHub


cmccabe merged PR #13331:
URL: https://github.com/apache/kafka/pull/13331


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13329: KAFKA-14462; [2/N] Add ConsumerGroupHeartbeart to GroupCoordinator interface

2023-03-06 Thread via GitHub


dajac commented on code in PR #13329:
URL: https://github.com/apache/kafka/pull/13329#discussion_r1126922151


##
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##
@@ -62,6 +63,15 @@ private[group] class GroupCoordinatorAdapter(
   private val time: Time
 ) extends org.apache.kafka.coordinator.group.GroupCoordinator {
 
+  override def consumerGroupHeartbeat(
+context: RequestContext,
+request: ConsumerGroupHeartbeatRequestData
+  ): CompletableFuture[ConsumerGroupHeartbeatResponseData] = {
+FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+  s"The old group coordinator does not support 
${ApiKeys.CONSUMER_GROUP_HEARTBEAT.name} API."

Review Comment:
   yeah... this code will never be hit if we keep the check in KafkaApis. I 
had to put something here just in case, so it is better to actually put a 
meaningful message.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13329: KAFKA-14462; [2/N] Add ConsumerGroupHeartbeart to GroupCoordinator interface

2023-03-06 Thread via GitHub


jolshan commented on code in PR #13329:
URL: https://github.com/apache/kafka/pull/13329#discussion_r1126921525


##
core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala:
##
@@ -54,6 +54,20 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
 assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data)
   }
 
+  @ClusterTest(serverProperties = Array(
+new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true")
+  ))
+  def 
testConsumerGroupHeartbeatIsAccessibleWhenNewGroupCoordinatorIsEnabled(): Unit 
= {
+val consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
+  new ConsumerGroupHeartbeatRequestData()
+).build()
+
+val consumerGroupHeartbeatResponse = 
connectAndReceive(consumerGroupHeartbeatRequest)
+val expectedResponse = new 
ConsumerGroupHeartbeatResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code)

Review Comment:
   Did we want to test multiple configurations when we do make the change? Ie, 
get unsupported for old group coordinator and none when using the new one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13329: KAFKA-14462; [2/N] Add ConsumerGroupHeartbeart to GroupCoordinator interface

2023-03-06 Thread via GitHub


dajac commented on code in PR #13329:
URL: https://github.com/apache/kafka/pull/13329#discussion_r1126920827


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -3573,9 +3573,27 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleConsumerGroupHeartbeat(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
 val consumerGroupHeartbeatRequest = 
request.body[ConsumerGroupHeartbeatRequest]
-// KIP-848 is not implemented yet so return UNSUPPORTED_VERSION.
-requestHelper.sendMaybeThrottle(request, 
consumerGroupHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-CompletableFuture.completedFuture[Unit](())
+
+if (!config.isNewGroupCoordinatorEnabled) {
+  // The API is not supported by the "old" group coordinator (the 
default). If the

Review Comment:
   good question. i think that we will likely bind the new coordinator to an 
IBP version.



##
core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala:
##
@@ -54,6 +54,20 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
 assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data)
   }
 
+  @ClusterTest(serverProperties = Array(
+new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true")
+  ))
+  def 
testConsumerGroupHeartbeatIsAccessibleWhenNewGroupCoordinatorIsEnabled(): Unit 
= {
+val consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
+  new ConsumerGroupHeartbeatRequestData()
+).build()
+
+val consumerGroupHeartbeatResponse = 
connectAndReceive(consumerGroupHeartbeatRequest)
+val expectedResponse = new 
ConsumerGroupHeartbeatResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code)

Review Comment:
   that's right.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13329: KAFKA-14462; [2/N] Add ConsumerGroupHeartbeart to GroupCoordinator interface

2023-03-06 Thread via GitHub


jolshan commented on code in PR #13329:
URL: https://github.com/apache/kafka/pull/13329#discussion_r1126919357


##
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##
@@ -62,6 +63,15 @@ private[group] class GroupCoordinatorAdapter(
   private val time: Time
 ) extends org.apache.kafka.coordinator.group.GroupCoordinator {
 
+  override def consumerGroupHeartbeat(
+context: RequestContext,
+request: ConsumerGroupHeartbeatRequestData
+  ): CompletableFuture[ConsumerGroupHeartbeatResponseData] = {
+FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+  s"The old group coordinator does not support 
${ApiKeys.CONSUMER_GROUP_HEARTBEAT.name} API."

Review Comment:
   Does this error message make sense? I thought we already failed for the "old 
group coordinator" in KafkaApis, so wouldn't this run only if we were using the 
new coordinator?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13882) Dockerfile for previewing website

2023-03-06 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697103#comment-17697103
 ] 

ASF GitHub Bot commented on KAFKA-13882:


qingwei91 commented on PR #410:
URL: https://github.com/apache/kafka-site/pull/410#issuecomment-1456760908

   @mimaison, really sorry, I forgot to check the quick start. I've added it 
back now; I guess I need to figure out why that's required here but not in the 
actual deployment ...




> Dockerfile for previewing website
> -
>
> Key: KAFKA-13882
> URL: https://issues.apache.org/jira/browse/KAFKA-13882
> Project: Kafka
>  Issue Type: Task
>  Components: docs, website
>Reporter: Tom Bentley
>Assignee: Lim Qing Wei
>Priority: Trivial
>  Labels: newbie
>
> Previewing changes to the website/documentation is rather difficult because 
> you either have to [hack with the 
> HTML|https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes#ContributingWebsiteDocumentationChanges-KafkaWebsiteRepository]
>  or [install 
> httpd|https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server].
>  This is a barrier to contribution.
> Having a Dockerfile for previewing the Kafka website (i.e. with httpd 
> properly set up) would make it easier for people to contribute website/docs 
> changes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] Hangleton opened a new pull request, #13352: Add support of topic ids for the OffsetFetch API from version 9.

2023-03-06 Thread via GitHub


Hangleton opened a new pull request, #13352:
URL: https://github.com/apache/kafka/pull/13352

   WIP.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rondagostino commented on a diff in pull request #13331: MINOR: fix fault handling in ControllerServer and KafkaServer

2023-03-06 Thread via GitHub


rondagostino commented on code in PR #13331:
URL: https://github.com/apache/kafka/pull/13331#discussion_r1126859142


##
core/src/main/scala/kafka/server/SharedServer.scala:
##
@@ -170,6 +170,17 @@ class SharedServer(
   snapshotsDiabledReason.compareAndSet(null, "metadata loading fault")
 })
 
+  /**
+   * The fault handler to use when ControllerServer.startup throws an 
exception.
+   */
+  def controllerStartupFault: FaultHandler = faultHandlerFactory.build(

Review Comment:
   nit: `controllerStartupFaultHandler` to match the style of the other ones.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #13157: KAFKA-14599 Enable javadoc for :connect:mirror module

2023-03-06 Thread via GitHub


mimaison commented on PR #13157:
URL: https://github.com/apache/kafka/pull/13157#issuecomment-1456709095

   Sorry for the delay. I _think_ you technically don't necessarily need 
transitive dependencies on the classpath to compile a class. For example, it 
is possible to build a custom `TopicFilter` without having `mirror-client` on 
the classpath. For that reason, in theory, this change could break some 
users.
   
   So can you try to keep the classes in mirror? Otherwise we'll need a small 
KIP.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13882) Dockerfile for previewing website

2023-03-06 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697070#comment-17697070
 ] 

ASF GitHub Bot commented on KAFKA-13882:


mimaison commented on PR #410:
URL: https://github.com/apache/kafka-site/pull/410#issuecomment-1456641170

   @mjsax My point is that when testing locally without the `.htaccess` 
changes, all pages other than localhost:8080/documentation/ fail to load. For 
example http://localhost:8080/quickstart returns a 404.
   
   I see that @qingwei91 removed the `.htaccess` changes in the last commit and 
I confirmed the issue happens again. I'm a bit scared of touching the 
`.htaccess` file as I don't want to break the real website.




> Dockerfile for previewing website
> -
>
> Key: KAFKA-13882
> URL: https://issues.apache.org/jira/browse/KAFKA-13882
> Project: Kafka
>  Issue Type: Task
>  Components: docs, website
>Reporter: Tom Bentley
>Assignee: Lim Qing Wei
>Priority: Trivial
>  Labels: newbie
>
> Previewing changes to the website/documentation is rather difficult because 
> you either have to [hack with the 
> HTML|https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes#ContributingWebsiteDocumentationChanges-KafkaWebsiteRepository]
>  or [install 
> httpd|https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server].
>  This is a barrier to contribution.
> Having a Dockerfile for previewing the Kafka website (i.e. with httpd 
> properly set up) would make it easier for people to contribute website/docs 
> changes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on pull request #13351: KAFKA-14781: Downgrade MM2 log message severity when no ACL authorizer is configured on source broker

2023-03-06 Thread via GitHub


C0urante commented on PR #13351:
URL: https://github.com/apache/kafka/pull/13351#issuecomment-1456617781

   @mimaison would you mind taking a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante opened a new pull request, #13351: KAFKA-14781: Downgrade MM2 log message severity when no ACL authorizer is configured on source broker

2023-03-06 Thread via GitHub


C0urante opened a new pull request, #13351:
URL: https://github.com/apache/kafka/pull/13351

   [Jira](https://issues.apache.org/jira/browse/KAFKA-14781)
   
   Instead of logging an `ERROR`-level message when topic ACL syncing is 
enabled (which it is by default) and no ACL authorizer is configured on the 
source Kafka cluster, we first issue an `INFO`-level message suggesting that 
the user disable topic ACL syncing, and then issue a `DEBUG`-level message on 
each subsequent occurrence.
   
   Some thoughts while implementing this:
   
   1. The number of testing-only constructors for `MirrorSourceConnector` is 
getting a bit out of hand. We might consider replacing all of them with a 
package-private `start(MirrorSourceConfig)` method, which can help unify 
testing and non-testing code paths, as well as reduce the testing-only code 
that we have to tack on to non-testing code.
   2. We could possibly expand on the test case introduced here to simulate an 
ACL authorizer being added to the source Kafka cluster during the lifetime of 
the connector. I've omitted this as I don't believe it's a very common case and 
the changes here do not seem likely to introduce any risk for it.
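   
   For reference, the once-at-`INFO`-then-`DEBUG` behavior boils down to a 
pattern like this (names are hypothetical, not the PR's actual code):
   
   ```
   import java.util.concurrent.atomic.AtomicBoolean;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   public class OneTimeInfoLogger {
       private static final Logger log = LoggerFactory.getLogger(OneTimeInfoLogger.class);
       private final AtomicBoolean advised = new AtomicBoolean(false);
   
       void onNoAuthorizer(Exception cause) {
           // First occurrence: actionable INFO suggesting a config change;
           // all later occurrences: DEBUG, to avoid log spam.
           if (advised.compareAndSet(false, true)) {
               log.info("Source cluster has no ACL authorizer configured; "
                   + "consider disabling topic ACL syncing", cause);
           } else {
               log.debug("Skipping topic ACL sync; no authorizer configured", cause);
           }
       }
   }
   ```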
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison merged pull request #13212: MINOR: Remove accidental unnecessary code; fix comment references

2023-03-06 Thread via GitHub


mimaison merged PR #13212:
URL: https://github.com/apache/kafka/pull/13212


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison merged pull request #13215: KAFKA-14578: Move ConsumerPerformance to tools

2023-03-06 Thread via GitHub


mimaison merged PR #13215:
URL: https://github.com/apache/kafka/pull/13215


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rajinisivaram opened a new pull request, #13350: KAFKA-14452: Make sticky assignors rack-aware if client rack is configured (KIP-881)

2023-03-06 Thread via GitHub


rajinisivaram opened a new pull request, #13350:
URL: https://github.com/apache/kafka/pull/13350

   Best-effort rack alignment for sticky assignors when both consumer racks and 
partition racks are available via the protocol changes introduced in KIP-881. 
Rack-aware assignment is enabled by configuring client.rack for consumers. The 
assignment builders attempt to align on racks on a best-effort basis, but 
prioritize balanced assignment over rack alignment.
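   
   For example, opting a consumer into rack-aware assignment is purely a 
configuration change (illustrative values):
   
   ```
   import java.util.Properties;
   import org.apache.kafka.clients.consumer.ConsumerConfig;
   import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
   
   public class RackAwareConsumerConfigSketch {
       public static Properties consumerProps() {
           Properties props = new Properties();
           props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
           // client.rack should name the rack (e.g. the AZ) the consumer runs
           // in; the assignor then prefers partitions with replicas there.
           props.put(ConsumerConfig.CLIENT_RACK_CONFIG, "us-east-1a");
           props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
               CooperativeStickyAssignor.class.getName());
           return props;
       }
   }
   ```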
   
   Note that this PR is built on top of the refactoring in the PR 
https://github.com/apache/kafka/pull/13349.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14781) MM2 logs misleading error during topic ACL sync when broker does not have authorizer configured

2023-03-06 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-14781:
-

Assignee: Chris Egerton

> MM2 logs misleading error during topic ACL sync when broker does not have 
> authorizer configured
> ---
>
> Key: KAFKA-14781
> URL: https://issues.apache.org/jira/browse/KAFKA-14781
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> When there is no broker-side authorizer configured on a Kafka cluster 
> targeted by MirrorMaker 2, users see error-level log messages like this 
> one:
> {quote}[2023-03-06 10:53:57,488] ERROR [MirrorSourceConnector|worker] 
> Scheduler for MirrorSourceConnector caught exception in scheduled task: 
> syncing topic ACLs (org.apache.kafka.connect.mirror.Scheduler:102)
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer is 
> configured on the broker
>     at 
> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
>     at 
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
>     at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
>     at 
> org.apache.kafka.connect.mirror.MirrorSourceConnector.listTopicAclBindings(MirrorSourceConnector.java:456)
>     at 
> org.apache.kafka.connect.mirror.MirrorSourceConnector.syncTopicAcls(MirrorSourceConnector.java:342)
>     at org.apache.kafka.connect.mirror.Scheduler.run(Scheduler.java:93)
>     at 
> org.apache.kafka.connect.mirror.Scheduler.executeThread(Scheduler.java:112)
>     at 
> org.apache.kafka.connect.mirror.Scheduler.lambda$scheduleRepeating$0(Scheduler.java:50)
>     at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at 
> java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
>     at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.kafka.common.errors.SecurityDisabledException: No 
> Authorizer is configured on the broker
> {quote}
> This can be misleading as it looks like something is wrong with MM2 or the 
> Kafka cluster. In reality, it's usually fine, since topic ACL syncing is 
> enabled by default and it's reasonable for Kafka clusters (especially in 
> testing/dev environments) to not have authorizers enabled.
> We should try to catch this specific case and downgrade the severity of the 
> log message from {{ERROR}} to either {{INFO}} or {{{}DEBUG{}}}. We may also 
> consider suggesting to users that they disable topic ACL syncing if their 
> Kafka cluster doesn't have authorization set up, but this should probably 
> only be emitted once over the lifetime of the connector in order to avoid 
> generating log spam.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] rajinisivaram opened a new pull request, #13349: KAFKA-14452: Refactor AbstractStickyAssignor to prepare for rack-aware assignment

2023-03-06 Thread via GitHub


rajinisivaram opened a new pull request, #13349:
URL: https://github.com/apache/kafka/pull/13349

   This PR refactors AbstractStickyAssignor, without changing any logic, to 
make it easier to add rack-awareness. The class currently passes a large 
number of collections between methods, some of which mutate them. Adding 
rack-awareness would make this class, whose methods are already very large, 
even more complex and harder to read. The new code moves the two assignment 
methods into their own classes so that the state can be kept in instance 
fields rather than local variables.
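   
   In other words, the refactoring moves toward roughly this shape (class and 
method names are illustrative, not the PR's actual code):
   
   ```
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   // State that used to be local variables threaded through many methods
   // becomes instance fields of a dedicated builder class.
   public class StickyAssignorSketch {
       Map<String, List<String>> assign(Map<String, Integer> subscriptions) {
           return new AssignmentBuilder(subscriptions).build();
       }
   
       static final class AssignmentBuilder {
           // Formerly local variables passed between methods; now fields.
           private final Map<String, Integer> subscriptions;
           private final Map<String, List<String>> assignment = new HashMap<>();
   
           AssignmentBuilder(Map<String, Integer> subscriptions) {
               this.subscriptions = subscriptions;
           }
   
           Map<String, List<String>> build() {
               subscriptions.keySet().forEach(c -> assignment.put(c, new ArrayList<>()));
               return assignment;
           }
       }
   }
   ```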
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


