[GitHub] [kafka] junrao commented on a diff in pull request #13463: KAFKA-14854: Refactor inter broker send thread to handle all interbroker requests on one thread

2023-03-31 Thread via GitHub


junrao commented on code in PR #13463:
URL: https://github.com/apache/kafka/pull/13463#discussion_r1154881426


##
core/src/main/scala/kafka/common/InterBrokerSender.scala:
##
@@ -156,6 +163,16 @@ abstract class InterBrokerSendThread(
   def wakeup(): Unit = networkClient.wakeup()
 }
 
+abstract class InterBrokerRequestManager() {
+  
+  var interBrokerSender: InterBrokerSender = _

Review Comment:
   Does this need to be volatile?
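
   For context, a minimal Java sketch of the visibility concern behind this question (names are stand-ins, not the PR's code):

   ```java
   class InterBrokerRequestManagerSketch {
       // Written once by the wiring thread, read repeatedly by the sender thread.
       // Without volatile, the Java memory model does not guarantee the reader
       // ever observes the write; volatile gives safe publication of the reference.
       private volatile Object interBrokerSender;

       void wire(Object sender) { this.interBrokerSender = sender; }

       void send() {
           Object s = interBrokerSender;   // guaranteed-fresh read
           if (s != null) { /* enqueue request via s */ }
       }
   }
   ```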



##
core/src/main/scala/kafka/network/NetworkUtils.scala:
##
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+import kafka.server.KafkaConfig
+import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient}
+import org.apache.kafka.common.Reconfigurable
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network.{ChannelBuilders, NetworkReceive, Selectable, Selector}
+import org.apache.kafka.common.security.JaasContext
+import org.apache.kafka.common.utils.{LogContext, Time}
+
+import scala.jdk.CollectionConverters._
+
+object NetworkUtils {

Review Comment:
   Since we are gradually moving code from core to separate modules in Java, could we move this to server-common and write it in Java?



##
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##
@@ -59,8 +61,9 @@ object TransactionCoordinator {
   time, metrics)
 
 val logContext = new LogContext(s"[TransactionCoordinator id=${config.brokerId}] ")
-val txnMarkerChannelManager = TransactionMarkerChannelManager(config, metrics, metadataCache, txnStateManager,
-  time, logContext)
+val txnMarkerChannelManager = new TransactionMarkerChannelManager(config, metadataCache, txnStateManager,
+  time)
+interBrokerSender.addRequestManager(txnMarkerChannelManager)

Review Comment:
   It's a bit inconvenient for every instance of InterBrokerRequestManager to do this. Would it be better to pass interBrokerSender to InterBrokerRequestManager and have the constructor do `interBrokerSender.addRequestManager(this)`?
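
   A minimal Java rendering of the suggested pattern (hypothetical names; the PR's classes are Scala):

   ```java
   abstract class InterBrokerRequestManagerSketch {
       protected InterBrokerRequestManagerSketch(InterBrokerSenderSketch sender) {
           // Registration happens exactly once, at construction, so call sites
           // like TransactionCoordinator no longer need to remember to do it.
           sender.addRequestManager(this);
       }
   }

   interface InterBrokerSenderSketch {
       void addRequestManager(InterBrokerRequestManagerSketch manager);
   }
   ```

   One caveat of this pattern is that it publishes `this` from a constructor, so the sender must not invoke the manager before subclass construction completes.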



##
core/src/main/scala/kafka/common/InterBrokerSender.scala:
##
@@ -45,10 +45,15 @@ abstract class InterBrokerSendThread(
 
   private val unsentRequests = new UnsentRequests
 
-  def generateRequests(): Iterable[RequestAndCompletionHandler]
+  private val requestManagers = new ArrayList[InterBrokerRequestManager]()

Review Comment:
   Do we need any synchronization for concurrency and visibility across threads?
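
   One option, sketched below under the assumption that registration is rare while the sender thread iterates the list on every poll (hypothetical Java rendering):

   ```java
   import java.util.List;
   import java.util.concurrent.CopyOnWriteArrayList;

   class InterBrokerSendThreadSketch {
       // CopyOnWriteArrayList provides safe publication and lock-free snapshot
       // iteration, so the sender thread never sees a half-added manager and
       // addRequestManager() can be called from any thread.
       private final List<Object /* InterBrokerRequestManager */> requestManagers =
               new CopyOnWriteArrayList<>();

       void addRequestManager(Object manager) {
           requestManagers.add(manager);
       }
   }
   ```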



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13345: KAFKA-13020; Implement reading Snapshot log append timestamp

2023-03-31 Thread via GitHub


cmccabe commented on code in PR #13345:
URL: https://github.com/apache/kafka/pull/13345#discussion_r1154889074


##
raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java:
##
@@ -121,9 +122,22 @@ private Optional<Batch<T>> nextBatch() {
 Batch<T> batch = iterator.next();
 
 if (!lastContainedLogTimestamp.isPresent()) {
-// The Batch type doesn't support returning control batches. For now lets just use
-// the append time of the first batch
-lastContainedLogTimestamp = OptionalLong.of(batch.appendTimestamp());
+// This must be the first batch which is expected to be a control batch with one record for
+// the snapshot header.
+if (batch.controlRecords().isEmpty()) {
+throw new IllegalStateException("First batch is not a control batch with at least one record");

Review Comment:
   Are you thinking of the 2.8 -> 3.0 compatibility break? I'm not aware of any 
such break between 3.2 and 3.3



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13490: KAFKA-14875: Implement wakeup

2023-03-31 Thread via GitHub


philipnee commented on code in PR #13490:
URL: https://github.com/apache/kafka/pull/13490#discussion_r1154887444


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -546,6 +543,26 @@ public ConsumerRecords<K, V> poll(long timeout) {
 throw new KafkaException("method not implemented");
 }
 
+<T> T tryGetFutureResult(
+final Time time,
+final WakeupableFuture<T> future,
+final Duration timeout) throws ExecutionException, InterruptedException {
+Timer timer = time.timer(timeout.toMillis());
+do {
+if (future.isDone()) {
+return future.get();
+}
+
+if (this.shouldWakeup.get()) {
+this.shouldWakeup.set(false);
+future.cancel(true);
+throw new WakeupException();
+}
+// Maybe Thread.sleep?

Review Comment:
   There must be a better way to do this than... sleep
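
   One alternative to a bare sleep, sketched under the assumption that the future is a `CompletableFuture` (the PR's `WakeupableFuture` may differ): park inside `future.get(...)` with a short timeout, so the thread blocks efficiently while still re-checking the wakeup flag.

   ```java
   import java.time.Duration;
   import java.util.concurrent.CancellationException;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   import java.util.concurrent.atomic.AtomicBoolean;

   class FutureWaiterSketch {
       private final AtomicBoolean shouldWakeup = new AtomicBoolean(false);

       <T> T tryGetFutureResult(CompletableFuture<T> future, Duration timeout)
               throws ExecutionException, InterruptedException, TimeoutException {
           long deadline = System.nanoTime() + timeout.toNanos();
           while (System.nanoTime() < deadline) {
               if (shouldWakeup.compareAndSet(true, false)) {
                   future.cancel(true);
                   throw new CancellationException("woken up"); // stand-in for WakeupException
               }
               try {
                   // Block in the future itself for a short slice; no busy sleep.
                   return future.get(100, TimeUnit.MILLISECONDS);
               } catch (TimeoutException e) {
                   // slice elapsed -- loop around and re-check the wakeup flag
               }
           }
           throw new TimeoutException("timed out waiting for result");
       }
   }
   ```

   A fully event-driven variant would have `wakeup()` complete a second future exceptionally and wait on `CompletableFuture.anyOf(...)`, removing the polling slice entirely.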



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee opened a new pull request, #13490: KAFKA-14875: Implement wakeup

2023-03-31 Thread via GitHub


philipnee opened a new pull request, #13490:
URL: https://github.com/apache/kafka/pull/13490

   Interrupt the blocking methods and throw WakeupException


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14875) Implement Wakeup()

2023-03-31 Thread Philip Nee (Jira)
Philip Nee created KAFKA-14875:
--

 Summary: Implement Wakeup()
 Key: KAFKA-14875
 URL: https://issues.apache.org/jira/browse/KAFKA-14875
 Project: Kafka
  Issue Type: Task
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


Implement wakeup() and WakeupException.  This would be different from the current 
implementation because I think we just need to interrupt the blocking futures.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cmccabe commented on a diff in pull request #13345: KAFKA-13020; Implement reading Snapshot log append timestamp

2023-03-31 Thread via GitHub


cmccabe commented on code in PR #13345:
URL: https://github.com/apache/kafka/pull/13345#discussion_r1154884580


##
raft/src/main/java/org/apache/kafka/raft/ControlRecord.java:
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import java.util.Objects;
+import org.apache.kafka.common.message.LeaderChangeMessage;
+import org.apache.kafka.common.message.SnapshotFooterRecord;
+import org.apache.kafka.common.message.SnapshotHeaderRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.record.ControlRecordType;
+
+public final class ControlRecord {

Review Comment:
   sure, a follow-up is fine



##
raft/src/main/java/org/apache/kafka/raft/internals/ByteBufferSerde.java:
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.nio.ByteBuffer;
+import org.apache.kafka.common.protocol.ObjectSerializationCache;
+import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.common.protocol.Writable;
+import org.apache.kafka.server.common.serialization.RecordSerde;
+
+public final class ByteBufferSerde implements RecordSerde<ByteBuffer> {

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] OneCricketeer commented on a diff in pull request #12992: KAFKA-14376-KIP887: Add ConfigProvider to make use of environment variables

2023-03-31 Thread via GitHub


OneCricketeer commented on code in PR #12992:
URL: https://github.com/apache/kafka/pull/12992#discussion_r1154834846


##
clients/src/test/java/org/apache/kafka/common/config/provider/EnvVarConfigProviderTest.java:
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.provider;
+
+import org.apache.kafka.common.config.ConfigData;
+import org.apache.kafka.common.config.ConfigException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.Map;
+import java.util.HashSet;
+import java.util.HashMap;
+
+import static org.apache.kafka.common.config.provider.EnvVarConfigProvider.ENV_VAR_CONFIG_PROVIDER_PATTERN_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class EnvVarConfigProviderTest {
+
+private EnvVarConfigProvider envVarConfigProvider = null;
+@BeforeEach
+public void setup() {
+Map<String, String> testEnvVars = new HashMap<String, String>() {
+{
+put("test_var1", "value1");
+put("secret_var2", "value2");
+put("new_var3", "value3");
+}
+};
+envVarConfigProvider = new EnvVarConfigProvider(testEnvVars);
+envVarConfigProvider.configure(Collections.singletonMap("", ""));
+}
+}
+
+@Test
+void testGetAllEnvVarsNotEmpty() {
+ConfigData properties = envVarConfigProvider.get("");
+assertNotEquals(0, properties.data().size());
+}
+
+@Test
+void testGetMultipleKeysAndCompare() {
+ConfigData properties = envVarConfigProvider.get("");
+assertNotEquals(0, properties.data().size());
+assertEquals("value1", properties.data().get("test_var1"));
+assertEquals("value2", properties.data().get("secret_var2"));
+assertEquals("value3", properties.data().get("new_var3"));
+}
+
+@Test
+public void testGetOneKeyWithNullPath() {
+ConfigData config = envVarConfigProvider.get(null, Collections.singleton("secret_var2"));
+Map<String, String> data = config.data();
+
+assertEquals(1, data.size());
+assertEquals("value2", data.get("secret_var2"));
+}
+
+@Test
+public void testGetOneKeyWithEmptyPath() {
+ConfigData config = envVarConfigProvider.get("", Collections.singleton("test_var1"));
+Map<String, String> data = config.data();
+
+assertEquals(1, data.size());
+assertEquals("value1", data.get("test_var1"));
+}
+
+@Test
+void testGetWhitelistedEnvVars() {
+Set<String> whiteList = new HashSet<>(Arrays.asList("test_var1", "secret_var2"));
+Set<String> keys = envVarConfigProvider.get(null, whiteList).data().keySet();
+assertEquals(whiteList, keys);
+}
+@Test
+void testNotNullPathNonEmptyThrowsException() {
+assertThrows(ConfigException.class, () -> envVarConfigProvider.get("test-path", Collections.singleton("test_var1")));
+}
+
+@Test void testRegExpEnvVars() {
+Map<String, String> testConfigMap = Collections.singletonMap(ENV_VAR_CONFIG_PROVIDER_PATTERN_CONFIG, "secret_.*");
+envVarConfigProvider.configure(testConfigMap);
+
+assertEquals(1, envVarConfigProvider.get(null, Collections.singleton("secret_var2")).data().size());

Review Comment:
   Does this also pass for `custom_secret_var2`?
   
   In other words, I believe this is testing `find(pattern)`, not strict 
`matches`, which has implicit `^ $` anchors. 
   
   There seems to be a difference between `asPredicate` and `asMatchPredicate`. 
   
   
https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/regex/Pattern.html#asPredicate()
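
   The difference is easy to demonstrate (requires Java 11+ for `asMatchPredicate`):

   ```java
   import java.util.function.Predicate;
   import java.util.regex.Pattern;

   public class PatternPredicateDemo {
       public static void main(String[] args) {
           Pattern p = Pattern.compile("secret_.*");

           Predicate<String> find = p.asPredicate();        // uses Matcher.find()
           Predicate<String> match = p.asMatchPredicate();  // uses Matcher.matches()

           System.out.println(find.test("custom_secret_var2"));  // true: substring hit
           System.out.println(match.test("custom_secret_var2")); // false: implicit ^...$ anchors
           System.out.println(match.test("secret_var2"));        // true: whole string matches
       }
   }
   ```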



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] CalvinConfluent opened a new pull request, #13489: KAFKA-14617: Fill broker epochs to the AlterPartitionRequest

2023-03-31 Thread via GitHub


CalvinConfluent opened a new pull request, #13489:
URL: https://github.com/apache/kafka/pull/13489

   As the third part of [KIP-903](https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR), it fills the broker epochs from the Fetch request into the AlterPartitionRequest. Also, before generating the AlterPartitionRequest, the partition will check whether the broker epoch from the FetchRequest matches the broker epoch recorded in the metadata cache. If not, the ISR change will be delayed.
   
   https://issues.apache.org/jira/browse/KAFKA-14617
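
   A hedged illustration of the described gate (hypothetical names, not the PR's actual code):

   ```java
   class IsrExpansionGateSketch {
       // The broker epoch carried by the follower's Fetch request must match the
       // epoch in the leader's metadata cache; a mismatch means the fetch predates
       // the broker's latest registration, so the ISR change is delayed.
       boolean canProposeIsrChange(long epochFromFetch, long epochInMetadataCache) {
           return epochFromFetch == epochInMetadataCache;
       }
   }
   ```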
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13444: KAFKA-14491: [21/N] Docs updates for versioned state stores

2023-03-31 Thread via GitHub


vcrfxia commented on code in PR #13444:
URL: https://github.com/apache/kafka/pull/13444#discussion_r1154765044


##
docs/streams/developer-guide/processor-api.html:
##
@@ -396,12 +400,68 @@ 
 
 
 
+
+Versioned Key-Value State Stores
+Versioned key-value state stores are available since Kafka Streams 3.5.
+Rather than storing a single record version (value and timestamp) per key,
+versioned state stores may store multiple record versions per key. This
+allows versioned state stores to support timestamped retrieval operations
+to return the latest record (per key) as of a specified timestamp, which
+enables use cases such as temporal stream-table joins.
+You can create a persistent, versioned state store by passing a
+VersionedBytesStoreSupplier
+to the
+versionedKeyValueStoreBuilder,
+or by implementing your own
+VersionedKeyValueStore.
+Each versioned store has an associated, fixed-duration history retention
+parameter which specifies how long old record versions should be kept for.
+In particular, a versioned store guarantees to return accurate results for
+timestamped retrieval operations where the timestamp being queried is within
+history retention of the current observed stream time.
+History retention also doubles as its grace period, which determines
+how far back in time out-of-order writes to the store will be accepted. A
+versioned store will not accept writes (inserts, updates, or deletions) if
+the timestamp associated with the write is older than the current observed
+stream time by more than the grace period. Stream time in this context is
+tracked per-partition, rather than per-key, which means it's important
+that grace period (i.e., history retention) be set high enough to
+accommodate a record with one key arriving out-of-order relative to a
+record for another key.
+Because the memory footprint of versioned key-value stores is higher than
+that of non-versioned key-value stores, you may want to adjust your
+RocksDB memory settings
+accordingly. Benchmarking your application with versioned stores is also
+advised as performance is expected to be worse than when using non-versioned
+stores.
+Versioned stores do not support caching or interactive queries at this time.
+Also, window stores may not be versioned.
+Upgrade note: Versioned state stores are opt-in only; no automatic

Review Comment:
   I've added a section into the upgrade guide which links to the developer 
guide section. I think this is a good compromise in order to keep the upgrade 
guide smaller, and to be consistent with the fact that the detailed upgrade 
note for timestamped key-value stores is also in the developer guide (rather 
than the upgrade guide).
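
   For readers of the doc text quoted above, a minimal sketch of creating such a store, assuming the Kafka Streams 3.5 `Stores` factory methods:

   ```java
   import java.time.Duration;
   import org.apache.kafka.common.serialization.Serdes;
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.state.Stores;

   public class VersionedStoreExample {
       public static void main(String[] args) {
           StreamsBuilder builder = new StreamsBuilder();
           builder.addStateStore(
               Stores.versionedKeyValueStoreBuilder(
                   Stores.persistentVersionedKeyValueStore(
                       "my-versioned-store",
                       Duration.ofMinutes(10)),   // history retention (also the grace period)
                   Serdes.String(),
                   Serdes.String()));
       }
   }
   ```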



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13444: KAFKA-14491: [21/N] Docs updates for versioned state stores

2023-03-31 Thread via GitHub


vcrfxia commented on code in PR #13444:
URL: https://github.com/apache/kafka/pull/13444#discussion_r1154757216


##
docs/streams/developer-guide/processor-api.html:
##
@@ -396,12 +400,68 @@ 
 
 
 
+
+Versioned Key-Value State Stores
+Versioned key-value state stores are available since Kafka Streams 3.5.
+Rather than storing a single record version (value and timestamp) per key,
+versioned state stores may store multiple record versions per key. This
+allows versioned state stores to support timestamped retrieval operations
+to return the latest record (per key) as of a specified timestamp, which
+enables use cases such as temporal stream-table joins.
+You can create a persistent, versioned state store by passing a
+VersionedBytesStoreSupplier
+to the
+versionedKeyValueStoreBuilder,
+or by implementing your own
+VersionedKeyValueStore.
+Each versioned store has an associated, fixed-duration history retention
+parameter which specifies how long old record versions should be kept for.
+In particular, a versioned store guarantees to return accurate results for
+timestamped retrieval operations where the timestamp being queried is within
+history retention of the current observed stream time.
+History retention also doubles as its grace period, which determines
+how far back in time out-of-order writes to the store will be accepted. A
+versioned store will not accept writes (inserts, updates, or deletions) if
+the timestamp associated with the write is older than the current observed
+stream time by more than the grace period. Stream time in this context is

Review Comment:
   Hm, this looks to be inconsistent throughout the existing codebase since 
searching for both "stream time" and "stream-time" turn up a bunch of results 
across docs and also javadocs/code comments. Do we have an agreed upon standard 
for the repo? Personally I prefer "stream time" but unification is probably 
more important :) If there's already consensus I can make the updates in this 
PR. Otherwise, might be best left for a follow-up.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13444: KAFKA-14491: [21/N] Docs updates for versioned state stores

2023-03-31 Thread via GitHub


vcrfxia commented on code in PR #13444:
URL: https://github.com/apache/kafka/pull/13444#discussion_r1154755917


##
docs/streams/developer-guide/processor-api.html:
##
@@ -261,11 +262,13 @@ 
 space.
 RocksDB settings can be fine-tuned, see
 RocksDB configuration.
-Available store variants:
-time window key-value store, session window key-value store.
-Use persistentTimestampedKeyValueStore
-when you need a persistent key-(value/timestamp) store that supports put/get/delete and range queries.
-Use persistentTimestampedWindowStore
+Available store variants:
+versioned key-value store, time window key-value store, session window key-value store.

Review Comment:
   Added timestamped KV, regular window stores (previously only timestamped 
window stores were represented), and session stores into this section, in order 
to be comprehensive. LMK if it's too much now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2023-03-31 Thread via GitHub


guozhangwang commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1154750701


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+final Collection<Task> newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+// If there are any transactions in flight and there are newly created active tasks, commit the tasks
+// to avoid potential long restoration times.
+if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+log.info("New active tasks were added and there is an inflight transaction. Attempting to commit tasks.");
+final int numCommitted = commitTasksAndMaybeUpdateCommittableOffsets(newActiveTasks, new HashMap<>());
+if (numCommitted == -1) {

Review Comment:
   Also cc @lucasbru @mjsax @lihaosky to bring this change to your attention.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2023-03-31 Thread via GitHub


guozhangwang commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1154747980


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+final Collection<Task> newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+// If there are any transactions in flight and there are newly created active tasks, commit the tasks
+// to avoid potential long restoration times.
+if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+log.info("New active tasks were added and there is an inflight transaction. Attempting to commit tasks.");
+final int numCommitted = commitTasksAndMaybeUpdateCommittableOffsets(newActiveTasks, new HashMap<>());
+if (numCommitted == -1) {
+log.info("Couldn't commit any tasks since a rebalance is in progress");
+} else {
+log.info("Committed {} transactions", numCommitted);

Review Comment:
   nit: `Committed the ongoing V2 transaction at the assignment due to newly 
created active tasks`.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+final Collection<Task> newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+// If there are any transactions in flight and there are newly created active tasks, commit the tasks
+// to avoid potential long restoration times.
+if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+log.info("New active tasks were added and there is an inflight transaction. Attempting to commit tasks.");

Review Comment:
   One caveat for EOS-v2 is that, when we commit, we have to make sure we are committing all tasks that have processed any data, not just the new active tasks --- sorry for not making that clear before; it took me some time to get it straight as well, and as a result I filed https://issues.apache.org/jira/browse/KAFKA-14847, which has more details. In a word, under EOS-v2 each commit has to include every task even if we only want to commit a part of them, so we'd better pass all the tasks to the `commitTasksAndMaybeUpdateCommittableOffsets` func below.
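
   In code form, the point is roughly this (a sketch with hypothetical names, not the PR's method):

   ```java
   import java.util.Collection;
   import java.util.HashMap;

   class EosV2CommitSketch {
       interface Task {}

       // Under EOS-v2 a single transaction spans the whole stream thread, so a
       // commit must cover every task that processed data -- passing only the
       // newly created tasks would commit the others' work as a side effect
       // without updating their committable offsets.
       void commitForNewTasks(Collection<Task> allOwnedTasks) {
           commitTasksAndMaybeUpdateCommittableOffsets(allOwnedTasks, new HashMap<>());
       }

       int commitTasksAndMaybeUpdateCommittableOffsets(Collection<Task> tasks,
                                                       HashMap<Task, Object> offsets) {
           return tasks.size(); // placeholder body for the sketch
       }
   }
   ```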



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+final Collection<Task> newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+// If there are any transactions in flight and there are newly created active tasks, commit the tasks
+// to avoid potential long restoration times.
+if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+log.info("New active tasks were added and there is an inflight transaction. Attempting to commit tasks.");
+final int numCommitted = commitTasksAndMaybeUpdateCommittableOffsets(newActiveTasks, new HashMap<>());
+if (numCommitted == -1) {

Review Comment:
   This reminds me of one thing: we call `onAssignment` first and then `onPartitionsAssigned` later, and we only set `rebalanceInProgress` to false in the latter func, which means that during `onAssignment` we would always see `rebalanceInProgress == true`, which logically would not allow a commit.
   
   I gave it some thought, and currently the quick (and somewhat dirty..) fix would be to move the line that clears `rebalanceInProgress` right before the `createNewTasks` call inside `handleAssignment` here. But we should leave a TODO so that, moving forward, we rely only on `onAssignment` as the rebalance-complete barrier and move the other logic out of `onPartitionsAssigned` as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] artemlivshits commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction

2023-03-31 Thread via GitHub


artemlivshits commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1154744670


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler}
+import org.apache.kafka.common.{InvalidRecordException, Node, TopicPartition}
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse}
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.mutable
+
+object AddPartitionsToTxnManager {
+  type AppendCallback = Map[TopicPartition, Errors] => Unit
+}
+
+
+class TransactionDataAndCallbacks(val transactionData: AddPartitionsToTxnTransactionCollection,
+                                  val callbacks: mutable.Map[String, AddPartitionsToTxnManager.AppendCallback])
+
+
+class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time: Time)
+  extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + config.brokerId, client, config.requestTimeoutMs, time) {
+
+  private val inflightNodes = mutable.HashSet[Node]()
+  private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]()
+
+  def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
+    nodesToTransactions.synchronized {
+      // Check if we have already (either node or individual transaction). Add the Node if it isn't there.
+      val currentNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node,
+        new TransactionDataAndCallbacks(
+          new AddPartitionsToTxnTransactionCollection(1),
+          mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]()))
+
+      val currentTransactionData = currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
+
+      // Check if we already have txn ID -- this should only happen in epoch bump case. If so, we should return error for old entry and remove from queue.
+      if (currentTransactionData != null) {
+        if (currentTransactionData.producerEpoch() < transactionData.producerEpoch()) {
+          val topicPartitionsToError = mutable.Map[TopicPartition, Errors]()
+          currentTransactionData.topics().forEach { topic =>
+            topic.partitions().forEach { partition =>
+              topicPartitionsToError.put(new TopicPartition(topic.name(), partition), Errors.INVALID_PRODUCER_EPOCH)
+            }
+          }
+          val oldCallback = currentNodeAndTransactionData.callbacks(transactionData.transactionalId())
+          currentNodeAndTransactionData.transactionData.remove(transactionData)
+          oldCallback(topicPartitionsToError.toMap)
+        } else {
+          // We should never see a request on the same epoch since we haven't finished handling the one in queue
+          throw new InvalidRecordException("Received a second request from the same connection without finishing the first.")
+        }
+      }
+      currentNodeAndTransactionData.transactionData.add(transactionData)
+      currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), callback)
+      wakeup()
+    }
+  }
+
+  private class AddPartitionsToTxnHandler(node: Node, transactionDataAndCallbacks: TransactionDataAndCallbacks) extends RequestCompletionHandler {
+    override def onComplete(response: ClientResponse): Unit = {
+      inflightNodes.remove(node)

Review Comment:
   Currently we only have one thread and it might be the case forever, but from 
this code the threading model is not obvious and it would be useful to have a 
comment that we don't need synchronization for inflightNodes because 
inflightNodes is only accessed from methods that are called on the sender's 
thread.
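
   For example, the requested comment might look like this (a sketch, with a Java stand-in for the Scala field):

   ```java
   class SenderThreadConfinementSketch {
       // Confined to the single inter-broker sender thread: both the request
       // generation path and RequestCompletionHandler.onComplete() run there,
       // so a plain HashSet needs no synchronization. Revisit if more sender
       // threads are ever added.
       private final java.util.Set<String> inflightNodes = new java.util.HashSet<>();
   }
   ```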



-- 

[GitHub] [kafka] mumrah closed pull request #13488: MINOR: Fix mock in BrokerMetadataPublisherTest

2023-03-31 Thread via GitHub


mumrah closed pull request #13488: MINOR: Fix mock in 
BrokerMetadataPublisherTest
URL: https://github.com/apache/kafka/pull/13488


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #13486: MINOR: fix broker testExceptionInUpdateCoordinator test

2023-03-31 Thread via GitHub


cmccabe commented on PR #13486:
URL: https://github.com/apache/kafka/pull/13486#issuecomment-1492303082

   Thanks for the fix, @showuon . I fixed this in a bit of a different way in 
https://github.com/apache/kafka/pull/13481 , by avoiding the exception rather 
than suppressing it. I think that's better since we want to know about these 
failures.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #13481: MINOR: fix BrokerMetadataPublisherTest.testExceptionInUpdateCoordinator

2023-03-31 Thread via GitHub


cmccabe commented on PR #13481:
URL: https://github.com/apache/kafka/pull/13481#issuecomment-1492298328

   committed


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #13488: MINOR: Fix mock in BrokerMetadataPublisherTest

2023-03-31 Thread via GitHub


cmccabe commented on PR #13488:
URL: https://github.com/apache/kafka/pull/13488#issuecomment-1492299446

   Looks like a duplicate of https://github.com/apache/kafka/pull/13481 . Sorry 
about the broken stuff.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe closed pull request #13481: MINOR: fix BrokerMetadataPublisherTest.testExceptionInUpdateCoordinator

2023-03-31 Thread via GitHub


cmccabe closed pull request #13481: MINOR: fix 
BrokerMetadataPublisherTest.testExceptionInUpdateCoordinator
URL: https://github.com/apache/kafka/pull/13481


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] artemlivshits commented on a diff in pull request #13463: KAFKA-14854: Refactor inter broker send thread to handle all interbroker requests on one thread

2023-03-31 Thread via GitHub


artemlivshits commented on code in PR #13463:
URL: https://github.com/apache/kafka/pull/13463#discussion_r1154150901


##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -247,6 +251,11 @@ class BrokerServer(
   )
   alterPartitionManager.start()
 
+  val interBrokerSendLogContext = new LogContext(s"[InterBrokerSender broker=${config.brokerId}]")

Review Comment:
   Could probably name interBrokerSenderLogContext, so that "interBrokerSender" 
is a common string to find all things related to interBrokerSender.



##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -247,6 +251,11 @@ class BrokerServer(
   )
   alterPartitionManager.start()
 
+  val interBrokerSendLogContext = new LogContext(s"[InterBrokerSender broker=${config.brokerId}]")
+  val networkClient: NetworkClient = NetworkUtils.buildNetworkClient("InterBrokerSendClient", config, metrics, time, interBrokerSendLogContext)

Review Comment:
   I think now the networkClient got a purpose, so we could name it 
interBrokerNetworkClient.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13471: KAFKA-14868:Remove some forgotten metrics when the replicaManager is closed

2023-03-31 Thread via GitHub


divijvaidya commented on PR #13471:
URL: https://github.com/apache/kafka/pull/13471#issuecomment-1492282361

   @hudeqi please feel free to cherry-pick this commit: https://github.com/divijvaidya/kafka/commit/e6db8eaaeffc3b7a87f34c0e72ff25395fda97a3 into this PR.
   
   It introduces the unit test which fails before the `removeMetrics()` and 
passes after that.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah opened a new pull request, #13488: Fix mock in BrokerMetadataPublisherTest

2023-03-31 Thread via GitHub


mumrah opened a new pull request, #13488:
URL: https://github.com/apache/kafka/pull/13488

   
https://github.com/apache/kafka/commit/09e59bc7761a6b9ec1437b3decdfcd7b5fff868e 
added a list of metadata publishers in BrokerServer.scala which broke the mock 
used in BrokerMetadataPublisherTest#testExceptionInUpdateCoordinator. @showuon 
suppressed the exception in #13486. This patch fixes the mock


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on pull request #13455: KAFKA-14841 Handle callbacks to ConsumerRebalanceListener in MockConsumer

2023-03-31 Thread via GitHub


philipnee commented on PR #13455:
URL: https://github.com/apache/kafka/pull/13455#issuecomment-1492207782

   @kirktrue - do you want to take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on pull request #13455: KAFKA-14841 Handle callbacks to ConsumerRebalanceListener in MockConsumer

2023-03-31 Thread via GitHub


philipnee commented on PR #13455:
URL: https://github.com/apache/kafka/pull/13455#issuecomment-1492189505

   Hey, I left some comments, but it looks good after all. @showuon @guozhangwang - would you guys be interested in reviewing this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13455: KAFKA-14841 Handle callbacks to ConsumerRebalanceListener in MockConsumer

2023-03-31 Thread via GitHub


philipnee commented on code in PR #13455:
URL: https://github.com/apache/kafka/pull/13455#discussion_r1154648065


##
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java:
##
@@ -92,11 +92,37 @@ public synchronized Set<TopicPartition> assignment() {
 return this.subscriptions.assignedPartitions();
 }
 
-/** Simulate a rebalance event. */
+/**
+ * Simulate a rebalance event.
+ */
 public synchronized void rebalance(Collection<TopicPartition> newAssignment) {
-// TODO: Rebalance callbacks
+// prepare for rebalance callback

Review Comment:
   This is more like, computing the partitions to be added and removed, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13455: KAFKA-14841 Handle callbacks to ConsumerRebalanceListener in MockConsumer

2023-03-31 Thread via GitHub


philipnee commented on code in PR #13455:
URL: https://github.com/apache/kafka/pull/13455#discussion_r1154647021


##
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java:
##
@@ -92,11 +92,37 @@ public synchronized Set<TopicPartition> assignment() {
 return this.subscriptions.assignedPartitions();
 }
 
-/** Simulate a rebalance event. */
+/**
+ * Simulate a rebalance event.
+ */
 public synchronized void rebalance(Collection<TopicPartition> newAssignment) {
-// TODO: Rebalance callbacks
+// prepare for rebalance callback
+Set<TopicPartition> oldAssignmentSet = this.subscriptions.assignedPartitions();
+Set<TopicPartition> newAssignmentSet = new HashSet<>(newAssignment);
+List<TopicPartition> added = new ArrayList<>(newAssignment.size());

Review Comment:
   I'm pretty sure you can do set.removeAll(stuff) - saves some code there.
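
   For illustration, the suggestion applied to the variables in the diff (a sketch; `TopicPartition` as in the Kafka clients API):

   ```java
   import java.util.Collection;
   import java.util.HashSet;
   import java.util.Set;
   import org.apache.kafka.common.TopicPartition;

   class RebalanceDiffSketch {
       static void computeDiff(Set<TopicPartition> oldAssignmentSet,
                               Collection<TopicPartition> newAssignment) {
           // In the new assignment but not the old one -> newly added partitions.
           Set<TopicPartition> added = new HashSet<>(newAssignment);
           added.removeAll(oldAssignmentSet);

           // In the old assignment but not the new one -> revoked partitions.
           Set<TopicPartition> revoked = new HashSet<>(oldAssignmentSet);
           revoked.removeAll(newAssignment);
       }
   }
   ```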



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on pull request #13364: KAFKA-14491: [16/N] Add recovery logic for store inconsistency due to failed write

2023-03-31 Thread via GitHub


vcrfxia commented on PR #13364:
URL: https://github.com/apache/kafka/pull/13364#issuecomment-1492185480

   The build failed with an error that looks unrelated to this PR:
   ```
   [2023-03-31T00:06:22.083Z] FAILURE: Build failed with an exception.
   [2023-03-31T00:06:22.083Z] 
   [2023-03-31T00:06:22.083Z] * What went wrong:
   [2023-03-31T00:06:22.083Z] A problem was found with the configuration of 
task ':rat' (type 'RatTask').
   [2023-03-31T00:06:22.083Z]   - Gradle detected a problem with the following 
location: '/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-13364'.
   [2023-03-31T00:06:22.083Z] 
   [2023-03-31T00:06:22.083Z] Reason: Task ':rat' uses this output of task 
':clients:processTestMessages' without declaring an explicit or implicit 
dependency. This can lead to incorrect results being produced, depending on 
what order the tasks are executed.
   ```
   
   @mjsax would you mind triggering a new build in case it's a flake? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] urbandan commented on pull request #13458: KAFKA-14838: Add flow/connector/task/role information to MM2 Kafka cl…

2023-03-31 Thread via GitHub


urbandan commented on PR #13458:
URL: https://github.com/apache/kafka/pull/13458#issuecomment-1492165169

   @C0urante Thank you for the review!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah merged pull request #13486: MINOR: fix broker testExceptionInUpdateCoordinator test

2023-03-31 Thread via GitHub


mumrah merged PR #13486:
URL: https://github.com/apache/kafka/pull/13486


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on pull request #13486: MINOR: fix broker testExceptionInUpdateCoordinator test

2023-03-31 Thread via GitHub


mumrah commented on PR #13486:
URL: https://github.com/apache/kafka/pull/13486#issuecomment-1492109609

   Thanks @showuon, this LGTM to unblock trunk. This was introduced in 
09e59bc7761a6b9, I'm going to dig into it today


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi closed pull request #13485: MINOR:Remove unused metric variable in ReplicaManager

2023-03-31 Thread via GitHub


hudeqi closed pull request #13485: MINOR:Remove unused metric variable in 
ReplicaManager
URL: https://github.com/apache/kafka/pull/13485


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi commented on pull request #13485: MINOR:Remove unused metric variable in ReplicaManager

2023-03-31 Thread via GitHub


hudeqi commented on PR #13485:
URL: https://github.com/apache/kafka/pull/13485#issuecomment-1492080944

   It looks like it's used in ControllerIntegrationTest; I don't know why IDEA can't see the call :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14874) Unable to create > 5000 topics for once when using Kraft

2023-03-31 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-14874:
--

 Summary: Unable to create > 5000 topics for once when using Kraft
 Key: KAFKA-14874
 URL: https://issues.apache.org/jira/browse/KAFKA-14874
 Project: Kafka
  Issue Type: Bug
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


the error happens due to 
[https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java#L779]

I encountered this error when creating >5000 topics at once while mirroring a 
cluster from zk to KRaft. Creating such a large batch of topics is allowed by 
zk-based Kafka.

It seems to me there are two possible improvements for this issue:

1) add a more precise error message for this case.

2) make `maxRecordsPerBatch` configurable (there is already a setter: 
[https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java#L272])



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14838) MM2 Worker/Connector/Task clients should specify client ID based on flow and role

2023-03-31 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-14838.
---
Resolution: Done

> MM2 Worker/Connector/Task clients should specify client ID based on flow and 
> role
> -
>
> Key: KAFKA-14838
> URL: https://issues.apache.org/jira/browse/KAFKA-14838
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
> Fix For: 3.5.0
>
>
> MM2 code creates a lot of Kafka clients internally. These clients generate a 
> lot of logs, but since the client.id is not properly specified, connecting 
> the dots between a specific Connector/Task and its internal client is close 
> to impossible. This is even more complex when MM2 is running in distributed 
> mode, in which multiple Connect workers are running inside the same process.
> For Connector/Task created clients, the client.id should specify the 
> flow, the Connector name/Task ID, and the role of the client. E.g. 
> MirrorSourceConnector uses multiple admin clients, and their client.id should 
> reflect the difference between them.
> For Worker created clients, the client.id should refer to the flow.
> This will help log analysis significantly, especially in MM2 mode.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (KAFKA-14838) MM2 Worker/Connector/Task clients should specify client ID based on flow and role

2023-03-31 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reopened KAFKA-14838:
---

> MM2 Worker/Connector/Task clients should specify client ID based on flow and 
> role
> -
>
> Key: KAFKA-14838
> URL: https://issues.apache.org/jira/browse/KAFKA-14838
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
> Fix For: 3.5.0
>
>
> MM2 code creates a lot of Kafka clients internally. These clients generate a 
> lot of logs, but since the client.id is not properly specified, connecting 
> the dots between a specific Connector/Task and its internal client is close 
> to impossible. This is even more complex when MM2 is running in distributed 
> mode, in which multiple Connect workers are running inside the same process.
> For Connector/Task created clients, the client.id should specify the 
> flow, the Connector name/Task ID, and the role of the client. E.g. 
> MirrorSourceConnector uses multiple admin clients, and their client.id should 
> reflect the difference between them.
> For Worker created clients, the client.id should refer to the flow.
> This will help log analysis significantly, especially in MM2 mode.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante merged pull request #13458: KAFKA-14838: Add flow/connector/task/role information to MM2 Kafka cl…

2023-03-31 Thread via GitHub


C0urante merged PR #13458:
URL: https://github.com/apache/kafka/pull/13458


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rajinisivaram commented on pull request #13350: KAFKA-14452: Make sticky assignors rack-aware if client rack is configured (KIP-881)

2023-03-31 Thread via GitHub


rajinisivaram commented on PR #13350:
URL: https://github.com/apache/kafka/pull/13350#issuecomment-1491991489

   @dajac Thanks for the review, I have addressed the comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rajinisivaram commented on a diff in pull request #13350: KAFKA-14452: Make sticky assignors rack-aware if client rack is configured (KIP-881)

2023-03-31 Thread via GitHub


rajinisivaram commented on code in PR #13350:
URL: https://github.com/apache/kafka/pull/13350#discussion_r1154530618


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##
@@ -574,6 +697,44 @@ private void assignOwnedPartitions() {
         }
     }
 
+    // Round-Robin filling within racks for remaining members up to the expected numbers of maxQuota,
+    // otherwise, to minQuota
+    private void assignRackAwareRoundRobin(List<TopicPartition> unassignedPartitions) {
+        if (rackInfo.consumerRacks.isEmpty())
+            return;
+        int nextUnfilledConsumerIndex = 0;
+        Iterator<TopicPartition> unassignedIter = unassignedPartitions.iterator();
+        while (unassignedIter.hasNext()) {
+            TopicPartition unassignedPartition = unassignedIter.next();
+            String consumer = null;
+            int nextIndex = rackInfo.nextRackConsumer(unassignedPartition, unfilledMembersWithUnderMinQuotaPartitions, nextUnfilledConsumerIndex);
+            if (nextIndex >= 0) {
+                consumer = unfilledMembersWithUnderMinQuotaPartitions.get(nextIndex);
+                int assignmentCount = assignment.get(consumer).size() + 1;
+                if (assignmentCount >= minQuota) {
+                    unfilledMembersWithUnderMinQuotaPartitions.remove(consumer);
+                    if (assignmentCount < maxQuota)
+                        unfilledMembersWithExactlyMinQuotaPartitions.add(consumer);
+                } else {
+                    nextIndex++;
+                }
+                nextUnfilledConsumerIndex = unfilledMembersWithUnderMinQuotaPartitions.isEmpty() ? 0 : nextIndex % unfilledMembersWithUnderMinQuotaPartitions.size();
+            } else if (!unfilledMembersWithExactlyMinQuotaPartitions.isEmpty()) {
+                int firstIndex = rackInfo.nextRackConsumer(unassignedPartition, unfilledMembersWithExactlyMinQuotaPartitions, 0);
+                if (firstIndex >= 0) {
+                    consumer = unfilledMembersWithExactlyMinQuotaPartitions.get(firstIndex);
+                    if (assignment.get(consumer).size() + 1 == maxQuota)
+                        unfilledMembersWithExactlyMinQuotaPartitions.remove(firstIndex);
+                }
+            }
+            if (consumer == null)
+                continue;

Review Comment:
Updated.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##
@@ -835,7 +1004,7 @@ private List<TopicPartition> assignOwnedPartitions() {
                     // if this topic partition of this consumer no longer exists, remove it from currentAssignment of the consumer
                     partitionIter.remove();
                     currentPartitionConsumer.remove(partition);
-                } else if (!consumerSubscription.topics().contains(partition.topic())) {
+                } else if (!consumerSubscription.topics().contains(partition.topic()) || rackInfo.racksMismatch(consumer, partition)) {

Review Comment:
   Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14867) Trigger rebalance when replica racks change if client.rack is configured

2023-03-31 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-14867.

  Reviewer: David Jacot
Resolution: Fixed

> Trigger rebalance when replica racks change if client.rack is configured
> 
>
> Key: KAFKA-14867
> URL: https://issues.apache.org/jira/browse/KAFKA-14867
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 3.5.0
>
>
> To improve locality after reassignments, trigger a rebalance from the leader if the 
> set of racks of partition replicas changes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] rajinisivaram merged pull request #13474: KAFKA-14867: Trigger rebalance when replica racks change if client.rack is configured (KIP-881)

2023-03-31 Thread via GitHub


rajinisivaram merged PR #13474:
URL: https://github.com/apache/kafka/pull/13474


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13434: KAFKA-14785: Connect offset read REST API

2023-03-31 Thread via GitHub


C0urante commented on code in PR #13434:
URL: https://github.com/apache/kafka/pull/13434#discussion_r1154513992


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java:
##
@@ -1040,6 +1048,40 @@ public void testGetConnectorTypeWithEmptyConfig() {
         assertEquals(ConnectorType.UNKNOWN, herder.connectorType(Collections.emptyMap()));
     }
 
+    @Test
+    public void testGetConnectorOffsetsConnectorNotFound() {

Review Comment:
   This test is failing now (probably because of the shift to use a snapshot in 
`AbstractHerder::connectorOffsets`)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rajinisivaram commented on pull request #13474: KAFKA-14867: Trigger rebalance when replica racks change if client.rack is configured (KIP-881)

2023-03-31 Thread via GitHub


rajinisivaram commented on PR #13474:
URL: https://github.com/apache/kafka/pull/13474#issuecomment-1491969056

   @dajac Thanks for the review, test failures not related, merging to trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13434: KAFKA-14785: Connect offset read REST API

2023-03-31 Thread via GitHub


C0urante commented on code in PR #13434:
URL: https://github.com/apache/kafka/pull/13434#discussion_r1154508578


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -866,4 +867,18 @@ public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) {
         }
     }
 
+    @Override
+    public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) {

Review Comment:
   It's just a preventative measure to keep the threading model simple. Right 
now (with the exception of parallel start/stop of connectors/tasks) we don't 
make any concurrent calls to the `Worker` class. It may not be a problem for 
this specific case but it makes things easier to reason about.
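   
   For illustration, a minimal sketch of that threading model (names here are hypothetical, not the actual herder/worker code):
   
   ```
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   
   // Sketch: REST threads never call the worker directly; they enqueue work
   // onto a single-threaded executor, so worker methods never run concurrently.
   final class SerializedWorkerAccess {
       private final ExecutorService tickThread = Executors.newSingleThreadExecutor();
   
       void submit(Runnable workerCall) {
           tickThread.submit(workerCall); // worker calls run one at a time, in order
       }
   }
   ```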



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14091) Suddenly-killed tasks can leave hanging transactions open

2023-03-31 Thread Sudesh Wasnik (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sudesh Wasnik reassigned KAFKA-14091:
-

Assignee: Sagar Rao  (was: Sudesh Wasnik)

> Suddenly-killed tasks can leave hanging transactions open
> -
>
> Key: KAFKA-14091
> URL: https://issues.apache.org/jira/browse/KAFKA-14091
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Sagar Rao
>Priority: Major
>
> Right now, if a task running with exactly-once support is killed 
> ungracefully, it may leave a hanging transaction open. If the transaction 
> included writes to the global offsets topic, then startup for future workers 
> becomes blocked on that transaction expiring.
> Ideally, we could identify these kinds of hanging transactions and 
> proactively abort them.
> Unfortunately, there are a few facts that make this fairly complicated:
>  # Workers read to the end of the offsets topic during startup, before 
> joining the cluster
>  # Workers do not know which tasks they are assigned until they join the 
> cluster
> The result of these facts is that we cannot trust workers that are restarted 
> shortly after being ungracefully shut down to fence out their own hanging 
> transactions, since any hanging transactions would prevent them from being 
> able to join the group and receive their task assignment in the first place.
> We could possibly accomplish this by having the leader proactively abort any 
> open transactions for tasks on workers that appear to have left the cluster 
> during a rebalance. This would not require us to wait for the scheduled 
> rebalance delay to elapse, since the intent of the delay is to provide a 
> buffer between when workers leave and when their connectors/tasks are 
> reallocated across the cluster (and, if the worker is able to rejoin before 
> that buffer is consumed, then give it back the same connectors/tasks it was 
> running previously); aborting transactions for tasks on these workers would 
> not interfere with that goal.
>  
> It's also possible that we may have to handle the case where a 
> [cancelled|https://github.com/apache/kafka/blob/badfbacdd09a9ee8821847f4b28d98625f354ed7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L274-L287]
>  task leaves a transaction open; I have yet to confirm whether this is 
> possible, though.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14091) Suddenly-killed tasks can leave hanging transactions open

2023-03-31 Thread Sudesh Wasnik (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sudesh Wasnik reassigned KAFKA-14091:
-

Assignee: Sudesh Wasnik  (was: Sagar Rao)

> Suddenly-killed tasks can leave hanging transactions open
> -
>
> Key: KAFKA-14091
> URL: https://issues.apache.org/jira/browse/KAFKA-14091
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Sudesh Wasnik
>Priority: Major
>
> Right now, if a task running with exactly-once support is killed 
> ungracefully, it may leave a hanging transaction open. If the transaction 
> included writes to the global offsets topic, then startup for future workers 
> becomes blocked on that transaction expiring.
> Ideally, we could identify these kinds of hanging transactions and 
> proactively abort them.
> Unfortunately, there are a few facts that make this fairly complicated:
>  # Workers read to the end of the offsets topic during startup, before 
> joining the cluster
>  # Workers do not know which tasks they are assigned until they join the 
> cluster
> The result of these facts is that we cannot trust workers that are restarted 
> shortly after being ungracefully shut down to fence out their own hanging 
> transactions, since any hanging transactions would prevent them from being 
> able to join the group and receive their task assignment in the first place.
> We could possibly accomplish this by having the leader proactively abort any 
> open transactions for tasks on workers that appear to have left the cluster 
> during a rebalance. This would not require us to wait for the scheduled 
> rebalance delay to elapse, since the intent of the delay is to provide a 
> buffer between when workers leave and when their connectors/tasks are 
> reallocated across the cluster (and, if the worker is able to rejoin before 
> that buffer is consumed, then give it back the same connectors/tasks it was 
> running previously); aborting transactions for tasks on these workers would 
> not interfere with that goal.
>  
> It's also possible that we may have to handle the case where a 
> [cancelled|https://github.com/apache/kafka/blob/badfbacdd09a9ee8821847f4b28d98625f354ed7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L274-L287]
>  task leaves a transaction open; I have yet to confirm whether this is 
> possible, though.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] showuon commented on pull request #13487: KAFKA-9550 Copying log segments to tiered storage in RemoteLogManager

2023-03-31 Thread via GitHub


showuon commented on PR #13487:
URL: https://github.com/apache/kafka/pull/13487#issuecomment-1491884963

   Thanks for the PR. I'll have an early review this weekend. :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd merged pull request #13483: MINOR: use readlock for read epochs

2023-03-31 Thread via GitHub


satishd merged PR #13483:
URL: https://github.com/apache/kafka/pull/13483


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on pull request #13483: MINOR: use readlock for read epochs

2023-03-31 Thread via GitHub


satishd commented on PR #13483:
URL: https://github.com/apache/kafka/pull/13483#issuecomment-1491858350

   Failed tests are not related to this change, merging to trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13483: MINOR: use readlock for read epochs

2023-03-31 Thread via GitHub


satishd commented on code in PR #13483:
URL: https://github.com/apache/kafka/pull/13483#discussion_r1154414786


##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -383,11 +382,11 @@ public void clear() {
 
     // Visible for testing
     public List<EpochEntry> epochEntries() {
-        lock.writeLock().lock();
+        lock.readLock().lock();

Review Comment:
   Nice catch!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd opened a new pull request, #13487: KAFKA-9550 Copying log segments to tiered storage in RemoteLogManager

2023-03-31 Thread via GitHub


satishd opened a new pull request, #13487:
URL: https://github.com/apache/kafka/pull/13487

   **_This PR is NOT YET READY for review. It needs some cleaning of the code 
and a few more changes._**  
   
   It is raised to run Jenkins jobs with different environments. I will update the description once it is ready for review.
   
   Added functionality to copy log segments and indexes to the target remote storage for each topic partition enabled with tiered storage. This involves creating scheduled tasks for all leader partition replicas to copy their log segments in sequence to tiered storage.
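   
   Conceptually (a rough sketch only, not this PR's code), the per-partition scheduling amounts to:
   
   ```
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.TimeUnit;
   
   // Sketch: one recurring task per leader partition copies any new log
   // segments (and their indexes) to remote storage, in order.
   final class CopyTaskSketch {
       private final ScheduledExecutorService pool = Executors.newScheduledThreadPool(4);
   
       void schedule(Runnable copyNewSegmentsForPartition) {
           // fixed delay keeps copies for a partition sequential and non-overlapping
           pool.scheduleWithFixedDelay(copyNewSegmentsForPartition, 0, 30, TimeUnit.SECONDS);
       }
   }
   ```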
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13474: KAFKA-14867: Trigger rebalance when replica racks change if client.rack is configured (KIP-881)

2023-03-31 Thread via GitHub


dajac commented on code in PR #13474:
URL: https://github.com/apache/kafka/pull/13474#discussion_r1154396016


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -1635,6 +1641,38 @@ public String toString() {
             return "(version" + version + ": " + partitionsPerTopic + ")";
         }
     }
+    private static class RackInfo {
+        private final Set<String> racks;
+        RackInfo(Optional<String> clientRack, PartitionInfo partition) {
+            if (clientRack.isPresent() && partition.replicas() != null) {
+                racks = Arrays.stream(partition.replicas()).map(Node::rack).collect(Collectors.toSet());
+            } else {
+                racks = Collections.emptySet();
+            }
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof RackInfo)) {
+                return false;
+            }
+            RackInfo rackInfo = (RackInfo) o;
+            return Objects.equals(racks, rackInfo.racks);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(racks);
+        }
+
+        @Override
+        public String toString() {
+            return racks.isEmpty() ? "NO_RACKS" : "racks=" + racks;

Review Comment:
   That makes sense. I did not realize it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13350: KAFKA-14452: Make sticky assignors rack-aware if client rack is configured (KIP-881)

2023-03-31 Thread via GitHub


dajac commented on code in PR #13350:
URL: https://github.com/apache/kafka/pull/13350#discussion_r1154372329


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##
@@ -61,35 +72,51 @@ static final class ConsumerGenerationPair {
     public static final class MemberData {
         public final List<TopicPartition> partitions;
         public final Optional<Integer> generation;
-        public MemberData(List<TopicPartition> partitions, Optional<Integer> generation) {
+        public final Optional<String> rackId;
+        public MemberData(List<TopicPartition> partitions, Optional<Integer> generation, Optional<String> rackId) {

Review Comment:
   small nit: Could we add an empty line before this one?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##
@@ -835,7 +1004,7 @@ private List<TopicPartition> assignOwnedPartitions() {
                     // if this topic partition of this consumer no longer exists, remove it from currentAssignment of the consumer
                     partitionIter.remove();
                     currentPartitionConsumer.remove(partition);
-                } else if (!consumerSubscription.topics().contains(partition.topic())) {
+                } else if (!consumerSubscription.topics().contains(partition.topic()) || rackInfo.racksMismatch(consumer, partition)) {

Review Comment:
   nit: Should we update the comment below this line as well?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##
@@ -574,6 +697,44 @@ private void assignOwnedPartitions() {
         }
     }
 
+    // Round-Robin filling within racks for remaining members up to the expected numbers of maxQuota,
+    // otherwise, to minQuota
+    private void assignRackAwareRoundRobin(List<TopicPartition> unassignedPartitions) {
+        if (rackInfo.consumerRacks.isEmpty())
+            return;
+        int nextUnfilledConsumerIndex = 0;
+        Iterator<TopicPartition> unassignedIter = unassignedPartitions.iterator();
+        while (unassignedIter.hasNext()) {
+            TopicPartition unassignedPartition = unassignedIter.next();
+            String consumer = null;
+            int nextIndex = rackInfo.nextRackConsumer(unassignedPartition, unfilledMembersWithUnderMinQuotaPartitions, nextUnfilledConsumerIndex);
+            if (nextIndex >= 0) {
+                consumer = unfilledMembersWithUnderMinQuotaPartitions.get(nextIndex);
+                int assignmentCount = assignment.get(consumer).size() + 1;
+                if (assignmentCount >= minQuota) {
+                    unfilledMembersWithUnderMinQuotaPartitions.remove(consumer);
+                    if (assignmentCount < maxQuota)
+                        unfilledMembersWithExactlyMinQuotaPartitions.add(consumer);
+                } else {
+                    nextIndex++;
+                }
+                nextUnfilledConsumerIndex = unfilledMembersWithUnderMinQuotaPartitions.isEmpty() ? 0 : nextIndex % unfilledMembersWithUnderMinQuotaPartitions.size();
+            } else if (!unfilledMembersWithExactlyMinQuotaPartitions.isEmpty()) {
+                int firstIndex = rackInfo.nextRackConsumer(unassignedPartition, unfilledMembersWithExactlyMinQuotaPartitions, 0);
+                if (firstIndex >= 0) {
+                    consumer = unfilledMembersWithExactlyMinQuotaPartitions.get(firstIndex);
+                    if (assignment.get(consumer).size() + 1 == maxQuota)
+                        unfilledMembersWithExactlyMinQuotaPartitions.remove(firstIndex);
+                }
+            }
+            if (consumer == null)
+                continue;

Review Comment:
   small nit: It may be better to invert the condition and bring the above 
lines into the if branch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] chia7712 commented on pull request #13454: MINOR: move RecordReader from org.apache.kafka.tools to org.apache.co…

2023-03-31 Thread via GitHub


chia7712 commented on PR #13454:
URL: https://github.com/apache/kafka/pull/13454#issuecomment-1491754313

   @ijuma @mimaison Could you take a look? This moves `RecordReader` to the same package as `MessageFormatter`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi commented on a diff in pull request #13471: KAFKA-14868:Remove some forgotten metrics when the replicaManager is closed

2023-03-31 Thread via GitHub


hudeqi commented on code in PR #13471:
URL: https://github.com/apache/kafka/pull/13471#discussion_r1154278469


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -4485,6 +4485,25 @@ class ReplicaManagerTest {
   assertTrue(response.usableBytes >= 0)
 }
   }
+
+  @Test
+  def checkRemoveMetricsCountMatchRegisterCount(): Unit = {

Review Comment:
   > I have an idea with which we can avoid changing the KafkaMetricsGroup. 
Could you please consider the following.
   > 
   > You can use mockito's `MockConstruction` to mock the KafkaMetricsGroup and 
count the number of invocations on that mock and later assert on number of 
invocations of add and remove. As an example, this test could be written as 
follows (rough example with Java code):
   > 
   > ```
   > var numAddCount = 0;
   > var numRemoveCount = 0;
   > try (MockedConstruction<KafkaMetricsGroup> mockMetricsGroup = mockConstruction(KafkaMetricsGroup.class,
   >     (mock, context) -> {
   >         doAnswer(invocation -> {
   >             numAddCount++;
   >         }).when(mock).newGauge(anyString());
   > 
   >         // similarly add mocks for newMeter etc.
   > 
   >         doAnswer(invocation -> {
   >             numRemoveCount++;
   >         }).when(mock).removeMetric(anyString());
   >     })) {
   >     val rm = new ReplicaManager(
   >         metrics = metrics,
   >         config = config,
   >         time = time,
   >         scheduler = new MockScheduler(time),
   >         logManager = mockLogMgr,
   >         quotaManagers = quotaManager,
   >         metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion),
   >         logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
   >         alterPartitionManager = alterPartitionManager,
   >         threadNamePrefix = Option(this.getClass.getName))
   > 
   >     rm.shutdown()
   >     assertEquals(numAddCount, numRemoveCount)
   > }
   > ```
   
   Sorry, I tried my best to write this unit test following this approach, but since newGauge has a return value there was no way to use doAnswer. It seems that KafkaMetricsGroup must be changed after all? @divijvaidya 
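   
   For reference, a self-contained sketch of counting invocations on a value-returning method with Mockito's `thenAnswer` (using a stand-in interface below, not the real KafkaMetricsGroup):
   
   ```
   import static org.mockito.ArgumentMatchers.anyString;
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.when;
   
   import java.util.concurrent.atomic.AtomicInteger;
   
   public class CountingStubSketch {
       // Stand-in for a metrics group with a value-returning registration method.
       interface MetricsGroup {
           Object newGauge(String name);
       }
   
       public static void main(String[] args) {
           AtomicInteger addCount = new AtomicInteger();
           MetricsGroup group = mock(MetricsGroup.class);
           // thenAnswer both counts the call and supplies a return value.
           when(group.newGauge(anyString())).thenAnswer(invocation -> {
               addCount.incrementAndGet();
               return new Object();
           });
   
           group.newGauge("x");
           System.out.println(addCount.get()); // 1
       }
   }
   ```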



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint

2023-03-31 Thread via GitHub


divijvaidya commented on code in PR #13456:
URL: https://github.com/apache/kafka/pull/13456#discussion_r1154306477


##
storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java:
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * This class stores a list of EpochEntry(LeaderEpoch + Offsets) to memory
+ */
+public class InMemoryLeaderEpochCheckpoint implements LeaderEpochCheckpoint {

Review Comment:
   Thanks for the explanation @showuon. It's clear now. 
   
   To rephrase what you mentioned (correct me if I misunderstood) we want to 
create an in-memory copy of the `LeaderEpochCheckpointCache` so that we can 
modify it without modifying the original checkpoint cache & checkpoint. Is that 
right?
   
   At the end of the day, RLSM requires `Map<Integer, Long> segmentLeaderEpochs()` 
and the source is `LeaderEpochCheckpointCache`. I was wondering if it would be 
possible to copy the `LeaderEpochCheckpointCache` into an intermediate data 
structure (which doesn't have to be of type LeaderEpochCheckpointCache, hence 
decoupling it from the need to create a dummy `InMemoryLeaderEpochCheckpoint`) 
and then using that intermediate data structure to extract the required Map 
(after whatever manipulation we want to do with it).
   
   Similar logic could be employed for the requirement to provide `ByteBuffer 
leaderEpochIndex` to `LogSegmentData`.
   
   Thoughts?
   
   
   Having said that, I don't want to block this PR on this refactoring discussion. It can be done separately after this is merged. In the scope of this PR, if you add a comment to InMemoryLeaderEpochCheckpoint that explains to future readers of the code the purpose of this class (the way you described in the comment above), that would be enough to make me happy.
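   
   For illustration, the intermediate conversion could be as small as this sketch (a local stand-in `EpochEntry` with `epoch` and `startOffset` fields is assumed):
   
   ```
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   
   // Sketch: derive the Map<Integer, Long> that RLSM needs straight from the
   // cache's epoch entries, without a dummy checkpoint object in between.
   final class EpochEntry {
       final int epoch;
       final long startOffset;
       EpochEntry(int epoch, long startOffset) {
           this.epoch = epoch;
           this.startOffset = startOffset;
       }
   }
   
   final class SegmentLeaderEpochs {
       static Map<Integer, Long> fromEntries(List<EpochEntry> entries) {
           Map<Integer, Long> result = new LinkedHashMap<>();
           for (EpochEntry e : entries)
               result.put(e.epoch, e.startOffset); // epoch -> first offset in that epoch
           return result;
       }
   }
   ```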
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint

2023-03-31 Thread via GitHub


showuon commented on code in PR #13456:
URL: https://github.com/apache/kafka/pull/13456#discussion_r1154310401


##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -383,21 +398,25 @@ public void clear() {
 
     // Visible for testing
     public List<EpochEntry> epochEntries() {
-        lock.writeLock().lock();
+        lock.readLock().lock();
         try {
             return new ArrayList<>(epochs.values());
         } finally {
-            lock.writeLock().unlock();
+            lock.readLock().unlock();
         }
     }
 
-    private void flush() {
+    private void flushTo(LeaderEpochCheckpoint leaderEpochCheckpoint, Collection<EpochEntry> epochEntries) {
         lock.readLock().lock();
         try {
-            checkpoint.write(epochs.values());
+            leaderEpochCheckpoint.write(epochEntries);
         } finally {
             lock.readLock().unlock();
         }
     }
 
+    private void flush() {
+        flushTo(this.checkpoint, epochs.values());

Review Comment:
   No, the 2nd parameter is different. For the `in memory` one, we need to "clone" the epochs values to avoid changing the inner entries. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #13486: MINOR: fix broker testExceptionInUpdateCoordinator test

2023-03-31 Thread via GitHub


showuon commented on PR #13486:
URL: https://github.com/apache/kafka/pull/13486#issuecomment-1491700502

   @cmccabe @mumrah @dengziming , call for review. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon opened a new pull request, #13486: MINOR: fix broker testExceptionInUpdateCoordinator test

2023-03-31 Thread via GitHub


showuon opened a new pull request, #13486:
URL: https://github.com/apache/kafka/pull/13486

   After this change: https://github.com/apache/kafka/pull/13462 , the 
`testExceptionInUpdateCoordinator` failed with 
   ```
   java.util.concurrent.ExecutionException: java.util.concurrent.ExecutionException: org.apache.kafka.server.fault.FaultHandlerException: nonFatalFaultHandler: Error updating group coordinator with local changes in MetadataDelta up to 9: injected failure
       at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
       at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
       at kafka.testkit.KafkaClusterTestKit.waitForAllFutures(KafkaClusterTestKit.java:569)
       at kafka.testkit.KafkaClusterTestKit.close(KafkaClusterTestKit.java:541)
       at kafka.server.metadata.BrokerMetadataPublisherTest.testExceptionInUpdateCoordinator(BrokerMetadataPublisherTest.scala:270)
   Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.server.fault.FaultHandlerException: nonFatalFaultHandler: Error updating group coordinator with local changes in MetadataDelta up to 9: injected failure
       at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
       at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
       at kafka.server.BrokerServer.$anonfun$shutdown$6(BrokerServer.scala:547)
       at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
       at kafka.server.BrokerServer.shutdown(BrokerServer.scala:547)
       at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
       at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
       at java.base/java.lang.Thread.run(Thread.java:833)
   Caused by: org.apache.kafka.server.fault.FaultHandlerException: nonFatalFaultHandler: Error updating group coordinator with local changes in MetadataDelta up to 9: injected failure
       at app//kafka.server.metadata.BrokerMetadataPublisher.$anonfun$onMetadataUpdate$7(BrokerMetadataPublisher.scala:188)
       at app//scala.Option.foreach(Option.scala:407)
       at app//kafka.server.metadata.BrokerMetadataPublisher.onMetadataUpdate(BrokerMetadataPublisher.scala:174)
       at app//org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:298)
       at app//org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:258)
       at app//org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
       at app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
       at app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
       ... 1 more
   Caused by: java.lang.RuntimeException: injected failure
       at kafka.server.metadata.BrokerMetadataPublisher.$anonfun$onMetadataUpdate$7(BrokerMetadataPublisher.scala:200)
       at scala.Option.foreach(Option.scala:407)
       at kafka.server.metadata.BrokerMetadataPublisher.onMetadataUpdate(BrokerMetadataPublisher.scala:174)
       at org.apache.kafka.image.loader.MetadataLoader.lambda$handleCommit$1(MetadataLoader.java:341)
       ... 4 more
   ```
   
   
   So it failed when we tried to remove and close the `brokerMetadataPublisher`, because `uninitializedPublishers` no longer contains this publisher when we remove it (deleted [here](https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java#L291-L298)). In any case, we should not fail when shutting down the broker/controller, so we swallow the exceptions.
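   
   For illustration only, the swallow-on-shutdown pattern looks roughly like this (placeholder names, not the actual publisher API):
   
   ```
   import java.util.List;
   
   // Sketch: close each publisher during shutdown, logging and ignoring failures
   // so one bad publisher cannot fail the whole broker/controller shutdown.
   final class ShutdownSketch {
       static void closeQuietly(List<AutoCloseable> publishers) {
           for (AutoCloseable publisher : publishers) {
               try {
                   publisher.close();
               } catch (Exception e) {
                   // swallow: shutdown must make progress even if close() throws
                   System.err.println("Ignoring error closing publisher: " + e);
               }
           }
       }
   }
   ```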
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint

2023-03-31 Thread via GitHub


divijvaidya commented on code in PR #13456:
URL: https://github.com/apache/kafka/pull/13456#discussion_r1154259339


##
core/src/main/scala/kafka/log/remote/RemoteLogManager.scala:
##
@@ -25,6 +26,7 @@ import org.apache.kafka.common.record.{RecordBatch, RemoteLogInputStream}
 import org.apache.kafka.common.utils.{ChildFirstClassLoader, Utils}
 import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager
 import org.apache.kafka.server.log.remote.storage._
+import org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoint

Review Comment:
   Is this still required here?



##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -383,21 +398,25 @@ public void clear() {
 
     // Visible for testing
     public List<EpochEntry> epochEntries() {
-        lock.writeLock().lock();
+        lock.readLock().lock();
         try {
             return new ArrayList<>(epochs.values());
         } finally {
-            lock.writeLock().unlock();
+            lock.readLock().unlock();
         }
     }
 
-    private void flush() {
+    private void flushTo(LeaderEpochCheckpoint leaderEpochCheckpoint, Collection<EpochEntry> epochEntries) {
         lock.readLock().lock();
         try {
-            checkpoint.write(epochs.values());
+            leaderEpochCheckpoint.write(epochEntries);
         } finally {
             lock.readLock().unlock();
         }
     }
 
+    private void flush() {
+        flushTo(this.checkpoint, epochs.values());

Review Comment:
   Both invocations of `flushTo()` have the same second parameter.
   
   Could we instead read the entries inside the flushTo method itself? Note that we don't need to acquire locks in flushTo() or in cloneWithLeaderEpochCheckpoint(), since epochEntries() does it for us.
   
   ```
   private void flushTo(LeaderEpochCheckpoint leaderEpochCheckpoint) {
       leaderEpochCheckpoint.write(epochEntries());
   }
   
   private void flush() {
       flushTo(this.checkpoint);
   }
   
   public LeaderEpochFileCache cloneWithLeaderEpochCheckpoint(LeaderEpochCheckpoint leaderEpochCheckpoint) {
       flushTo(leaderEpochCheckpoint);
       // create a new cache backed by the provided leaderEpochCheckpoint
       return new LeaderEpochFileCache(this.topicPartition, leaderEpochCheckpoint);
   }
   ```
   



##
storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java:
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * This class stores a list of EpochEntry(LeaderEpoch + Offsets) to memory
+ */
+public class InMemoryLeaderEpochCheckpoint implements LeaderEpochCheckpoint {

Review Comment:
   Thanks for the explanation @showuon. It's clear now. 
   
   To rephrase what you mentioned (correct me if I misunderstood) we want to 
create an in-memory copy of the `LeaderEpochCheckpointCache` so that we can 
modify it without modifying the original checkpoint cache & checkpoint. Is that 
right?
   
   At the end of the day, RLSM requires `Map<Integer, Long> segmentLeaderEpochs()` 
and the source is `LeaderEpochCheckpointCache`. I was wondering if it would be 
possible to copy the `LeaderEpochCheckpointCache` into an intermediate data 
structure (which doesn't have to be of type LeaderEpochCheckpointCache, hence 
decoupling it from the need to create a dummy `InMemoryLeaderEpochCheckpoint`) 
and then using that intermediate data structure to extract the required Map 
(after whatever manipulation we want to do with it).
   
   Similar logic could be employed for the requirement to provide `ByteBuffer 
leaderEpochIndex` to `LogSegmentData`.
   
   Thoughts?
   



-- 
This is an automated message from the Apache Git Service.

[GitHub] [kafka] hudeqi opened a new pull request, #13485: MINOR:Remove unused metric variable in ReplicaManager

2023-03-31 Thread via GitHub


hudeqi opened a new pull request, #13485:
URL: https://github.com/apache/kafka/pull/13485

   This metric variable has never been used, not even in tests. Should it be deleted?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rajinisivaram commented on pull request #13474: KAFKA-14867: Trigger rebalance when replica racks change if client.rack is configured (KIP-881)

2023-03-31 Thread via GitHub


rajinisivaram commented on PR #13474:
URL: https://github.com/apache/kafka/pull/13474#issuecomment-1491671237

   @dajac Thanks for the review. Addressed comments and left one question.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rajinisivaram commented on a diff in pull request #13474: KAFKA-14867: Trigger rebalance when replica racks change if client.rack is configured (KIP-881)

2023-03-31 Thread via GitHub


rajinisivaram commented on code in PR #13474:
URL: https://github.com/apache/kafka/pull/13474#discussion_r1154285495


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -1635,6 +1641,38 @@ public String toString() {
             return "(version" + version + ": " + partitionsPerTopic + ")";
         }
     }
+    private static class RackInfo {
+        private final Set<String> racks;
+        RackInfo(Optional<String> clientRack, PartitionInfo partition) {
+            if (clientRack.isPresent() && partition.replicas() != null) {
+                racks = Arrays.stream(partition.replicas()).map(Node::rack).collect(Collectors.toSet());
+            } else {
+                racks = Collections.emptySet();
+            }
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof RackInfo)) {
+                return false;
+            }
+            RackInfo rackInfo = (RackInfo) o;
+            return Objects.equals(racks, rackInfo.racks);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(racks);
+        }
+
+        @Override
+        public String toString() {
+            return racks.isEmpty() ? "NO_RACKS" : "racks=" + racks;

Review Comment:
   This appears in MetadataSnapshot as an ordered list by partition, so that 
may be sufficient? To add partition index here, we would need to store 
partition index in RackInfo as well, which seemed too much just for toString. 
What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rajinisivaram commented on a diff in pull request #13474: KAFKA-14867: Trigger rebalance when replica racks change if client.rack is configured (KIP-881)

2023-03-31 Thread via GitHub


rajinisivaram commented on code in PR #13474:
URL: https://github.com/apache/kafka/pull/13474#discussion_r1154282863


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -1613,14 +1615,18 @@ private ConsumerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
 
     private static class MetadataSnapshot {
         private final int version;
-        private final Map<String, Integer> partitionsPerTopic;
+        private final Map<String, List<PartitionInfo>> partitionsPerTopic;

Review Comment:
   Since we already have another PartitionInfo, I renamed this one 
`PartitionRackInfo`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi commented on a diff in pull request #13471: KAFKA-14868:Remove some forgotten metrics when the replicaManager is closed

2023-03-31 Thread via GitHub


hudeqi commented on code in PR #13471:
URL: https://github.com/apache/kafka/pull/13471#discussion_r1154277604


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -4485,6 +4485,25 @@ class ReplicaManagerTest {
   assertTrue(response.usableBytes >= 0)
 }
   }
+
+  @Test
+  def checkRemoveMetricsCountMatchRegisterCount(): Unit = {

Review Comment:
   > 
   
   
   
   > I have an idea with which we can avoid changing the KafkaMetricsGroup. 
Could you please consider the following.
   > 
   > You can use mockito's `MockConstruction` to mock the KafkaMetricsGroup and 
count the number of invocations on that mock and later assert on number of 
invocations of add and remove. As an example, this test could be written as 
follows (rough example with Java code):
   > 
   > ```
   > var numAddCount = 0;
   > var numRemoveCount = 0;
   > try (MockedConstruction<KafkaMetricsGroup> mockMetricsGroup = mockConstruction(KafkaMetricsGroup.class,
   >     (mock, context) -> {
   >         doAnswer(invocation -> {
   >             numAddCount++;
   >         }).when(mock).newGauge(anyString());
   > 
   >         // similarly add mocks for newMeter etc.
   > 
   >         doAnswer(invocation -> {
   >             numRemoveCount++;
   >         }).when(mock).removeMetric(anyString());
   >     })) {
   >     val rm = new ReplicaManager(
   >         metrics = metrics,
   >         config = config,
   >         time = time,
   >         scheduler = new MockScheduler(time),
   >         logManager = mockLogMgr,
   >         quotaManagers = quotaManager,
   >         metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion),
   >         logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
   >         alterPartitionManager = alterPartitionManager,
   >         threadNamePrefix = Option(this.getClass.getName))
   > 
   >     rm.shutdown()
   >     assertEquals(numAddCount, numRemoveCount)
   > }
   > ```
   
   Sorry, I tried my best to write a single test following this approach, but since newGauge has a return value there was no way to use doAnswer. It seems that KafkaMetricsGroup must be changed after all? @divijvaidya 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac merged pull request #13408: KAFKA-14617: Update AlterPartitionRequest and enable Kraft controller to reject stale request.

2023-03-31 Thread via GitHub


dajac merged PR #13408:
URL: https://github.com/apache/kafka/pull/13408


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #13408: KAFKA-14617: Update AlterPartitionRequest and enable Kraft controller to reject stale request.

2023-03-31 Thread via GitHub


dajac commented on PR #13408:
URL: https://github.com/apache/kafka/pull/13408#issuecomment-1491621526

   Failed tests are not related. Merging to trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-6007) Connect can't validate against transforms in plugins.path

2023-03-31 Thread Bert Roos (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17707213#comment-17707213
 ] 

Bert Roos commented on KAFKA-6007:
--

[~gharris1727] I created a reproduction, but in the end, it showed this issue 
is resolved. You find the reproduction here: 
https://github.com/Bert-R/kafka-6007-repro. Apparently, something else was 
preventing my SMT from loading.
Based on that, I believe this issue can be set to Resolved.

> Connect can't validate against transforms in plugins.path
> -
>
> Key: KAFKA-6007
> URL: https://issues.apache.org/jira/browse/KAFKA-6007
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0, 0.11.0.1, 1.0.0
>Reporter: Stephane Maarek
>Assignee: Konstantine Karantasis
>Priority: Major
>
> Kafka Connect can't validate a custom transformation if placed in plugins 
> path.
> Here's the output I get on the validate call:
> {code:java}
> Invalid value com.mycorp.kafka.transforms.impl.FlattenSinkRecord for 
> configuration transforms.Flat.type: Class 
> com.mycorp.kafka.transforms.impl.FlattenSinkRecord could not be found.
> Invalid value null for configuration transforms.Flat.type: Not a 
> Transformation
> "recommended_values": [   
> "com.mycorp.kafka.transforms.Flatten$Key",
> "com.mycorp.kafka.transforms.Flatten$Value",
> "com.mycorp.kafka.transforms.impl.FlattenSinkRecord",
> "org.apache.kafka.connect.transforms.Cast$Key",
> "org.apache.kafka.connect.transforms.Cast$Value",
> "org.apache.kafka.connect.transforms.ExtractField$Key",
> "org.apache.kafka.connect.transforms.ExtractField$Value",
> "org.apache.kafka.connect.transforms.Flatten$Key",
> "org.apache.kafka.connect.transforms.Flatten$Value",
> "org.apache.kafka.connect.transforms.HoistField$Key",
> "org.apache.kafka.connect.transforms.HoistField$Value",
> "org.apache.kafka.connect.transforms.InsertField$Key",
> "org.apache.kafka.connect.transforms.InsertField$Value",
> "org.apache.kafka.connect.transforms.MaskField$Key",
> "org.apache.kafka.connect.transforms.MaskField$Value",
> "org.apache.kafka.connect.transforms.RegexRouter",
> "org.apache.kafka.connect.transforms.ReplaceField$Key",
> "org.apache.kafka.connect.transforms.ReplaceField$Value",
> "org.apache.kafka.connect.transforms.SetSchemaMetadata$Key",
> "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
> "org.apache.kafka.connect.transforms.TimestampConverter$Key",
> "org.apache.kafka.connect.transforms.TimestampConverter$Value",
> "org.apache.kafka.connect.transforms.TimestampRouter",
> "org.apache.kafka.connect.transforms.ValueToKey"],
> {code}
> As you can see the class appears in the recommended values (!) but can't be 
> picked up on the validate call. 
> I believe it's because the recommender implements class discovery using 
> plugins:
> https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L194
> But the class inference itself doesn't:
> https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L199
> (I'm not an expert in class loading though, just a guess... Unsure how to fix)
> A quick fix is to add the transformations in the ClassPath itself, but that 
> defeats the point a bit. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on a diff in pull request #13432: KAFKA-14821 Implement the listOffsets API with AdminApiDriver

2023-03-31 Thread via GitHub


dajac commented on code in PR #13432:
URL: https://github.com/apache/kafka/pull/13432#discussion_r1154222435


##
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java:
##
@@ -260,12 +261,18 @@ public void onFailure(
                 .filter(future.lookupKeys()::contains)
                 .collect(Collectors.toSet());
             retryLookup(keysToUnmap);
+        } else if (t instanceof UnsupportedVersionException) {
+            Map<K, Throwable> unrecoverableFailures =
+                handler.handleUnsupportedVersionException(
+                    (UnsupportedVersionException) t,
+                    spec.keys,
+                    spec.scope instanceof FulfillmentScope);

Review Comment:
   If the scope is NOT a FulfillmentScope, I think that we don't have to 
`completeExceptionally` but rather to `retryLookup` like we do in the previous 
branch. Am I wrong? If this is true, it may be better to completely separate 
the two cases. For instance, we could also add 
`handleUnsupportedVersionException` to the lookup strategy in the future.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] urbandan commented on a diff in pull request #13458: KAFKA-14838: Add flow/connector/task/role information to MM2 Kafka cl…

2023-03-31 Thread via GitHub


urbandan commented on code in PR #13458:
URL: https://github.com/apache/kafka/pull/13458#discussion_r1154222170


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java:
##
@@ -200,6 +200,8 @@ public Map<String, String> workerConfig(SourceAndTarget sourceAndTarget) {
 
         // fill in reasonable defaults
         props.putIfAbsent(GROUP_ID_CONFIG, sourceAndTarget.source() + "-mm2");
+        String groupId = props.get(GROUP_ID_CONFIG);
+        props.putIfAbsent(CommonClientConfigs.CLIENT_ID_CONFIG, groupId + "|" + sourceAndTarget);

Review Comment:
   Thanks for the clarification - not going to add the group ID here, but I 
will still try to set it based on the flow.
   In MM2, the group ID only contains the source alias; with specific setups (e.g. star architecture) the group ID is not unique in a single process. 
DistributedHerder logs do contain the client ID, and I found it useful in many 
situations (e.g. tracking rebalances/assignments of a specific flow).
   Thanks for the hint about the shared admin, will fix that, too.
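   
   As a sketch of the flow-based naming being discussed (the format and helper below are made up for illustration, not this PR's code):
   
   ```
   // Sketch only: compose a client.id from the replication flow, the
   // connector/task, and the client's role, so logs can be tied back to them.
   final class ClientIdSketch {
       static String clientId(String source, String target, String connector, int task, String role) {
           // e.g. "primary->backup|MirrorSourceConnector-0|offset-syncs-admin"
           return source + "->" + target + "|" + connector + "-" + task + "|" + role;
       }
   
       public static void main(String[] args) {
           System.out.println(clientId("primary", "backup", "MirrorSourceConnector", 0, "offset-syncs-admin"));
       }
   }
   ```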



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint

2023-03-31 Thread via GitHub


showuon commented on code in PR #13456:
URL: https://github.com/apache/kafka/pull/13456#discussion_r1154193013


##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -383,11 +396,11 @@ public void clear() {
 
 // Visible for testing
 public List<EpochEntry> epochEntries() {
-lock.writeLock().lock();
+lock.readLock().lock();

Review Comment:
   Opened another PR https://github.com/apache/kafka/pull/13483 to fix this bug.
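   For reference, a minimal self-contained sketch of the pattern the fix 
restores: reads take the shared read lock so they don't serialize behind each 
other, and only mutations take the exclusive write lock. `EpochEntry` is 
simplified to a plain array here, so this is not the real LeaderEpochFileCache:
   ```
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadLockSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final List<long[]> epochs = new ArrayList<>(); // {epoch, startOffset}

    public List<long[]> epochEntries() {
        lock.readLock().lock();   // shared: concurrent readers allowed
        try {
            return new ArrayList<>(epochs);
        } finally {
            lock.readLock().unlock();
        }
    }

    public void assign(long epoch, long startOffset) {
        lock.writeLock().lock();  // exclusive: blocks readers and writers
        try {
            epochs.add(new long[] {epoch, startOffset});
        } finally {
            lock.writeLock().unlock();
        }
    }
}
   ```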



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint

2023-03-31 Thread via GitHub


showuon commented on code in PR #13456:
URL: https://github.com/apache/kafka/pull/13456#discussion_r1154178753


##
core/src/main/scala/kafka/log/remote/RemoteLogManager.scala:
##
@@ -219,6 +221,29 @@ class RemoteLogManager(rlmConfig: RemoteLogManagerConfig,
 }
   }
 
+  /**
+   * Returns the in memory leader epoch checkpoint by truncating with the 
given start[exclusive] and end[inclusive] offset
+   *
+   * @param log The actual log from where to take the leader-epoch 
checkpoint
+   * @param startOffset The start offset of the checkpoint file (exclusive in 
the truncation).
+   *If start offset is 6, then it will retain an entry at 
offset 6.
+   * @param endOffset   The end offset of the checkpoint file (inclusive in 
the truncation)
+   *If end offset is 100, then it will remove the entries 
greater than or equal to 100.
+   * @return the truncated leader epoch checkpoint
+   */
+  private[remote] def getLeaderEpochCheckpoint(log: UnifiedLog, startOffset: 
Long, endOffset: Long): InMemoryLeaderEpochCheckpoint = {

Review Comment:
   Will remove it to avoid confusion.
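   As a toy illustration of the truncation semantics documented in the javadoc 
above (start exclusive, end inclusive): the filter below is an assumption for 
illustration only; the real LeaderEpochFileCache clamps the entry covering 
startOffset rather than filtering, but the boundary behaviour matches:
   ```
import java.util.ArrayList;
import java.util.List;

public class EpochTruncationSketch {
    public static void main(String[] args) {
        List<long[]> entries = new ArrayList<>(); // {epoch, startOffset}
        entries.add(new long[] {1, 6});
        entries.add(new long[] {2, 50});
        entries.add(new long[] {3, 100});

        long start = 6, end = 100;
        List<long[]> kept = new ArrayList<>();
        for (long[] e : entries) {
            // start exclusive: the entry at exactly 6 survives;
            // end inclusive: the entry at 100 is removed.
            if (e[1] >= start && e[1] < end)
                kept.add(e);
        }
        for (long[] e : kept)
            System.out.println("epoch=" + e[0] + " startOffset=" + e[1]);
        // prints: epoch=1 startOffset=6, epoch=2 startOffset=50
    }
}
   ```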




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint

2023-03-31 Thread via GitHub


showuon commented on code in PR #13456:
URL: https://github.com/apache/kafka/pull/13456#discussion_r1154178160


##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -148,6 +151,16 @@ private List<EpochEntry> removeWhileMatching(Iterator<Map.Entry<Integer, EpochEntry>>

[GitHub] [kafka] showuon commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint

2023-03-31 Thread via GitHub


showuon commented on code in PR #13456:
URL: https://github.com/apache/kafka/pull/13456#discussion_r1154172866


##
storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java:
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * This class stores a list of EpochEntry(LeaderEpoch + Offsets) to memory
+ */
+public class InMemoryLeaderEpochCheckpoint implements LeaderEpochCheckpoint {

Review Comment:
   @divijvaidya, sorry for not being clear and wasting your time trying to 
understand this PR. My bad!
   
   The motivation for introducing `InMemoryLeaderEpochCheckpoint` is to allow 
the remote log manager to create the `RemoteLogSegmentMetadata` (RLSM) with the 
correct leader epoch info for a specific segment. To do that, we need to rely 
on the `LeaderEpochCheckpointCache` to truncate from the start and the end to 
get the epoch info. However, we don't really want to truncate the epochs in the 
cache (and write to the checkpoint file in the end). So, we introduce this 
`InMemoryLeaderEpochCheckpoint` to feed into `LeaderEpochCheckpointCache`, so 
that when we truncate the epochs for the RLSM, we can do it in memory, without 
affecting the checkpoint file and without interacting with the file system. 
Does that make sense?
   
   I'll update the PR description, and remove the methods that are not 
currently used.
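   A minimal sketch of that idea, assuming a simplified checkpoint interface 
and a plain array in place of `EpochEntry`; the real storage-module types 
(`LeaderEpochCheckpoint`, `EpochEntry`) are richer. The point is that write() 
only replaces an in-memory snapshot, so truncation driven through the cache 
never touches disk:
   ```
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

interface CheckpointSketch {
    void write(Collection<long[]> epochs); // {epoch, startOffset} pairs
    List<long[]> read();
}

class InMemoryCheckpointSketch implements CheckpointSketch {
    private volatile List<long[]> epochs = new ArrayList<>();

    @Override
    public void write(Collection<long[]> epochs) {
        this.epochs = new ArrayList<>(epochs); // no file I/O, no fsync
    }

    @Override
    public List<long[]> read() {
        return new ArrayList<>(epochs);
    }
}
   ```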



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac opened a new pull request, #13484: KAFKA-14016: Revoke more partitions than expected in Cooperative rebalance

2023-03-31 Thread via GitHub


dajac opened a new pull request, #13484:
URL: https://github.com/apache/kafka/pull/13484

   WIP
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #13483: MINOR: use readlock for read epochs

2023-03-31 Thread via GitHub


showuon commented on PR #13483:
URL: https://github.com/apache/kafka/pull/13483#issuecomment-1491435742

   @divijvaidya @satishd , a quick fix for the lock.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon opened a new pull request, #13483: MINOR: use readlock for read epochs

2023-03-31 Thread via GitHub


showuon opened a new pull request, #13483:
URL: https://github.com/apache/kafka/pull/13483

   use read lock for reading epochs
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon merged pull request #13482: Switch to SplittableRandom in ProducerPerformance utility

2023-03-31 Thread via GitHub


showuon merged PR #13482:
URL: https://github.com/apache/kafka/pull/13482


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #13482: Switch to SplittableRandom in ProducerPerformance utility

2023-03-31 Thread via GitHub


showuon commented on PR #13482:
URL: https://github.com/apache/kafka/pull/13482#issuecomment-1491401582

   Failed tests are unrelated:
   ```
   Build / JDK 11 and Scala 2.13 / 
kafka.security.authorizer.AuthorizerTest.testAddAclsOnPrefixedResource(String).quorum=kraft
   Build / JDK 11 and Scala 2.13 / 
kafka.server.metadata.BrokerMetadataPublisherTest.testExceptionInUpdateCoordinator()
   Build / JDK 11 and Scala 2.13 / 
kafka.server.metadata.BrokerMetadataPublisherTest.testExceptionInUpdateCoordinator()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testRestartReplication()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector
   Build / JDK 8 and Scala 2.12 / 
kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed(String).quorum=kraft
   Build / JDK 8 and Scala 2.12 / 
kafka.server.metadata.BrokerMetadataPublisherTest.testExceptionInUpdateCoordinator()
   Build / JDK 8 and Scala 2.12 / 
kafka.server.metadata.BrokerMetadataPublisherTest.testExceptionInUpdateCoordinator()
   Build / JDK 17 and Scala 2.13 / 
kafka.security.authorizer.AuthorizerTest.testAclsFilter(String).quorum=kraft
   Build / JDK 17 and Scala 2.13 / 
kafka.server.metadata.BrokerMetadataPublisherTest.testExceptionInUpdateCoordinator()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org