[GitHub] [kafka] junrao commented on a diff in pull request #13463: KAFKA-14854: Refactor inter broker send thread to handle all interbroker requests on one thread
junrao commented on code in PR #13463: URL: https://github.com/apache/kafka/pull/13463#discussion_r1154881426 ## core/src/main/scala/kafka/common/InterBrokerSender.scala: ## @@ -156,6 +163,16 @@ abstract class InterBrokerSendThread( def wakeup(): Unit = networkClient.wakeup() } +abstract class InterBrokerRequestManager() { + + var interBrokerSender: InterBrokerSender = _ Review Comment: Does this need to be volatile? ## core/src/main/scala/kafka/network/NetworkUtils.scala: ## @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.network +import kafka.server.KafkaConfig +import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient} +import org.apache.kafka.common.Reconfigurable +import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.network.{ChannelBuilders, NetworkReceive, Selectable, Selector} +import org.apache.kafka.common.security.JaasContext +import org.apache.kafka.common.utils.{LogContext, Time} + +import scala.jdk.CollectionConverters._ + +object NetworkUtils { Review Comment: Since we are gradually moving code from core to separate modules in Java, could we move this to server-common and write it in Java?
## core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala: ## @@ -59,8 +61,9 @@ object TransactionCoordinator { time, metrics) val logContext = new LogContext(s"[TransactionCoordinator id=${config.brokerId}] ") -val txnMarkerChannelManager = TransactionMarkerChannelManager(config, metrics, metadataCache, txnStateManager, - time, logContext) +val txnMarkerChannelManager = new TransactionMarkerChannelManager(config, metadataCache, txnStateManager, + time) +interBrokerSender.addRequestManager(txnMarkerChannelManager) Review Comment: It's a bit inconvenient for every instance of InterBrokerRequestManager to do this. Would it be better to pass interBrokerSender to InterBrokerRequestManager and in the constructor do `interBrokerSender.addRequestManager(this)`? ## core/src/main/scala/kafka/common/InterBrokerSender.scala: ## @@ -45,10 +45,15 @@ abstract class InterBrokerSendThread( private val unsentRequests = new UnsentRequests - def generateRequests(): Iterable[RequestAndCompletionHandler] + private val requestManagers = new ArrayList[InterBrokerRequestManager]() Review Comment: Do we need any synchronization for concurrency and visibility across threads? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
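A minimal Java sketch of the constructor-registration idea from the review, under stated assumptions: `RequestSender`, `RequestManager`, and `MarkerChannelManager` are hypothetical stand-ins for `InterBrokerSender`, `InterBrokerRequestManager`, and `TransactionMarkerChannelManager` (which are Scala in the PR). Passing the sender into the constructor lets the field be `final`, which sidesteps the volatile question. A `CopyOnWriteArrayList` is one assumed way to let the send loop iterate the managers while other threads register new ones.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for InterBrokerSender.
class RequestSender {
    // CopyOnWriteArrayList publishes each added manager safely and allows
    // iteration from the send thread concurrently with registration.
    private final List<RequestManager> requestManagers = new CopyOnWriteArrayList<>();

    void addRequestManager(RequestManager manager) {
        requestManagers.add(manager);
    }

    List<RequestManager> requestManagers() {
        return requestManagers;
    }
}

// Hypothetical stand-in for InterBrokerRequestManager.
abstract class RequestManager {
    // Assigned before registering, so any thread that obtains this manager
    // through the sender's list also sees this field.
    protected final RequestSender sender;

    protected RequestManager(RequestSender sender) {
        this.sender = sender;
        // Self-registration, as suggested in the review. Caveat: this publishes
        // `this` before subclass constructors run, so subclass fields may not be
        // initialized yet when the sender first sees the manager.
        sender.addRequestManager(this);
    }
}

// Hypothetical concrete manager; callers no longer call addRequestManager themselves.
class MarkerChannelManager extends RequestManager {
    MarkerChannelManager(RequestSender sender) {
        super(sender);
    }
}
```

With this shape, the coordinator would only construct the manager; the explicit `addRequestManager` call in `TransactionCoordinator` would go away.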
[GitHub] [kafka] cmccabe commented on a diff in pull request #13345: KAFKA-13020; Implement reading Snapshot log append timestamp
cmccabe commented on code in PR #13345: URL: https://github.com/apache/kafka/pull/13345#discussion_r1154889074 ## raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java: ## @@ -121,9 +122,22 @@ private Optional> nextBatch() { Batch batch = iterator.next(); if (!lastContainedLogTimestamp.isPresent()) { -// The Batch type doesn't support returning control batches. For now lets just use -// the append time of the first batch -lastContainedLogTimestamp = OptionalLong.of(batch.appendTimestamp()); +// This must be the first batch which is expected to be a control batch with one record for +// the snapshot header. +if (batch.controlRecords().isEmpty()) { +throw new IllegalStateException("First batch is not a control batch with at least one record"); Review Comment: Are you thinking of the 2.8 -> 3.0 compatibility break? I'm not aware of any such break between 3.2 and 3.3
[GitHub] [kafka] philipnee commented on a diff in pull request #13490: KAFKA-14875: Implement wakeup
philipnee commented on code in PR #13490: URL: https://github.com/apache/kafka/pull/13490#discussion_r1154887444 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -546,6 +543,26 @@ public ConsumerRecords poll(long timeout) { throw new KafkaException("method not implemented"); } + T tryGetFutureResult( +final Time time, +final WakeupableFuture future, +final Duration timeout) throws ExecutionException, InterruptedException { +Timer timer = time.timer(timeout.toMillis()); +do { +if (future.isDone()) { +return future.get(); +} + +if (this.shouldWakeup.get()) { +this.shouldWakeup.set(false); +future.cancel(true); +throw new WakeupException(); +} +// Maybe Thread.sleep? Review Comment: There must be a better way to do this than... sleep
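One alternative to polling with `Thread.sleep`, sketched in Java under assumptions: the `shouldWakeup` flag is replaced by a `CompletableFuture` that `wakeup()` completes, and the caller blocks on whichever of the two futures finishes first via `CompletableFuture.anyOf`. `WakeupAwareWaiter` and its nested `WakeupException` are hypothetical; the PR's `WakeupableFuture` and Kafka's `Timer` are not modeled, and the timeout surfaces as a plain `TimeoutException`.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

class WakeupAwareWaiter {
    // Hypothetical stand-in for org.apache.kafka.common.errors.WakeupException.
    static class WakeupException extends RuntimeException { }

    // Current wakeup signal: wakeup() completes it, and the waiter swaps in a
    // fresh one once it has consumed the wakeup.
    private final AtomicReference<CompletableFuture<Void>> wakeupSignal =
        new AtomicReference<>(new CompletableFuture<>());

    void wakeup() {
        wakeupSignal.get().complete(null);
    }

    <T> T tryGetFutureResult(CompletableFuture<T> future, Duration timeout)
            throws ExecutionException, InterruptedException, TimeoutException {
        CompletableFuture<Void> signal = wakeupSignal.get();
        // Block until the result or a wakeup arrives, or the timeout elapses; no polling.
        // If `future` fails first, get() rethrows that failure as an ExecutionException.
        CompletableFuture.anyOf(future, signal).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        if (future.isDone()) {
            return future.get(); // a completed result wins over a concurrent wakeup
        }
        // Consume this wakeup so it does not also abort the next blocking call.
        wakeupSignal.compareAndSet(signal, new CompletableFuture<>());
        // For CompletableFuture, cancel(true) does not interrupt running work.
        future.cancel(true);
        throw new WakeupException();
    }
}
```

Compared with a sleep loop, the caller resumes as soon as either future completes; the cost is one extra future allocation per consumed wakeup.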
[GitHub] [kafka] philipnee opened a new pull request, #13490: KAFKA-14875: Implement wakeup
philipnee opened a new pull request, #13490: URL: https://github.com/apache/kafka/pull/13490 Interrupt the blocking methods and throw WakeupException
[jira] [Created] (KAFKA-14875) Implement Wakeup()
Philip Nee created KAFKA-14875: -- Summary: Implement Wakeup() Key: KAFKA-14875 URL: https://issues.apache.org/jira/browse/KAFKA-14875 Project: Kafka Issue Type: Task Components: consumer Reporter: Philip Nee Assignee: Philip Nee Implement wakeup() and WakeupException. This would be different from the current implementation, because I think we just need to interrupt the blocking futures. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] cmccabe commented on a diff in pull request #13345: KAFKA-13020; Implement reading Snapshot log append timestamp
cmccabe commented on code in PR #13345: URL: https://github.com/apache/kafka/pull/13345#discussion_r1154884580 ## raft/src/main/java/org/apache/kafka/raft/ControlRecord.java: ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft; + +import java.util.Objects; +import org.apache.kafka.common.message.LeaderChangeMessage; +import org.apache.kafka.common.message.SnapshotFooterRecord; +import org.apache.kafka.common.message.SnapshotHeaderRecord; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.record.ControlRecordType; + +public final class ControlRecord { Review Comment: sure, a follow-up is fine ## raft/src/main/java/org/apache/kafka/raft/internals/ByteBufferSerde.java: ## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import java.nio.ByteBuffer; +import org.apache.kafka.common.protocol.ObjectSerializationCache; +import org.apache.kafka.common.protocol.Readable; +import org.apache.kafka.common.protocol.Writable; +import org.apache.kafka.server.common.serialization.RecordSerde; + +public final class ByteBufferSerde implements RecordSerde { Review Comment: ok
[GitHub] [kafka] OneCricketeer commented on a diff in pull request #12992: KAFKA-14376-KIP887: Add ConfigProvider to make use of environment variables
OneCricketeer commented on code in PR #12992: URL: https://github.com/apache/kafka/pull/12992#discussion_r1154834846 ## clients/src/test/java/org/apache/kafka/common/config/provider/EnvVarConfigProviderTest.java: ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.config.provider; + +import org.apache.kafka.common.config.ConfigData; +import org.apache.kafka.common.config.ConfigException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.Arrays; +import java.util.Set; +import java.util.Map; +import java.util.HashSet; +import java.util.HashMap; + +import static org.apache.kafka.common.config.provider.EnvVarConfigProvider.ENV_VAR_CONFIG_PROVIDER_PATTERN_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class EnvVarConfigProviderTest { + +private EnvVarConfigProvider envVarConfigProvider = null; +@BeforeEach +public void setup() { +Map testEnvVars = new HashMap() { +{ +put("test_var1", "value1"); +put("secret_var2", "value2"); +put("new_var3", "value3"); +} +}; +envVarConfigProvider = new EnvVarConfigProvider(testEnvVars); +envVarConfigProvider.configure(Collections.singletonMap("", "")); +} + +@Test +void testGetAllEnvVarsNotEmpty() { +ConfigData properties = envVarConfigProvider.get(""); +assertNotEquals(0, properties.data().size()); +} + +@Test +void testGetMultipleKeysAndCompare() { +ConfigData properties = envVarConfigProvider.get(""); +assertNotEquals(0, properties.data().size()); +assertEquals("value1", properties.data().get("test_var1")); +assertEquals("value2", properties.data().get("secret_var2")); +assertEquals("value3", properties.data().get("new_var3")); +} + +@Test +public void testGetOneKeyWithNullPath() { +ConfigData config = envVarConfigProvider.get(null, Collections.singleton("secret_var2")); +Map data = config.data(); + +assertEquals(1, data.size()); +assertEquals("value2", data.get("secret_var2")); +} + +@Test +public void testGetOneKeyWithEmptyPath() { +ConfigData config = envVarConfigProvider.get("", Collections.singleton("test_var1")); +Map 
data = config.data(); + +assertEquals(1, data.size()); +assertEquals("value1", data.get("test_var1")); +} + +@Test +void testGetWhitelistedEnvVars() { +Set whiteList = new HashSet<>(Arrays.asList("test_var1", "secret_var2")); +Set keys = envVarConfigProvider.get(null, whiteList).data().keySet(); +assertEquals(whiteList, keys); +} +@Test +void testNotNullPathNonEmptyThrowsException() { +assertThrows(ConfigException.class, () -> envVarConfigProvider.get("test-path", Collections.singleton("test_var1"))); +} + +@Test void testRegExpEnvVars() { +Map testConfigMap = Collections.singletonMap(ENV_VAR_CONFIG_PROVIDER_PATTERN_CONFIG, "secret_.*"); +envVarConfigProvider.configure(testConfigMap); + +assertEquals(1, envVarConfigProvider.get(null, Collections.singleton("secret_var2")).data().size()); Review Comment: Does this also pass for `custom_secret_var2`? In other words, I believe this is testing `find(pattern)`, not strict `matches`, which has implicit `^ $` anchors. There seems to be a difference between `asPredicate` and `asMatchPredicate`. https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/regex/Pattern.html#asPredicate()
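The distinction the reviewer raises can be checked with `java.util.regex.Pattern` alone. This sketch assumes nothing about how `EnvVarConfigProvider` actually filters names, since its implementation is not shown here; it only contrasts `asPredicate()`, which uses `Matcher.find()`, with `asMatchPredicate()` (Java 11+), which uses `Matcher.matches()`. `EnvVarPatternCheck` is a hypothetical helper.

```java
import java.util.function.Predicate;
import java.util.regex.Pattern;

class EnvVarPatternCheck {
    // asPredicate(): true if the pattern occurs anywhere in the input (Matcher.find()).
    static boolean findMatches(String regex, String name) {
        Predicate<String> p = Pattern.compile(regex).asPredicate();
        return p.test(name);
    }

    // asMatchPredicate(): true only if the whole input matches, as if the
    // pattern were anchored with ^ and $ (Matcher.matches()).
    static boolean fullMatches(String regex, String name) {
        Predicate<String> p = Pattern.compile(regex).asMatchPredicate();
        return p.test(name);
    }

    public static void main(String[] args) {
        System.out.println(findMatches("secret_.*", "custom_secret_var2")); // true
        System.out.println(fullMatches("secret_.*", "custom_secret_var2")); // false
        System.out.println(fullMatches("secret_.*", "secret_var2"));        // true
    }
}
```

A test that also asserts `custom_secret_var2` is excluded under the `secret_.*` pattern would tell the two behaviors apart.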
[GitHub] [kafka] CalvinConfluent opened a new pull request, #13489: KAFKA-14617: Fill broker epochs to the AlterPartitionRequest
CalvinConfluent opened a new pull request, #13489: URL: https://github.com/apache/kafka/pull/13489 As the third part of [KIP-903](https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR), this PR fills the broker epochs from the Fetch request into the AlterPartitionRequest. Also, before generating the AlterPartitionRequest, the partition will check whether the broker epoch from the FetchRequest matches the broker epoch recorded in the metadata cache. If not, the ISR change will be delayed. https://issues.apache.org/jira/browse/KAFKA-14617
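A minimal Java sketch of the check described in this PR, under assumptions: the metadata cache is modeled as a plain `Map` from broker id to broker epoch, and `BrokerEpochIsrCheck` and `canExpandIsr` are hypothetical names, not the PR's code. It also assumes that a broker missing from the cache is treated like a mismatch, so the ISR change is delayed.

```java
import java.util.Map;

final class BrokerEpochIsrCheck {
    private BrokerEpochIsrCheck() { }

    /**
     * Returns true if the follower's broker epoch from its FetchRequest equals the
     * epoch recorded in the (hypothetical) metadata cache; false means the ISR
     * change should be delayed.
     */
    static boolean canExpandIsr(int brokerId, long fetchBrokerEpoch, Map<Integer, Long> cachedBrokerEpochs) {
        Long cachedEpoch = cachedBrokerEpochs.get(brokerId);
        // Unknown broker or stale epoch: do not generate the AlterPartitionRequest yet.
        return cachedEpoch != null && cachedEpoch == fetchBrokerEpoch;
    }
}
```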
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13444: KAFKA-14491: [21/N] Docs updates for versioned state stores
vcrfxia commented on code in PR #13444: URL: https://github.com/apache/kafka/pull/13444#discussion_r1154765044 ## docs/streams/developer-guide/processor-api.html: ## @@ -396,12 +400,68 @@ + +Versioned Key-Value State Stores +Versioned key-value state stores are available since Kafka Streams 3.5. +Rather than storing a single record version (value and timestamp) per key, +versioned state stores may store multiple record versions per key. This +allows versioned state stores to support timestamped retrieval operations +to return the latest record (per key) as of a specified timestamp, which +enables use cases such as temporal stream-table joins. +You can create a persistent, versioned state store by passing a +VersionedBytesStoreSupplier +to the +versionedKeyValueStoreBuilder, +or by implementing your own +VersionedKeyValueStore. +Each versioned store has an associated, fixed-duration history retention +parameter which specifies long old record versions should be kept for. +In particular, a versioned store guarantees to return accurate results for +timestamped retrieval operations where the timestamp being queried is within +history retention of the current observed stream time. +History retention also doubles as its grace period, which determines +how far back in time out-of-order writes to the store will be accepted. A +versioned store will not accept writes (inserts, updates, or deletions) if +the timestamp associated with the write is older than the current observed +stream time by more than the grace period. Stream time in this context is +tracked per-partition, rather than per-key, which means it's important +that grace period (i.e., history retention) be set high enough to +accommodate a record with one key arriving out-of-order relative to a +record for another key. +Because the memory footprint of versioned key-value stores is higher than +that of non-versioned key-value stores, you may want to adjust your +RocksDB memory settings +accordingly. 
Benchmarking your application with versioned stores is also +advised as performance is expected to be worse than when using non-versioned +stores. +Versioned stores do not support caching or interactive queries at this time. +Also, window stores may not be versioned. +Upgrade note: Versioned state stores are opt-in only; no automatic Review Comment: I've added a section to the upgrade guide which links to the developer guide section. I think this is a good compromise in order to keep the upgrade guide smaller, and to be consistent with the fact that the detailed upgrade note for timestamped key-value stores is also in the developer guide (rather than the upgrade guide).
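The grace-period rule in this draft doc text can be modeled in a few lines. This Java sketch is a simplified model, not the Kafka Streams implementation: `VersionedWriteGuard` is hypothetical, one instance stands for one partition (stream time is tracked per partition, not per key), and history retention doubles as the grace period, as the doc states. A write is rejected when its timestamp is older than the observed stream time by more than the grace period.

```java
final class VersionedWriteGuard {
    private final long gracePeriodMs;                   // equals history retention for versioned stores
    private long observedStreamTimeMs = Long.MIN_VALUE; // max timestamp seen on this partition, across all keys

    VersionedWriteGuard(long historyRetentionMs) {
        this.gracePeriodMs = historyRetentionMs;
    }

    /**
     * Returns true if a write (insert, update, or delete) with this timestamp
     * would be accepted. The key is deliberately ignored: stream time is per partition.
     */
    boolean tryWrite(String key, long timestampMs) {
        observedStreamTimeMs = Math.max(observedStreamTimeMs, timestampMs);
        // Accepted iff the write is no more than gracePeriodMs behind stream time.
        return observedStreamTimeMs - timestampMs <= gracePeriodMs;
    }
}
```

With a 10 ms retention, writes at 100 (key `a`) and 95 (key `b`) are accepted, but a later write at 89 for key `b` is rejected only because key `a` advanced stream time to 100. That is why the doc advises a grace period large enough to cover out-of-order arrival across keys.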
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13444: KAFKA-14491: [21/N] Docs updates for versioned state stores
vcrfxia commented on code in PR #13444: URL: https://github.com/apache/kafka/pull/13444#discussion_r1154757216 ## docs/streams/developer-guide/processor-api.html: ## @@ -396,12 +400,68 @@ + +Versioned Key-Value State Stores +Versioned key-value state stores are available since Kafka Streams 3.5. +Rather than storing a single record version (value and timestamp) per key, +versioned state stores may store multiple record versions per key. This +allows versioned state stores to support timestamped retrieval operations +to return the latest record (per key) as of a specified timestamp, which +enables use cases such as temporal stream-table joins. +You can create a persistent, versioned state store by passing a +VersionedBytesStoreSupplier +to the +versionedKeyValueStoreBuilder, +or by implementing your own +VersionedKeyValueStore. +Each versioned store has an associated, fixed-duration history retention +parameter which specifies long old record versions should be kept for. +In particular, a versioned store guarantees to return accurate results for +timestamped retrieval operations where the timestamp being queried is within +history retention of the current observed stream time. +History retention also doubles as its grace period, which determines +how far back in time out-of-order writes to the store will be accepted. A +versioned store will not accept writes (inserts, updates, or deletions) if +the timestamp associated with the write is older than the current observed +stream time by more than the grace period. Stream time in this context is Review Comment: Hm, this looks to be inconsistent throughout the existing codebase, since searching for both "stream time" and "stream-time" turns up a bunch of results across docs and also javadocs/code comments. Do we have an agreed-upon standard for the repo? Personally I prefer "stream time", but unification is probably more important :) If there's already consensus I can make the updates in this PR.
Otherwise, it might be best left for a follow-up.
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13444: KAFKA-14491: [21/N] Docs updates for versioned state stores
vcrfxia commented on code in PR #13444: URL: https://github.com/apache/kafka/pull/13444#discussion_r1154755917 ## docs/streams/developer-guide/processor-api.html: ## @@ -261,11 +262,13 @@ space. RocksDB settings can be fine-tuned, see RocksDB configuration. -Available store variants: -time window key-value store, session window key-value store. -Use persistentTimestampedKeyValueStore -when you need a persistent key-(value/timestamp) store that supports put/get/delete and range queries. -Use persistentTimestampedWindowStore +Available store variants: +versioned key-value store, time window key-value store, session window key-value store. Review Comment: Added timestamped KV, regular window stores (previously only timestamped window stores were represented), and session stores to this section, in order to be comprehensive. LMK if it's too much now.
[GitHub] [kafka] guozhangwang commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…
guozhangwang commented on code in PR #11433: URL: https://github.com/apache/kafka/pull/11433#discussion_r1154750701 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -342,7 +342,18 @@ public void handleAssignment(final Map> activeTasks, maybeThrowTaskExceptions(taskCloseExceptions); -createNewTasks(activeTasksToCreate, standbyTasksToCreate); +final Collection newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate); +// If there are any transactions in flight and there are newly created active tasks, commit the tasks +// to avoid potential long restoration times. +if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) { +log.info("New active tasks were added and there is an inflight transaction. Attempting to commit tasks."); +final int numCommitted = commitTasksAndMaybeUpdateCommittableOffsets(newActiveTasks, new HashMap<>()); +if (numCommitted == -1) { Review Comment: Also cc @lucasbru @mjsax @lihaosky to bring this change to your attention.
[GitHub] [kafka] guozhangwang commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…
guozhangwang commented on code in PR #11433: URL: https://github.com/apache/kafka/pull/11433#discussion_r1154747980 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -342,7 +342,18 @@ public void handleAssignment(final Map> activeTasks, maybeThrowTaskExceptions(taskCloseExceptions); -createNewTasks(activeTasksToCreate, standbyTasksToCreate); +final Collection newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate); +// If there are any transactions in flight and there are newly created active tasks, commit the tasks +// to avoid potential long restoration times. +if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) { +log.info("New active tasks were added and there is an inflight transaction. Attempting to commit tasks."); +final int numCommitted = commitTasksAndMaybeUpdateCommittableOffsets(newActiveTasks, new HashMap<>()); +if (numCommitted == -1) { +log.info("Couldn't commit any tasks since a rebalance is in progress"); +} else { +log.info("Committed {} transactions", numCommitted); Review Comment: nit: `Committed the ongoing V2 transaction at the assignment due to newly created active tasks`. ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -342,7 +342,18 @@ public void handleAssignment(final Map> activeTasks, maybeThrowTaskExceptions(taskCloseExceptions); -createNewTasks(activeTasksToCreate, standbyTasksToCreate); +final Collection newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate); +// If there are any transactions in flight and there are newly created active tasks, commit the tasks +// to avoid potential long restoration times. +if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) { +log.info("New active tasks were added and there is an inflight transaction. 
Attempting to commit tasks."); Review Comment: One caveat for EOS-v2 is that, when we commit, we have to make sure we are committing all tasks that have processed any data, not just the active tasks. Sorry for not making that clear before; it also took me some time to get it straight, and as a result I filed https://issues.apache.org/jira/browse/KAFKA-14847, please feel free to read it for more details. In short, under EOS-v2 each commit has to include every task even if we only want to commit some of them, so we'd better pass all the tasks to the `commitTasksAndMaybeUpdateCommittableOffsets` func below. ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -342,7 +342,18 @@ public void handleAssignment(final Map> activeTasks, maybeThrowTaskExceptions(taskCloseExceptions); -createNewTasks(activeTasksToCreate, standbyTasksToCreate); +final Collection newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate); +// If there are any transactions in flight and there are newly created active tasks, commit the tasks +// to avoid potential long restoration times. +if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) { +log.info("New active tasks were added and there is an inflight transaction. Attempting to commit tasks."); +final int numCommitted = commitTasksAndMaybeUpdateCommittableOffsets(newActiveTasks, new HashMap<>()); +if (numCommitted == -1) { Review Comment: This reminds me of one thing: we call `onAssignment` first and `onPartitionsAssigned` later, and we only set `rebalanceInProgress` to false in the latter func, which means that during `onAssignment` we would always see `rebalanceInProgress == true`, which logically would not allow a commit. 
I gave some thought to it, and currently the quick (and somewhat dirty) fix would be to move the `rebalanceInProgress == true` line right before the `createNewTasks` inside the `handleAssignment` here. But we should leave a TODO so that, moving forward, we only rely on `onAssignment` as the rebalance-complete barrier and move the other logic out of `onPartitionsAssigned` as well.
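A minimal Java sketch of the EOS-v2 point above, under assumptions: `EosV2CommitSketch`, `TaskView`, `selectTasksToCommit`, and `maybeCommitOnAssignment` are hypothetical names, not `TaskManager`'s API. Because all tasks on the thread share one producer transaction under EOS-v2, the commit set is every owned task that has processed data, not only the newly created active tasks. The `-1` return mirrors the diff's "rebalance in progress" convention.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

final class EosV2CommitSketch {
    // Hypothetical minimal view of a task.
    static final class TaskView {
        final String id;
        final boolean commitNeeded; // true if the task processed data since its last commit

        TaskView(String id, boolean commitNeeded) {
            this.id = id;
            this.commitNeeded = commitNeeded;
        }
    }

    // Under EOS-v2 one transaction covers every task, so select from ALL owned tasks.
    static List<TaskView> selectTasksToCommit(Collection<TaskView> allOwnedTasks) {
        List<TaskView> toCommit = new ArrayList<>();
        for (TaskView task : allOwnedTasks) {
            if (task.commitNeeded) {
                toCommit.add(task);
            }
        }
        return toCommit;
    }

    static int maybeCommitOnAssignment(boolean rebalanceInProgress, Collection<TaskView> allOwnedTasks) {
        if (rebalanceInProgress) {
            return -1; // nothing committed, matching the diff's numCommitted == -1 branch
        }
        // Stand-in for actually committing: report how many tasks would be committed.
        return selectTasksToCommit(allOwnedTasks).size();
    }
}
```

The rebalance check is also why the flag ordering matters: if `rebalanceInProgress` is still true during `onAssignment`, this path would always return -1.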
[GitHub] [kafka] artemlivshits commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction
artemlivshits commented on code in PR #13391: URL: https://github.com/apache/kafka/pull/13391#discussion_r1154744670 ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler} +import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler} +import org.apache.kafka.common.{InvalidRecordException, Node, TopicPartition} +import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse} +import org.apache.kafka.common.utils.Time + +import scala.collection.mutable + +object AddPartitionsToTxnManager { + type AppendCallback = Map[TopicPartition, Errors] => Unit +} + + +class TransactionDataAndCallbacks(val transactionData: AddPartitionsToTxnTransactionCollection, + val callbacks: mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]) + + +class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time: Time) + extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + config.brokerId, client, config.requestTimeoutMs, time) { + + private val inflightNodes = mutable.HashSet[Node]() + private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]() + + def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = { +nodesToTransactions.synchronized { + // Check if we have already (either node or individual transaction). Add the Node if it isn't there. + val currentNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node, +new TransactionDataAndCallbacks( + new AddPartitionsToTxnTransactionCollection(1), + mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]())) + + val currentTransactionData = currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId) + + // Check if we already have txn ID -- this should only happen in epoch bump case. 
If so, we should return error for old entry and remove from queue. + if (currentTransactionData != null) { +if (currentTransactionData.producerEpoch() < transactionData.producerEpoch()) { + val topicPartitionsToError = mutable.Map[TopicPartition, Errors]() + currentTransactionData.topics().forEach { topic => +topic.partitions().forEach { partition => + topicPartitionsToError.put(new TopicPartition(topic.name(), partition), Errors.INVALID_PRODUCER_EPOCH) +} + } + val oldCallback = currentNodeAndTransactionData.callbacks(transactionData.transactionalId()) + currentNodeAndTransactionData.transactionData.remove(transactionData) + oldCallback(topicPartitionsToError.toMap) +} else { + // We should never see a request on the same epoch since we haven't finished handling the one in queue + throw new InvalidRecordException("Received a second request from the same connection without finishing the first.") +} + } + currentNodeAndTransactionData.transactionData.add(transactionData) + currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), callback) + wakeup() +} + } + + private class AddPartitionsToTxnHandler(node: Node, transactionDataAndCallbacks: TransactionDataAndCallbacks) extends RequestCompletionHandler { +override def onComplete(response: ClientResponse): Unit = { + inflightNodes.remove(node) Review Comment: Currently we only have one thread and it might be the case forever, but from this code the threading model is not obvious and it would be useful to have a comment that we don't need synchronization for inflightNodes because inflightNodes is only accessed from methods that are called on the sender's thread. --
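The epoch-bump branch of `addTxnData` can be illustrated with a small plain-Java model. This is a sketch under stated assumptions, not the PR's code: `PendingTxnQueue`, `Pending`, and the string-based partitions and error codes are hypothetical stand-ins for the Kafka types. Like the Scala diff, it fails every partition of the older entry with `INVALID_PRODUCER_EPOCH` when a newer epoch arrives, and it rejects a second request that does not bump the epoch.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, simplified model of the per-node pending queue in addTxnData;
// partitions and error codes are plain strings instead of Kafka types.
final class PendingTxnQueue {
    static final class Pending {
        final int producerEpoch;
        final List<String> partitions;
        final Consumer<Map<String, String>> callback;

        Pending(int producerEpoch, List<String> partitions, Consumer<Map<String, String>> callback) {
            this.producerEpoch = producerEpoch;
            this.partitions = partitions;
            this.callback = callback;
        }
    }

    private final Map<String, Pending> byTransactionalId = new HashMap<>();

    // May be called from request-handler threads, so access is guarded by this object's monitor.
    synchronized void add(String transactionalId, Pending incoming) {
        Pending existing = byTransactionalId.get(transactionalId);
        if (existing != null) {
            if (existing.producerEpoch < incoming.producerEpoch) {
                // Epoch bump: fail every partition of the older entry, then drop it.
                Map<String, String> errors = new HashMap<>();
                for (String partition : existing.partitions) {
                    errors.put(partition, "INVALID_PRODUCER_EPOCH");
                }
                byTransactionalId.remove(transactionalId);
                existing.callback.accept(errors);
            } else {
                // Same (or older) epoch while a request is still queued is unexpected.
                throw new IllegalStateException(
                    "Received a second request for " + transactionalId + " without finishing the first");
            }
        }
        byTransactionalId.put(transactionalId, incoming);
    }

    synchronized int size() {
        return byTransactionalId.size();
    }
}
```

State that is only ever touched from the sender thread, such as the `inflightNodes` set the review comment mentions, would not need this monitor; a short comment next to such a field is enough to record that assumption.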
[GitHub] [kafka] mumrah closed pull request #13488: MINOR: Fix mock in BrokerMetadataPublisherTest
mumrah closed pull request #13488: MINOR: Fix mock in BrokerMetadataPublisherTest URL: https://github.com/apache/kafka/pull/13488 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #13486: MINOR: fix broker testExceptionInUpdateCoordinator test
cmccabe commented on PR #13486: URL: https://github.com/apache/kafka/pull/13486#issuecomment-1492303082 Thanks for the fix, @showuon. I fixed this differently in https://github.com/apache/kafka/pull/13481 by avoiding the exception rather than suppressing it. I think that's better, since we want to know about these failures.
[GitHub] [kafka] cmccabe commented on pull request #13481: MINOR: fix BrokerMetadataPublisherTest.testExceptionInUpdateCoordinator
cmccabe commented on PR #13481: URL: https://github.com/apache/kafka/pull/13481#issuecomment-1492298328 committed
[GitHub] [kafka] cmccabe commented on pull request #13488: MINOR: Fix mock in BrokerMetadataPublisherTest
cmccabe commented on PR #13488: URL: https://github.com/apache/kafka/pull/13488#issuecomment-1492299446 Looks like a duplicate of https://github.com/apache/kafka/pull/13481 . Sorry about the broken stuff.
[GitHub] [kafka] cmccabe closed pull request #13481: MINOR: fix BrokerMetadataPublisherTest.testExceptionInUpdateCoordinator
cmccabe closed pull request #13481: MINOR: fix BrokerMetadataPublisherTest.testExceptionInUpdateCoordinator URL: https://github.com/apache/kafka/pull/13481
[GitHub] [kafka] artemlivshits commented on a diff in pull request #13463: KAFKA-14854: Refactor inter broker send thread to handle all interbroker requests on one thread
artemlivshits commented on code in PR #13463: URL: https://github.com/apache/kafka/pull/13463#discussion_r1154150901 ## core/src/main/scala/kafka/server/BrokerServer.scala: ## @@ -247,6 +251,11 @@ class BrokerServer( ) alterPartitionManager.start() + val interBrokerSendLogContext = new LogContext(s"[InterBrokerSender broker=${config.brokerId}]") Review Comment: We could probably name it interBrokerSenderLogContext, so that "interBrokerSender" is a common string for finding everything related to interBrokerSender. ## core/src/main/scala/kafka/server/BrokerServer.scala: ## @@ -247,6 +251,11 @@ class BrokerServer( ) alterPartitionManager.start() + val interBrokerSendLogContext = new LogContext(s"[InterBrokerSender broker=${config.brokerId}]") + val networkClient: NetworkClient = NetworkUtils.buildNetworkClient("InterBrokerSendClient", config, metrics, time, interBrokerSendLogContext) Review Comment: Now that networkClient has a specific purpose, I think we could name it interBrokerNetworkClient.
[GitHub] [kafka] divijvaidya commented on pull request #13471: KAFKA-14868:Remove some forgotten metrics when the replicaManager is closed
divijvaidya commented on PR #13471: URL: https://github.com/apache/kafka/pull/13471#issuecomment-1492282361 @hudeqi please feel free to cherry-pick this commit: https://github.com/divijvaidya/kafka/commit/e6db8eaaeffc3b7a87f34c0e72ff25395fda97a3 into this PR. It introduces the unit test which fails before the `removeMetrics()` and passes after that.
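The shape of such a test (register metrics, shut the component down, assert nothing is left behind) can be sketched without Kafka's metrics classes. Everything here is hypothetical: `FakeMetricsRegistry`, `MetricsOwner`, and the gauge names stand in for the real registry and `ReplicaManager`; the sketch only shows why the check fails until the removal step exists.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a metrics registry (not Kafka's metrics API).
final class FakeMetricsRegistry {
    private final Set<String> names = new HashSet<>();

    void register(String name) { names.add(name); }

    void remove(String name) { names.remove(name); }

    Set<String> registeredNames() { return new HashSet<>(names); }
}

// Hypothetical component that registers gauges on startup and must remove them on shutdown.
final class MetricsOwner implements AutoCloseable {
    static final String[] METRIC_NAMES = {"ExampleGaugeA", "ExampleGaugeB"};
    private final FakeMetricsRegistry registry;

    MetricsOwner(FakeMetricsRegistry registry) {
        this.registry = registry;
        for (String name : METRIC_NAMES) registry.register(name);
    }

    @Override
    public void close() {
        // The equivalent of removeMetrics(): deleting this loop makes the "leaked" check fail.
        for (String name : METRIC_NAMES) registry.remove(name);
    }
}
```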
[GitHub] [kafka] mumrah opened a new pull request, #13488: Fix mock in BrokerMetadataPublisherTest
mumrah opened a new pull request, #13488: URL: https://github.com/apache/kafka/pull/13488 https://github.com/apache/kafka/commit/09e59bc7761a6b9ec1437b3decdfcd7b5fff868e added a list of metadata publishers in BrokerServer.scala, which broke the mock used in BrokerMetadataPublisherTest#testExceptionInUpdateCoordinator. @showuon suppressed the exception in #13486. This patch fixes the mock.
[GitHub] [kafka] philipnee commented on pull request #13455: KAFKA-14841 Handle callbacks to ConsumerRebalanceListener in MockConsumer
philipnee commented on PR #13455: URL: https://github.com/apache/kafka/pull/13455#issuecomment-1492207782 @kirktrue - do you want to take a look?
[GitHub] [kafka] philipnee commented on pull request #13455: KAFKA-14841 Handle callbacks to ConsumerRebalanceListener in MockConsumer
philipnee commented on PR #13455: URL: https://github.com/apache/kafka/pull/13455#issuecomment-1492189505 Hey, I left some comments, but it looks good after all. @showuon @guozhangwang would you be interested in reviewing this PR?
[GitHub] [kafka] philipnee commented on a diff in pull request #13455: KAFKA-14841 Handle callbacks to ConsumerRebalanceListener in MockConsumer
philipnee commented on code in PR #13455: URL: https://github.com/apache/kafka/pull/13455#discussion_r1154648065 ## clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java: ## @@ -92,11 +92,37 @@ public synchronized Set assignment() { return this.subscriptions.assignedPartitions(); } -/** Simulate a rebalance event. */ +/** + * Simulate a rebalance event. + */ public synchronized void rebalance(Collection newAssignment) { -// TODO: Rebalance callbacks +// prepare for rebalance callback Review Comment: This is more like computing the partitions to be added and removed, right?
[GitHub] [kafka] philipnee commented on a diff in pull request #13455: KAFKA-14841 Handle callbacks to ConsumerRebalanceListener in MockConsumer
philipnee commented on code in PR #13455: URL: https://github.com/apache/kafka/pull/13455#discussion_r1154647021 ## clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java: ## @@ -92,11 +92,37 @@ public synchronized Set assignment() { return this.subscriptions.assignedPartitions(); } -/** Simulate a rebalance event. */ +/** + * Simulate a rebalance event. + */ public synchronized void rebalance(Collection newAssignment) { -// TODO: Rebalance callbacks +// prepare for rebalance callback +Set oldAssignmentSet = this.subscriptions.assignedPartitions(); +Set newAssignmentSet = new HashSet<>(newAssignment); +List added = new ArrayList<>(newAssignment.size()); Review Comment: I'm pretty sure you can do set.removeAll(stuff); that saves some code there.
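The `removeAll` suggestion can be sketched as two set differences. This is a generic helper written for illustration (`AssignmentDiff` is a hypothetical name, and it works on any element type rather than `TopicPartition`); it copies each input before calling `removeAll`, so the collection returned by `subscriptions.assignedPartitions()` is never mutated.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper: computes the partitions gained and lost between two assignments.
final class AssignmentDiff {
    // Elements present in the new assignment but not in the old one.
    static <T> Set<T> added(Collection<T> oldAssignment, Collection<T> newAssignment) {
        Set<T> result = new HashSet<>(newAssignment); // copy, so the inputs stay untouched
        result.removeAll(oldAssignment);
        return result;
    }

    // Elements present in the old assignment but not in the new one.
    static <T> Set<T> revoked(Collection<T> oldAssignment, Collection<T> newAssignment) {
        Set<T> result = new HashSet<>(oldAssignment);
        result.removeAll(newAssignment);
        return result;
    }
}
```

A mock rebalance could then pass `revoked(...)` to `ConsumerRebalanceListener#onPartitionsRevoked` and `added(...)` to `onPartitionsAssigned`; calling revoke before assign is an assumption about the desired ordering, not something the diff shows.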
[GitHub] [kafka] vcrfxia commented on pull request #13364: KAFKA-14491: [16/N] Add recovery logic for store inconsistency due to failed write
vcrfxia commented on PR #13364: URL: https://github.com/apache/kafka/pull/13364#issuecomment-1492185480 The build failed with an error that looks unrelated to this PR:
```
[2023-03-31T00:06:22.083Z] FAILURE: Build failed with an exception.
[2023-03-31T00:06:22.083Z]
[2023-03-31T00:06:22.083Z] * What went wrong:
[2023-03-31T00:06:22.083Z] A problem was found with the configuration of task ':rat' (type 'RatTask').
[2023-03-31T00:06:22.083Z] - Gradle detected a problem with the following location: '/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-13364'.
[2023-03-31T00:06:22.083Z]
[2023-03-31T00:06:22.083Z] Reason: Task ':rat' uses this output of task ':clients:processTestMessages' without declaring an explicit or implicit dependency. This can lead to incorrect results being produced, depending on what order the tasks are executed.
```
@mjsax would you mind triggering a new build in case it's a flake?
[GitHub] [kafka] urbandan commented on pull request #13458: KAFKA-14838: Add flow/connector/task/role information to MM2 Kafka cl…
urbandan commented on PR #13458: URL: https://github.com/apache/kafka/pull/13458#issuecomment-1492165169 @C0urante Thank you for the review!
[GitHub] [kafka] mumrah merged pull request #13486: MINOR: fix broker testExceptionInUpdateCoordinator test
mumrah merged PR #13486: URL: https://github.com/apache/kafka/pull/13486
[GitHub] [kafka] mumrah commented on pull request #13486: MINOR: fix broker testExceptionInUpdateCoordinator test
mumrah commented on PR #13486: URL: https://github.com/apache/kafka/pull/13486#issuecomment-1492109609 Thanks @showuon, this LGTM to unblock trunk. This was introduced in 09e59bc7761a6b9; I'm going to dig into it today.
[GitHub] [kafka] hudeqi closed pull request #13485: MINOR:Remove unused metric variable in ReplicaManager
hudeqi closed pull request #13485: MINOR:Remove unused metric variable in ReplicaManager URL: https://github.com/apache/kafka/pull/13485
[GitHub] [kafka] hudeqi commented on pull request #13485: MINOR:Remove unused metric variable in ReplicaManager
hudeqi commented on PR #13485: URL: https://github.com/apache/kafka/pull/13485#issuecomment-1492080944 It looks like it's used in ControllerIntegrationTest; I don't know why IDEA can't find that usage :)
[jira] [Created] (KAFKA-14874) Unable to create > 5000 topics for once when using Kraft
Chia-Ping Tsai created KAFKA-14874: -- Summary: Unable to create > 5000 topics for once when using Kraft Key: KAFKA-14874 URL: https://issues.apache.org/jira/browse/KAFKA-14874 Project: Kafka Issue Type: Bug Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai The error happens due to [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java#L779]. I encountered this error when creating more than 5000 topics while mirroring a cluster from ZooKeeper to KRaft. Creating that many topics in one operation is allowed by ZooKeeper-based Kafka. It seems to me there are two possible improvements for this issue: 1) add a more precise error message for this case; 2) make `maxRecordsPerBatch` configurable (there is already a setter [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java#L272]). -- This message was sent by Atlassian Jira (v8.20.10#820010)
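Improvement 1) could look roughly like the following check, which names the operation, the record count, and the limit in its error message, with the limit supplied as configuration as in improvement 2). This is a hypothetical sketch: `RecordBatchLimit` and its message text are illustrative and are not `QuorumController`'s implementation.

```java
// Hypothetical check, not QuorumController's implementation: it shows the kind of
// descriptive error the issue asks for, with the limit passed in as configuration.
final class RecordBatchLimit {
    private final int maxRecordsPerBatch;

    RecordBatchLimit(int maxRecordsPerBatch) {
        if (maxRecordsPerBatch <= 0) {
            throw new IllegalArgumentException("maxRecordsPerBatch must be positive");
        }
        this.maxRecordsPerBatch = maxRecordsPerBatch;
    }

    // Throws if the operation would produce more records than a single batch may hold.
    void check(String operation, int recordCount) {
        if (recordCount > maxRecordsPerBatch) {
            throw new IllegalArgumentException(String.format(
                "%s would generate %d metadata records, which exceeds the limit of %d records per batch; "
                    + "split the request into smaller requests or raise the limit",
                operation, recordCount, maxRecordsPerBatch));
        }
    }
}
```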
[jira] [Resolved] (KAFKA-14838) MM2 Worker/Connector/Task clients should specify client ID based on flow and role
[ https://issues.apache.org/jira/browse/KAFKA-14838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton resolved KAFKA-14838. --- Resolution: Done > MM2 Worker/Connector/Task clients should specify client ID based on flow and > role > - > > Key: KAFKA-14838 > URL: https://issues.apache.org/jira/browse/KAFKA-14838 > Project: Kafka > Issue Type: Improvement >Reporter: Daniel Urban >Assignee: Daniel Urban >Priority: Major > Fix For: 3.5.0 > > > MM2 code creates a lot of Kafka clients internally. These clients generate a > lot of logs, but since the client.id is not properly specified, connecting > the dots between a specific Connector/Task and its internal client is close > to impossible. This is even more complex when MM2 is running in distributed > mode, in which multiple Connect workers are running inside the same process. > For Connector/Task created clients, the client.id of each client should specify the > flow, the Connector name/Task ID and the role of the client. E.g. > MirrorSourceConnector uses multiple admin clients, and their client.id should > reflect the difference between them. > For Worker created clients, the client.id should refer to the flow. > This will help log analysis significantly, especially in MM2 mode.
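The naming scheme described in the issue (flow for worker clients; flow, connector, task, and role for connector/task clients) can be sketched as a small builder. The `->` and `|` separators and the role names are assumptions made for illustration; the format actually adopted in PR #13458 may differ.

```java
import java.util.StringJoiner;

// Hypothetical client.id layout; the format actually adopted in PR #13458 may differ.
final class MirrorClientIds {
    // Worker-level clients pass only the flow; connector/task clients add the
    // connector name, task ID (null for connector-level clients), and role.
    static String clientId(String sourceAlias, String targetAlias,
                           String connectorName, Integer taskId, String role) {
        StringJoiner id = new StringJoiner("|");
        id.add(sourceAlias + "->" + targetAlias);
        if (connectorName != null) id.add(connectorName);
        if (taskId != null) id.add(String.valueOf(taskId));
        if (role != null) id.add(role);
        return id.toString();
    }
}
```

With this layout, two admin clients of the same connector differ only in the trailing role, which is the distinction the issue asks to be visible in logs.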
[jira] [Reopened] (KAFKA-14838) MM2 Worker/Connector/Task clients should specify client ID based on flow and role
[ https://issues.apache.org/jira/browse/KAFKA-14838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton reopened KAFKA-14838: --- > MM2 Worker/Connector/Task clients should specify client ID based on flow and > role > - > > Key: KAFKA-14838 > URL: https://issues.apache.org/jira/browse/KAFKA-14838 > Project: Kafka > Issue Type: Improvement >Reporter: Daniel Urban >Assignee: Daniel Urban >Priority: Major > Fix For: 3.5.0 > > > MM2 code creates a lot of Kafka clients internally. These clients generate a > lot of logs, but since the client.id is not properly specified, connecting > the dots between a specific Connector/Task and its internal client is close > to impossible. This is even more complex when MM2 is running in distributed > mode, in which multiple Connect workers are running inside the same process. > For Connector/Task created clients, the client.id of each client should specify the > flow, the Connector name/Task ID and the role of the client. E.g. > MirrorSourceConnector uses multiple admin clients, and their client.id should > reflect the difference between them. > For Worker created clients, the client.id should refer to the flow. > This will help log analysis significantly, especially in MM2 mode.
[GitHub] [kafka] C0urante merged pull request #13458: KAFKA-14838: Add flow/connector/task/role information to MM2 Kafka cl…
C0urante merged PR #13458: URL: https://github.com/apache/kafka/pull/13458
[GitHub] [kafka] rajinisivaram commented on pull request #13350: KAFKA-14452: Make sticky assignors rack-aware if client rack is configured (KIP-881)
rajinisivaram commented on PR #13350: URL: https://github.com/apache/kafka/pull/13350#issuecomment-1491991489 @dajac Thanks for the review, I have addressed the comments.
[GitHub] [kafka] rajinisivaram commented on a diff in pull request #13350: KAFKA-14452: Make sticky assignors rack-aware if client rack is configured (KIP-881)
rajinisivaram commented on code in PR #13350: URL: https://github.com/apache/kafka/pull/13350#discussion_r1154530618 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java: ## @@ -574,6 +697,44 @@ private void assignOwnedPartitions() { } } +// Round-Robin filling within racks for remaining members up to the expected numbers of maxQuota, +// otherwise, to minQuota +private void assignRackAwareRoundRobin(List unassignedPartitions) { +if (rackInfo.consumerRacks.isEmpty()) +return; +int nextUnfilledConsumerIndex = 0; +Iterator unassignedIter = unassignedPartitions.iterator(); +while (unassignedIter.hasNext()) { +TopicPartition unassignedPartition = unassignedIter.next(); +String consumer = null; +int nextIndex = rackInfo.nextRackConsumer(unassignedPartition, unfilledMembersWithUnderMinQuotaPartitions, nextUnfilledConsumerIndex); +if (nextIndex >= 0) { +consumer = unfilledMembersWithUnderMinQuotaPartitions.get(nextIndex); +int assignmentCount = assignment.get(consumer).size() + 1; +if (assignmentCount >= minQuota) { + unfilledMembersWithUnderMinQuotaPartitions.remove(consumer); +if (assignmentCount < maxQuota) + unfilledMembersWithExactlyMinQuotaPartitions.add(consumer); +} else { +nextIndex++; +} +nextUnfilledConsumerIndex = unfilledMembersWithUnderMinQuotaPartitions.isEmpty() ? 0 : nextIndex % unfilledMembersWithUnderMinQuotaPartitions.size(); +} else if (!unfilledMembersWithExactlyMinQuotaPartitions.isEmpty()) { +int firstIndex = rackInfo.nextRackConsumer(unassignedPartition, unfilledMembersWithExactlyMinQuotaPartitions, 0); +if (firstIndex >= 0) { +consumer = unfilledMembersWithExactlyMinQuotaPartitions.get(firstIndex); +if (assignment.get(consumer).size() + 1 == maxQuota) + unfilledMembersWithExactlyMinQuotaPartitions.remove(firstIndex); +} +} +if (consumer == null) +continue; Review Comment: Updated. 
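The idea behind `assignRackAwareRoundRobin` (walk the unassigned partitions, hand each one to the next consumer in round-robin order whose rack hosts a replica of that partition, and leave the rest for a later pass) can be shown in a simplified form. This sketch assumes a single per-consumer quota instead of the PR's separate minQuota/maxQuota member lists, and `RackAwareRoundRobin` and its parameters are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified rack-aware round-robin, assuming a single per-consumer quota; the
// PR's version tracks separate minQuota/maxQuota member lists.
final class RackAwareRoundRobin {
    static Map<String, List<String>> assign(List<String> partitions,
                                            Map<String, Set<String>> partitionRacks,
                                            List<String> consumers,
                                            Map<String, String> consumerRacks,
                                            int quota,
                                            List<String> leftover) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) assignment.put(consumer, new ArrayList<>());
        int next = 0; // round-robin cursor into consumers
        for (String partition : partitions) {
            Set<String> racks = partitionRacks.getOrDefault(partition, Set.of());
            String chosen = null;
            // Scan at most one full cycle, starting at the cursor.
            for (int i = 0; i < consumers.size() && chosen == null; i++) {
                int idx = (next + i) % consumers.size();
                String candidate = consumers.get(idx);
                String rack = consumerRacks.get(candidate);
                if (rack != null && racks.contains(rack) && assignment.get(candidate).size() < quota) {
                    chosen = candidate;
                    next = (idx + 1) % consumers.size();
                }
            }
            if (chosen != null) {
                assignment.get(chosen).add(partition);
            } else {
                leftover.add(partition); // left for a later, non-rack-aware pass
            }
        }
        return assignment;
    }
}
```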
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java: ## @@ -835,7 +1004,7 @@ private List assignOwnedPartitions() { // if this topic partition of this consumer no longer exists, remove it from currentAssignment of the consumer partitionIter.remove(); currentPartitionConsumer.remove(partition); -} else if (!consumerSubscription.topics().contains(partition.topic())) { +} else if (!consumerSubscription.topics().contains(partition.topic()) || rackInfo.racksMismatch(consumer, partition)) { Review Comment: Updated.
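A rack-mismatch predicate like the one the diff calls can be sketched as follows. The semantics are an assumption: a mismatch is reported only when the consumer has a rack, the partition's replica racks are known, and none of them matches. `RackMismatch` is a hypothetical name; the PR's `racksMismatch(consumer, partition)` looks these values up from its own rack information.

```java
import java.util.Set;

// Hypothetical predicate, assuming a mismatch only counts when both sides have rack information.
final class RackMismatch {
    static boolean racksMismatch(String consumerRack, Set<String> replicaRacks) {
        return consumerRack != null          // consumer has client.rack configured
            && !replicaRacks.isEmpty()       // partition's replica racks are known
            && !replicaRacks.contains(consumerRack);
    }
}
```

In the diff, an owned partition for which this returns true goes down the same branch as a partition from a topic the consumer is no longer subscribed to.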
[jira] [Resolved] (KAFKA-14867) Trigger rebalance when replica racks change if client.rack is configured
[ https://issues.apache.org/jira/browse/KAFKA-14867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-14867. Reviewer: David Jacot Resolution: Fixed > Trigger rebalance when replica racks change if client.rack is configured > > > Key: KAFKA-14867 > URL: https://issues.apache.org/jira/browse/KAFKA-14867 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 3.5.0 > > > To improve locality after reassignments, trigger a rebalance from the leader if the set > of racks of partition replicas changes.
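Detecting that "the set of racks of partition replicas changes" amounts to keeping a snapshot of partition-to-racks and comparing it with the latest metadata. The sketch below is a hypothetical illustration (`ReplicaRackChangeDetector` is not a Kafka class); it treats the first call as a baseline, and it also reports partitions appearing or disappearing as a change, which a real implementation may or may not want.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical detector: remembers the replica racks seen per partition at the last
// check and reports whether anything changed since then.
final class ReplicaRackChangeDetector {
    private Map<String, Set<String>> lastSeen = null;

    boolean updateAndCheck(Map<String, Set<String>> current) {
        // Copy the input so later mutation by the caller cannot affect the snapshot.
        Map<String, Set<String>> snapshot = new HashMap<>();
        current.forEach((partition, racks) -> snapshot.put(partition, Set.copyOf(racks)));
        boolean changed = lastSeen != null && !lastSeen.equals(snapshot); // first call is a baseline
        lastSeen = snapshot;
        return changed;
    }
}
```

The leader would request a rebalance whenever `updateAndCheck` returns true after a metadata update.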
[GitHub] [kafka] rajinisivaram merged pull request #13474: KAFKA-14867: Trigger rebalance when replica racks change if client.rack is configured (KIP-881)
rajinisivaram merged PR #13474: URL: https://github.com/apache/kafka/pull/13474
[GitHub] [kafka] C0urante commented on a diff in pull request #13434: KAFKA-14785: Connect offset read REST API
C0urante commented on code in PR #13434: URL: https://github.com/apache/kafka/pull/13434#discussion_r1154513992 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java: ## @@ -1040,6 +1048,40 @@ public void testGetConnectorTypeWithEmptyConfig() { assertEquals(ConnectorType.UNKNOWN, herder.connectorType(Collections.emptyMap())); } +@Test +public void testGetConnectorOffsetsConnectorNotFound() { Review Comment: This test is failing now, probably because of the shift to using a snapshot in `AbstractHerder::connectorOffsets`?
[GitHub] [kafka] rajinisivaram commented on pull request #13474: KAFKA-14867: Trigger rebalance when replica racks change if client.rack is configured (KIP-881)
rajinisivaram commented on PR #13474: URL: https://github.com/apache/kafka/pull/13474#issuecomment-1491969056 @dajac Thanks for the review. Test failures are unrelated; merging to trunk.
[GitHub] [kafka] C0urante commented on a diff in pull request #13434: KAFKA-14785: Connect offset read REST API
C0urante commented on code in PR #13434: URL: https://github.com/apache/kafka/pull/13434#discussion_r1154508578 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ## @@ -866,4 +867,18 @@ public List connectorPluginConfig(String pluginName) { } } +@Override +public void connectorOffsets(String connName, Callback cb) { Review Comment: It's just a preventative measure to keep the threading model simple. Right now (with the exception of parallel start/stop of connectors/tasks) we don't make any concurrent calls to the `Worker` class. It may not be a problem for this specific case but it makes things easier to reason about.
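The "keep the threading model simple" approach can be illustrated by funnelling every request through one dedicated thread, so the objects it touches never see concurrent calls. This is a generic sketch with hypothetical names (`SingleThreadRequests`, `submit`); it mirrors the `(error, result)` shape of a completion callback but is not the herder's actual request queue.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical helper: every request runs on one dedicated thread, so the objects it
// touches never see concurrent calls. The callback receives (error, result).
final class SingleThreadRequests implements AutoCloseable {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    <T> void submit(Callable<T> work, BiConsumer<Throwable, T> callback) {
        executor.execute(() -> {
            T result;
            try {
                result = work.call();
            } catch (Exception e) {
                callback.accept(e, null); // report the failure instead of losing it
                return;
            }
            callback.accept(null, result);
        });
    }

    @Override
    public void close() throws InterruptedException {
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

The trade-off is latency: a slow request delays the ones queued behind it, in exchange for not having to reason about concurrent access.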
[jira] [Assigned] (KAFKA-14091) Suddenly-killed tasks can leave hanging transactions open
[ https://issues.apache.org/jira/browse/KAFKA-14091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sudesh Wasnik reassigned KAFKA-14091: - Assignee: Sagar Rao (was: Sudesh Wasnik) > Suddenly-killed tasks can leave hanging transactions open > - > > Key: KAFKA-14091 > URL: https://issues.apache.org/jira/browse/KAFKA-14091 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Sagar Rao >Priority: Major > > Right now, if a task running with exactly-once support is killed > ungracefully, it may leave a hanging transaction open. If the transaction > included writes to the global offsets topic, then startup for future workers > becomes blocked on that transaction expiring. > Ideally, we could identify these kinds of hanging transactions and > proactively abort them. > Unfortunately, there are a few facts that make this fairly complicated: > # Workers read to the end of the offsets topic during startup, before > joining the cluster > # Workers do not know which tasks they are assigned until they join the > cluster > The result of these facts is that we cannot trust workers that are restarted > shortly after being ungracefully shut down to fence out their own hanging > transactions, since any hanging transactions would prevent them from being > able to join the group and receive their task assignment in the first place. > We could possibly accomplish this by having the leader proactively abort any > open transactions for tasks on workers that appear to have left the cluster > during a rebalance. 
This would not require us to wait for the scheduled > rebalance delay to elapse, since the intent of the delay is to provide a > buffer between when workers leave and when their connectors/tasks are > reallocated across the cluster (and, if the worker is able to rejoin before > that buffer is consumed, then give it back the same connectors/tasks it was > running previously); aborting transactions for tasks on these workers would > not interfere with that goal. > > It's also possible that we may have to handle the case where a > [cancelled|https://github.com/apache/kafka/blob/badfbacdd09a9ee8821847f4b28d98625f354ed7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L274-L287] > task leaves a transaction open; I have yet to confirm whether this is > possible, though.
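The first step of the proposed leader-side fencing, working out which tasks were running on workers that appear to have left the cluster, can be sketched as a set computation over the previous assignment and the current membership. `DepartedWorkerTasks` and the string IDs are hypothetical; how the leader would then abort the corresponding open transactions is deliberately not shown.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical leader-side helper: given last generation's assignment and the
// members of the new generation, list the tasks whose open transactions the
// leader could proactively abort. The abort itself is not shown.
final class DepartedWorkerTasks {
    static Set<String> tasksOnDepartedWorkers(Map<String, Set<String>> previousAssignment,
                                              Set<String> currentMembers) {
        Set<String> tasks = new TreeSet<>(); // sorted for stable logging
        previousAssignment.forEach((worker, workerTasks) -> {
            if (!currentMembers.contains(worker)) {
                tasks.addAll(workerTasks);
            }
        });
        return tasks;
    }
}
```

Because this only reads the previous assignment, it does not have to wait for the scheduled rebalance delay, which matches the reasoning in the issue.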
[jira] [Assigned] (KAFKA-14091) Suddenly-killed tasks can leave hanging transactions open
[ https://issues.apache.org/jira/browse/KAFKA-14091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sudesh Wasnik reassigned KAFKA-14091: - Assignee: Sudesh Wasnik (was: Sagar Rao) > Suddenly-killed tasks can leave hanging transactions open > - > > Key: KAFKA-14091 > URL: https://issues.apache.org/jira/browse/KAFKA-14091 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Sudesh Wasnik >Priority: Major > > Right now, if a task running with exactly-once support is killed > ungracefully, it may leave a hanging transaction open. If the transaction > included writes to the global offsets topic, then startup for future workers > becomes blocked on that transaction expiring. > Ideally, we could identify these kinds of hanging transactions and > proactively abort them. > Unfortunately, there are a few facts that make this fairly complicated: > # Workers read to the end of the offsets topic during startup, before > joining the cluster > # Workers do not know which tasks they are assigned until they join the > cluster > The result of these facts is that we cannot trust workers that are restarted > shortly after being ungracefully shut down to fence out their own hanging > transactions, since any hanging transactions would prevent them from being > able to join the group and receive their task assignment in the first place. > We could possibly accomplish this by having the leader proactively abort any > open transactions for tasks on workers that appear to have left the cluster > during a rebalance. 
This would not require us to wait for the scheduled > rebalance delay to elapse, since the intent of the delay is to provide a > buffer between when workers leave and when their connectors/tasks are > reallocated across the cluster (and, if the worker is able to rejoin before > that buffer is consumed, then give it back the same connectors/tasks it was > running previously); aborting transactions for tasks on these workers would > not interfere with that goal. > > It's also possible that we may have to handle the case where a > [cancelled|https://github.com/apache/kafka/blob/badfbacdd09a9ee8821847f4b28d98625f354ed7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L274-L287] > task leaves a transaction open; I have yet to confirm whether this is > possible, though.
[GitHub] [kafka] showuon commented on pull request #13487: KAFKA-9550 Copying log segments to tiered storage in RemoteLogManager
showuon commented on PR #13487: URL: https://github.com/apache/kafka/pull/13487#issuecomment-1491884963 Thanks for the PR. I'll have an early review this weekend. :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd merged pull request #13483: MINOR: use readlock for read epochs
satishd merged PR #13483: URL: https://github.com/apache/kafka/pull/13483
[GitHub] [kafka] satishd commented on pull request #13483: MINOR: use readlock for read epochs
satishd commented on PR #13483: URL: https://github.com/apache/kafka/pull/13483#issuecomment-1491858350 Failed tests are not related to this change, merging to trunk.
[GitHub] [kafka] satishd commented on a diff in pull request #13483: MINOR: use readlock for read epochs
satishd commented on code in PR #13483: URL: https://github.com/apache/kafka/pull/13483#discussion_r1154414786 ## storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java: ## @@ -383,11 +382,11 @@ public void clear() { // Visible for testing public List epochEntries() { -lock.writeLock().lock(); +lock.readLock().lock(); Review Comment: Nice catch!
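The change follows the usual `ReentrantReadWriteLock` rule: a method that only copies state out can take the shared read lock, so concurrent readers do not serialize behind each other, while mutations keep the exclusive write lock. A minimal sketch of that pattern; `EpochCache` is a hypothetical stand-in, and a `TreeMap<Integer, Long>` (epoch to start offset) replaces the real `LeaderEpochFileCache` internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for LeaderEpochFileCache: epoch -> start offset.
final class EpochCache {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final TreeMap<Integer, Long> epochs = new TreeMap<>();

    // Mutations take the exclusive write lock.
    void assign(int epoch, long startOffset) {
        lock.writeLock().lock();
        try {
            epochs.put(epoch, startOffset);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Read-only snapshot: the shared read lock is enough. Entries are copied with
    // Map.entry because TreeMap's own entries are live and would see later puts.
    List<Map.Entry<Integer, Long>> epochEntries() {
        lock.readLock().lock();
        try {
            List<Map.Entry<Integer, Long>> copy = new ArrayList<>();
            for (Map.Entry<Integer, Long> e : epochs.entrySet()) {
                copy.add(Map.entry(e.getKey(), e.getValue()));
            }
            return copy;
        } finally {
            lock.readLock().unlock();
        }
    }
}
```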
[GitHub] [kafka] satishd opened a new pull request, #13487: KAFKA-9550 Copying log segments to tiered storage in RemoteLogManager
satishd opened a new pull request, #13487: URL: https://github.com/apache/kafka/pull/13487 **_This PR is NOT YET READY for review. It needs some cleaning of the code and a few more changes._** It has been raised to run Jenkins jobs in different environments. I will update the description once it is ready for review. Added functionality to copy log segments and indexes to the target remote storage for each topic partition enabled with tiered storage. This involves creating scheduled tasks for all leader partition replicas to copy their log segments in sequence to tiered storage. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
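The description ("scheduled tasks for all leader partition replicas to copy their log segments in sequence") can be sketched as one periodic task per partition that copies every non-active segment past the last copied offset, in offset order. Everything here is hypothetical: `Segment`, `PartitionCopyTask` and the `copier` callback are illustrative names, not `RemoteLogManager`'s actual classes, and the real copy also uploads indexes and updates remote log metadata.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical sketch of a per-partition copy task; not RemoteLogManager's actual code.
final class CopySegmentsSketch {

    // Minimal stand-in for a log segment covering offsets [baseOffset, nextOffset).
    static final class Segment {
        final long baseOffset;
        final long nextOffset;

        Segment(long baseOffset, long nextOffset) {
            this.baseOffset = baseOffset;
            this.nextOffset = nextOffset;
        }
    }

    static final class PartitionCopyTask implements Runnable {
        private final Supplier<List<Segment>> segments; // current segments, sorted; last one is active
        private final Consumer<Segment> copier;         // would upload the segment and its indexes
        private long copiedUpTo = -1L;                  // last offset known to be in remote storage

        PartitionCopyTask(Supplier<List<Segment>> segments, Consumer<Segment> copier) {
            this.segments = segments;
            this.copier = copier;
        }

        @Override
        public void run() {
            List<Segment> current = segments.get();
            // Stop before the last (active) segment, which is still being appended to.
            for (int i = 0; i < current.size() - 1; i++) {
                Segment s = current.get(i);
                if (s.nextOffset - 1 <= copiedUpTo) {
                    continue; // copied in an earlier run
                }
                try {
                    copier.accept(s); // segments are copied strictly in offset order
                } catch (RuntimeException e) {
                    // Stop this round and retry next time; a Runnable that throws
                    // would otherwise have its scheduled re-executions suppressed.
                    return;
                }
                copiedUpTo = s.nextOffset - 1; // advance only after a successful copy
            }
        }

        long copiedUpTo() {
            return copiedUpTo;
        }
    }

    // One task per leader partition; each run starts intervalMs after the previous one finishes.
    static ScheduledFuture<?> schedule(ScheduledExecutorService scheduler, PartitionCopyTask task, long intervalMs) {
        return scheduler.scheduleWithFixedDelay(task, 0L, intervalMs, TimeUnit.MILLISECONDS);
    }
}
```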
[GitHub] [kafka] dajac commented on a diff in pull request #13474: KAFKA-14867: Trigger rebalance when replica racks change if client.rack is configured (KIP-881)
dajac commented on code in PR #13474: URL: https://github.com/apache/kafka/pull/13474#discussion_r1154396016 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -1635,6 +1641,38 @@ public String toString() { return "(version" + version + ": " + partitionsPerTopic + ")"; } } +private static class RackInfo { +private final Set racks; +RackInfo(Optional clientRack, PartitionInfo partition) { +if (clientRack.isPresent() && partition.replicas() != null) { +racks = Arrays.stream(partition.replicas()).map(Node::rack).collect(Collectors.toSet()); +} else { +racks = Collections.emptySet(); +} +} + +@Override +public boolean equals(Object o) { +if (this == o) { +return true; +} +if (!(o instanceof RackInfo)) { +return false; +} +RackInfo rackInfo = (RackInfo) o; +return Objects.equals(racks, rackInfo.racks); +} + +@Override +public int hashCode() { +return Objects.hash(racks); +} + +@Override +public String toString() { +return racks.isEmpty() ? "NO_RACKS" : "racks=" + racks; Review Comment: That makes sense. I did not realize it.
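`RackInfo`'s value-based `equals`/`hashCode` is what lets the coordinator notice rack changes: if a new metadata snapshot carries different per-partition rack sets, the snapshots compare unequal and a rebalance can be triggered. A simplified sketch of that idea; `PartitionRacks` and `racksChanged` are hypothetical names, and plain rack strings replace Kafka's `Node`/`PartitionInfo` types.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

// Simplified, hypothetical version of the RackInfo idea.
final class RackSnapshotSketch {

    static final class PartitionRacks {
        private final Set<String> racks;

        PartitionRacks(Optional<String> clientRack, List<String> replicaRacks) {
            // Racks are only tracked when the consumer is rack-aware (client.rack set).
            this.racks = clientRack.isPresent() && replicaRacks != null
                    ? replicaRacks.stream().collect(Collectors.toSet())
                    : Collections.emptySet();
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof PartitionRacks && racks.equals(((PartitionRacks) o).racks);
        }

        @Override
        public int hashCode() {
            return racks.hashCode();
        }

        @Override
        public String toString() {
            return racks.isEmpty() ? "NO_RACKS" : "racks=" + racks;
        }
    }

    // topic -> rack info per partition (list index = partition number).
    static boolean racksChanged(Map<String, List<PartitionRacks>> previous,
                                Map<String, List<PartitionRacks>> current) {
        return !previous.equals(current);
    }
}
```

Since the rack sets are compared as sets, replica reordering alone does not trigger a rebalance, and without `client.rack` every partition collapses to `NO_RACKS`, so rack changes are ignored.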
[GitHub] [kafka] dajac commented on a diff in pull request #13350: KAFKA-14452: Make sticky assignors rack-aware if client rack is configured (KIP-881)
dajac commented on code in PR #13350: URL: https://github.com/apache/kafka/pull/13350#discussion_r1154372329 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java: ## @@ -61,35 +72,51 @@ static final class ConsumerGenerationPair { public static final class MemberData { public final List partitions; public final Optional generation; -public MemberData(List partitions, Optional generation) { +public final Optional rackId; +public MemberData(List partitions, Optional generation, Optional rackId) { Review Comment: small nit: Could we add an empty line before this one? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java: ## @@ -835,7 +1004,7 @@ private List assignOwnedPartitions() { // if this topic partition of this consumer no longer exists, remove it from currentAssignment of the consumer partitionIter.remove(); currentPartitionConsumer.remove(partition); -} else if (!consumerSubscription.topics().contains(partition.topic())) { +} else if (!consumerSubscription.topics().contains(partition.topic()) || rackInfo.racksMismatch(consumer, partition)) { Review Comment: nit: Should we update the comment below this line as well? 
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java: ## @@ -574,6 +697,44 @@ private void assignOwnedPartitions() { } } +// Round-Robin filling within racks for remaining members up to the expected numbers of maxQuota, +// otherwise, to minQuota +private void assignRackAwareRoundRobin(List unassignedPartitions) { +if (rackInfo.consumerRacks.isEmpty()) +return; +int nextUnfilledConsumerIndex = 0; +Iterator unassignedIter = unassignedPartitions.iterator(); +while (unassignedIter.hasNext()) { +TopicPartition unassignedPartition = unassignedIter.next(); +String consumer = null; +int nextIndex = rackInfo.nextRackConsumer(unassignedPartition, unfilledMembersWithUnderMinQuotaPartitions, nextUnfilledConsumerIndex); +if (nextIndex >= 0) { +consumer = unfilledMembersWithUnderMinQuotaPartitions.get(nextIndex); +int assignmentCount = assignment.get(consumer).size() + 1; +if (assignmentCount >= minQuota) { + unfilledMembersWithUnderMinQuotaPartitions.remove(consumer); +if (assignmentCount < maxQuota) + unfilledMembersWithExactlyMinQuotaPartitions.add(consumer); +} else { +nextIndex++; +} +nextUnfilledConsumerIndex = unfilledMembersWithUnderMinQuotaPartitions.isEmpty() ? 0 : nextIndex % unfilledMembersWithUnderMinQuotaPartitions.size(); +} else if (!unfilledMembersWithExactlyMinQuotaPartitions.isEmpty()) { +int firstIndex = rackInfo.nextRackConsumer(unassignedPartition, unfilledMembersWithExactlyMinQuotaPartitions, 0); +if (firstIndex >= 0) { +consumer = unfilledMembersWithExactlyMinQuotaPartitions.get(firstIndex); +if (assignment.get(consumer).size() + 1 == maxQuota) + unfilledMembersWithExactlyMinQuotaPartitions.remove(firstIndex); +} +} +if (consumer == null) +continue; Review Comment: small nit: It may be better to invert the condition and bring the above lines into the if branch.
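The nit is the usual guard inversion: instead of `if (consumer == null) continue;` followed by the assignment code, the assignment code moves inside `if (consumer != null) { ... }`. A heavily simplified, hypothetical rack-aware round-robin pass written that way is shown below; the names are illustrative, and unlike the real assignor it uses a single quota rather than the minQuota/maxQuota split.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified rack-aware round-robin; not AbstractStickyAssignor's logic.
final class RackAwareRoundRobinSketch {

    /**
     * Gives each partition to the next consumer (round-robin) that is below quota
     * and sits in a rack hosting one of the partition's replicas. Partitions with
     * no such consumer are left in {@code unassigned} (which must be mutable).
     */
    static Map<String, List<String>> assign(List<String> consumers,
                                            Map<String, String> consumerRack,
                                            Map<String, Set<String>> partitionRacks,
                                            List<String> unassigned,
                                            int quota) {
        Map<String, List<String>> assignment = new HashMap<>();
        for (String c : consumers) {
            assignment.put(c, new ArrayList<>());
        }
        int next = 0;
        Iterator<String> it = unassigned.iterator();
        while (it.hasNext()) {
            String partition = it.next();
            Set<String> racks = partitionRacks.getOrDefault(partition, Set.of());
            String consumer = null;
            int chosen = -1;
            // Scan at most one full cycle, starting at the round-robin cursor.
            for (int i = 0; i < consumers.size() && consumer == null; i++) {
                int idx = (next + i) % consumers.size();
                String candidate = consumers.get(idx);
                String rack = consumerRack.get(candidate);
                if (rack != null && racks.contains(rack) && assignment.get(candidate).size() < quota) {
                    consumer = candidate;
                    chosen = idx;
                }
            }
            // Inverted guard: act only when a consumer was found, instead of `continue`.
            if (consumer != null) {
                assignment.get(consumer).add(partition);
                it.remove();
                next = (chosen + 1) % consumers.size();
            }
        }
        return assignment;
    }
}
```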
[GitHub] [kafka] chia7712 commented on pull request #13454: MINOR: move RecordReader from org.apache.kafka.tools to org.apache.co…
chia7712 commented on PR #13454: URL: https://github.com/apache/kafka/pull/13454#issuecomment-1491754313 @ijuma @mimaison Could you take a look? This moves `RecordReader` to the same package as `MessageFormatter`.
[GitHub] [kafka] hudeqi commented on a diff in pull request #13471: KAFKA-14868:Remove some forgotten metrics when the replicaManager is closed
hudeqi commented on code in PR #13471: URL: https://github.com/apache/kafka/pull/13471#discussion_r1154278469 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -4485,6 +4485,25 @@ class ReplicaManagerTest { assertTrue(response.usableBytes >= 0) } } + + @Test + def checkRemoveMetricsCountMatchRegisterCount(): Unit = { Review Comment:

> I have an idea with which we can avoid changing the KafkaMetricsGroup. Could you please consider the following.
>
> You can use mockito's `MockConstruction` to mock the KafkaMetricsGroup and count the number of invocations on that mock and later assert on number of invocations of add and remove. As an example, this test could be written as follows (rough example with Java code):
>
> ```
> var numAddCount = 0;
> var numRemoveCount = 0;
> try (MockedConstruction mockMetricsGroup = mockConstruction(KafkaMetricsGroup.class,
>     (mock, context) -> {
>       doAnswer(invocation -> {
>         numAddCount++;
>       }).when(mock).newGauge(anyString());
>
>       // similarly add mocks for newMeter etc.
>
>       doAnswer(invocation -> {
>         numRemoveCount++;
>       }).when(mock).removeMetric(anyString());
>     })) {
>   val rm = new ReplicaManager(
>     metrics = metrics,
>     config = config,
>     time = time,
>     scheduler = new MockScheduler(time),
>     logManager = mockLogMgr,
>     quotaManagers = quotaManager,
>     metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion),
>     logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
>     alterPartitionManager = alterPartitionManager,
>     threadNamePrefix = Option(this.getClass.getName))
>
>   rm.shutdown()
>   assertEquals(numAddCount, numRemoveCount)
> }
> ```

Sorry, I tried my best to write this unit test according to this method, but since `newGauge` has a return value, there is no way to use `doAnswer`. It seems that `KafkaMetricsGroup` must be changed? @divijvaidya
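For reference, Mockito's `doAnswer` also works for non-void methods: the `Answer` lambda just has to return a value (for example `null`). The rough example would additionally need a mutable counter such as `AtomicInteger`, since Java lambdas can only capture effectively final locals. If changing `KafkaMetricsGroup` is preferred instead, one stdlib-only option is a counting decorator around a metrics-registration interface. A minimal sketch; `MetricsRegistry`, `CountingRegistry`, `InMemoryRegistry` and `MetricsOwner` are hypothetical names, not Kafka classes, and the real `newGauge` signature differs.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical registration interface; Kafka's KafkaMetricsGroup API differs.
interface MetricsRegistry {
    <T> Supplier<T> newGauge(String name, Supplier<T> gauge);

    void removeMetric(String name);
}

// Decorator that counts registrations and removals while delegating.
final class CountingRegistry implements MetricsRegistry {
    private final MetricsRegistry delegate;
    final AtomicInteger added = new AtomicInteger();
    final AtomicInteger removed = new AtomicInteger();

    CountingRegistry(MetricsRegistry delegate) {
        this.delegate = delegate;
    }

    @Override
    public <T> Supplier<T> newGauge(String name, Supplier<T> gauge) {
        added.incrementAndGet();
        return delegate.newGauge(name, gauge); // the return value is passed through
    }

    @Override
    public void removeMetric(String name) {
        removed.incrementAndGet();
        delegate.removeMetric(name);
    }
}

// Trivial in-memory registry used as the delegate.
final class InMemoryRegistry implements MetricsRegistry {
    final Set<String> names = new HashSet<>();

    @Override
    public <T> Supplier<T> newGauge(String name, Supplier<T> gauge) {
        names.add(name);
        return gauge;
    }

    @Override
    public void removeMetric(String name) {
        names.remove(name);
    }
}

// Hypothetical component that registers gauges on creation and removes them on shutdown.
final class MetricsOwner {
    private final MetricsRegistry metrics;

    MetricsOwner(MetricsRegistry metrics) {
        this.metrics = metrics;
        metrics.newGauge("LeaderCount", () -> 0);
        metrics.newGauge("PartitionCount", () -> 0);
    }

    void shutdown() {
        metrics.removeMetric("LeaderCount");
        metrics.removeMetric("PartitionCount");
    }
}
```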
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint
divijvaidya commented on code in PR #13456: URL: https://github.com/apache/kafka/pull/13456#discussion_r1154306477 ## storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java: ## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.checkpoint; + +import org.apache.kafka.server.common.CheckpointFile; +import org.apache.kafka.storage.internals.log.EpochEntry; + +import java.io.BufferedWriter; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * This class stores a list of EpochEntry(LeaderEpoch + Offsets) to memory + */ +public class InMemoryLeaderEpochCheckpoint implements LeaderEpochCheckpoint { Review Comment: Thanks for the explanation @showuon. It's clear now. To rephrase what you mentioned (correct me if I misunderstood) we want to create an in-memory copy of the `LeaderEpochCheckpointCache` so that we can modify it without modifying the original checkpointcache & checkpoint. Is that right? 
At the end of the day, RLSM requires `Map segmentLeaderEpochs()` and the source is `LeaderEpochCheckpointCache`. I was wondering if it would be possible to copy the `LeaderEpochCheckpointCache` into an intermediate data structure (which doesn't have to be of type LeaderEpochCheckpointCache, hence decoupling it from the need to create a dummy `InMemoryLeaderEpochCheckpoint`) and then use that intermediate data structure to extract the required Map (after whatever manipulation we want to do with it). Similar logic could be employed for the requirement to provide `ByteBuffer leaderEpochIndex` to `LogSegmentData`. Thoughts? Having said that, I don't want to block this PR on this refactoring discussion. It can be done separately after this is merged. In the scope of this PR, if you add a comment to InMemoryLeaderEpochCheckpoint that explains the purpose of this class to future readers of the code (the way you described it in the comment above), that would be enough to make me happy.
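The intermediate-structure idea can be sketched with a plain sorted map: copy the cache's entries (epoch to start offset) once, then derive a segment's leader epochs from that copy without any checkpoint object. The helper below is hypothetical and assumes the same convention as the cache's truncation step, namely that the epoch in effect at the segment's base offset is reported as starting at that base offset.

```java
import java.util.Collections;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical helper: derive per-segment leader epochs from a copied
// epoch -> start offset map, without going through a checkpoint object.
final class SegmentEpochsSketch {

    /**
     * @param epochs     copy of the cache's entries, epoch -> start offset (both ascending)
     * @param baseOffset first offset of the segment
     * @param nextOffset offset just past the segment's last record
     * @return epoch -> start offset restricted to [baseOffset, nextOffset)
     */
    static NavigableMap<Integer, Long> segmentLeaderEpochs(NavigableMap<Integer, Long> epochs,
                                                           long baseOffset, long nextOffset) {
        NavigableMap<Integer, Long> result = new TreeMap<>();
        Integer epochAtBase = null;
        for (Map.Entry<Integer, Long> e : epochs.entrySet()) {
            long start = e.getValue();
            if (start <= baseOffset) {
                epochAtBase = e.getKey(); // latest epoch that began at or before the segment
            } else if (start < nextOffset) {
                result.put(e.getKey(), start); // epoch began inside the segment
            }
        }
        if (epochAtBase != null) {
            // Assumed convention: clamp the epoch in effect at baseOffset to start there.
            result.put(epochAtBase, baseOffset);
        }
        return Collections.unmodifiableNavigableMap(result);
    }
}
```

The source map is only read, so the original cache (and its checkpoint file) is never modified.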
[GitHub] [kafka] showuon commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint
showuon commented on code in PR #13456: URL: https://github.com/apache/kafka/pull/13456#discussion_r1154310401 ## storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java: ## @@ -383,21 +398,25 @@ public void clear() { // Visible for testing public List epochEntries() { -lock.writeLock().lock(); +lock.readLock().lock(); try { return new ArrayList<>(epochs.values()); } finally { -lock.writeLock().unlock(); +lock.readLock().unlock(); } } -private void flush() { +private void flushTo(LeaderEpochCheckpoint leaderEpochCheckpoint, Collection epochEntries) { lock.readLock().lock(); try { -checkpoint.write(epochs.values()); +leaderEpochCheckpoint.write(epochEntries); } finally { lock.readLock().unlock(); } } +private void flush() { +flushTo(this.checkpoint, epochs.values()); Review Comment: No, the 2nd parameter is different. For the in-memory one, we need to "clone" the epoch values to avoid changing the inner entries.
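The reason a copy matters here: `TreeMap.values()` is a live view, so a checkpoint that simply keeps the collection it is given would keep seeing later changes to the cache. A small demonstration with a hypothetical in-memory checkpoint (`ReferenceKeepingCheckpoint`, not a Kafka class) that deliberately stores its argument without copying:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical checkpoint that keeps a reference to whatever it is given.
final class ReferenceKeepingCheckpoint {
    private Collection<Long> stored = List.of();

    void write(Collection<Long> entries) {
        stored = entries; // deliberately no defensive copy
    }

    List<Long> read() {
        return new ArrayList<>(stored);
    }

    // Writes either the live values() view or a snapshot of it.
    static ReferenceKeepingCheckpoint flush(TreeMap<Integer, Long> epochs, boolean copy) {
        ReferenceKeepingCheckpoint checkpoint = new ReferenceKeepingCheckpoint();
        Collection<Long> entries = copy ? new ArrayList<Long>(epochs.values()) : epochs.values();
        checkpoint.write(entries);
        return checkpoint;
    }
}
```

With the live view, a later `put` on the cache shows up in the "checkpoint"; with the snapshot (which is what copying via `new ArrayList<>(...)`, as `epochEntries()` does, gives you) it does not.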
[GitHub] [kafka] showuon commented on pull request #13486: MINOR: fix broker testExceptionInUpdateCoordinator test
showuon commented on PR #13486: URL: https://github.com/apache/kafka/pull/13486#issuecomment-1491700502 @cmccabe @mumrah @dengziming, call for review.
[GitHub] [kafka] showuon opened a new pull request, #13486: MINOR: fix broker testExceptionInUpdateCoordinator test
showuon opened a new pull request, #13486: URL: https://github.com/apache/kafka/pull/13486 After this change: https://github.com/apache/kafka/pull/13462, the `testExceptionInUpdateCoordinator` test failed with:

```
java.util.concurrent.ExecutionException: java.util.concurrent.ExecutionException: org.apache.kafka.server.fault.FaultHandlerException: nonFatalFaultHandler: Error updating group coordinator with local changes in MetadataDelta up to 9: injected failure
    at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
    at kafka.testkit.KafkaClusterTestKit.waitForAllFutures(KafkaClusterTestKit.java:569)
    at kafka.testkit.KafkaClusterTestKit.close(KafkaClusterTestKit.java:541)
    at kafka.server.metadata.BrokerMetadataPublisherTest.testExceptionInUpdateCoordinator(BrokerMetadataPublisherTest.scala:270)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.server.fault.FaultHandlerException: nonFatalFaultHandler: Error updating group coordinator with local changes in MetadataDelta up to 9: injected failure
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
    at kafka.server.BrokerServer.$anonfun$shutdown$6(BrokerServer.scala:547)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at kafka.server.BrokerServer.shutdown(BrokerServer.scala:547)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.server.fault.FaultHandlerException: nonFatalFaultHandler: Error updating group coordinator with local changes in MetadataDelta up to 9: injected failure
    at app//kafka.server.metadata.BrokerMetadataPublisher.$anonfun$onMetadataUpdate$7(BrokerMetadataPublisher.scala:188)
    at app//scala.Option.foreach(Option.scala:407)
    at app//kafka.server.metadata.BrokerMetadataPublisher.onMetadataUpdate(BrokerMetadataPublisher.scala:174)
    at app//org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:298)
    at app//org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:258)
    at app//org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
    at app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
    at app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
    ... 1 more
Caused by: java.lang.RuntimeException: injected failure
    at kafka.server.metadata.BrokerMetadataPublisher.$anonfun$onMetadataUpdate$7(BrokerMetadataPublisher.scala:200)
    at scala.Option.foreach(Option.scala:407)
    at kafka.server.metadata.BrokerMetadataPublisher.onMetadataUpdate(BrokerMetadataPublisher.scala:174)
    at org.apache.kafka.image.loader.MetadataLoader.lambda$handleCommit$1(MetadataLoader.java:341)
    ... 4 more
```

So it failed when we tried to remove and close the `brokerMetadataPublisher`, because `uninitializedPublishers` no longer contains this publisher when we remove it (it is deleted [here](https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java#L291-L298)). In any case, we should not fail when shutting down the broker/controller, so swallow the exceptions. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
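The fix described above (keep shutting down even if one component throws) follows a common pattern: run every shutdown step in order and collect failures instead of aborting the sequence. A minimal, hypothetical sketch; `Step` and `shutdownAll` are illustrative names, not `BrokerServer` code, and a real broker would log the swallowed exceptions rather than return them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical shutdown helper: every step runs even if an earlier one throws.
final class ShutdownSketch {

    interface Step {
        void run() throws Exception;
    }

    /** Runs all steps in order and returns the exceptions that were swallowed. */
    static List<Exception> shutdownAll(List<Step> steps) {
        List<Exception> swallowed = new ArrayList<>();
        for (Step step : steps) {
            try {
                step.run();
            } catch (Exception e) {
                // Record the failure and continue with the remaining steps.
                swallowed.add(e);
            }
        }
        return swallowed;
    }
}
```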
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint
divijvaidya commented on code in PR #13456: URL: https://github.com/apache/kafka/pull/13456#discussion_r1154259339 ## core/src/main/scala/kafka/log/remote/RemoteLogManager.scala: ## @@ -25,6 +26,7 @@ import org.apache.kafka.common.record.{RecordBatch, RemoteLogInputStream} import org.apache.kafka.common.utils.{ChildFirstClassLoader, Utils} import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager import org.apache.kafka.server.log.remote.storage._ +import org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoint Review Comment: Is this still required here? ## storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java: ## @@ -383,21 +398,25 @@ public void clear() { // Visible for testing public List epochEntries() { -lock.writeLock().lock(); +lock.readLock().lock(); try { return new ArrayList<>(epochs.values()); } finally { -lock.writeLock().unlock(); +lock.readLock().unlock(); } } -private void flush() { +private void flushTo(LeaderEpochCheckpoint leaderEpochCheckpoint, Collection epochEntries) { lock.readLock().lock(); try { -checkpoint.write(epochs.values()); +leaderEpochCheckpoint.write(epochEntries); } finally { lock.readLock().unlock(); } } +private void flush() { +flushTo(this.checkpoint, epochs.values()); Review Comment: Both invocations of `flushTo()` have the same second parameter. Could we instead read the entries inside the `flushTo` method itself? Note that we don't need to acquire locks in `flushTo()` or in `cloneWithLeaderEpochCheckpoint()`, since `epochEntries()` does it for us.
```
private void flushTo(LeaderEpochCheckpoint leaderEpochCheckpoint) {
    leaderEpochCheckpoint.write(epochEntries());
}

private void flush() {
    flushTo(this.checkpoint);
}

public LeaderEpochFileCache cloneWithLeaderEpochCheckpoint(LeaderEpochCheckpoint leaderEpochCheckpoint) {
    flushTo(leaderEpochCheckpoint);
    // create a new cache backed by the provided leaderEpochCheckpoint
    return new LeaderEpochFileCache(this.topicPartition, leaderEpochCheckpoint);
}
```

## storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java: ## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kafka.storage.internals.checkpoint; + +import org.apache.kafka.server.common.CheckpointFile; +import org.apache.kafka.storage.internals.log.EpochEntry; + +import java.io.BufferedWriter; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * This class stores a list of EpochEntry(LeaderEpoch + Offsets) to memory + */ +public class InMemoryLeaderEpochCheckpoint implements LeaderEpochCheckpoint { Review Comment: Thanks for the explanation @showuon. It's clear now. To rephrase what you mentioned (correct me if I misunderstood) we want to create an in-memory copy of the `LeaderEpochCheckpointCache` so that we can modify it without modifying the original checkpointcache & checkpoint. Is that right? At the end of the day, RLSM requires `Map segmentLeaderEpochs()` and the source is `LeaderEpochCheckpointCache`. I was wondering if it would be possible to copy the `LeaderEpochCheckpointCache` into an intermediate data structure (which doesn't have to be of type LeaderEpochCheckpointCache, hence decoupling it from the need to create a dummy `InMemoryLeaderEpochCheckpoint`) and then use that intermediate data structure to extract the required Map (after whatever manipulation we want to do with it). Similar logic could be employed for the requirement to provide `ByteBuffer leaderEpochIndex` to `LogSegmentData`. Thoughts?
[GitHub] [kafka] hudeqi opened a new pull request, #13485: MINOR:Remove unused metric variable in ReplicaManager
hudeqi opened a new pull request, #13485: URL: https://github.com/apache/kafka/pull/13485 This metric variable has never been used and has not been used for testing. Should it be deleted?
[GitHub] [kafka] rajinisivaram commented on pull request #13474: KAFKA-14867: Trigger rebalance when replica racks change if client.rack is configured (KIP-881)
rajinisivaram commented on PR #13474: URL: https://github.com/apache/kafka/pull/13474#issuecomment-1491671237 @dajac Thanks for the review. Addressed comments and left one question.
[GitHub] [kafka] rajinisivaram commented on a diff in pull request #13474: KAFKA-14867: Trigger rebalance when replica racks change if client.rack is configured (KIP-881)
rajinisivaram commented on code in PR #13474: URL: https://github.com/apache/kafka/pull/13474#discussion_r1154285495 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -1635,6 +1641,38 @@ public String toString() { return "(version" + version + ": " + partitionsPerTopic + ")"; } } +private static class RackInfo { +private final Set racks; +RackInfo(Optional clientRack, PartitionInfo partition) { +if (clientRack.isPresent() && partition.replicas() != null) { +racks = Arrays.stream(partition.replicas()).map(Node::rack).collect(Collectors.toSet()); +} else { +racks = Collections.emptySet(); +} +} + +@Override +public boolean equals(Object o) { +if (this == o) { +return true; +} +if (!(o instanceof RackInfo)) { +return false; +} +RackInfo rackInfo = (RackInfo) o; +return Objects.equals(racks, rackInfo.racks); +} + +@Override +public int hashCode() { +return Objects.hash(racks); +} + +@Override +public String toString() { +return racks.isEmpty() ? "NO_RACKS" : "racks=" + racks; Review Comment: This appears in MetadataSnapshot as an ordered list by partition, so that may be sufficient? To add partition index here, we would need to store partition index in RackInfo as well, which seemed too much just for toString. What do you think?
[GitHub] [kafka] rajinisivaram commented on a diff in pull request #13474: KAFKA-14867: Trigger rebalance when replica racks change if client.rack is configured (KIP-881)
rajinisivaram commented on code in PR #13474: URL: https://github.com/apache/kafka/pull/13474#discussion_r1154282863 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -1613,14 +1615,18 @@ private ConsumerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) { private static class MetadataSnapshot { private final int version; -private final Map partitionsPerTopic; +private final Map> partitionsPerTopic; Review Comment: Since we already have another PartitionInfo, I renamed this one `PartitionRackInfo`.
[GitHub] [kafka] hudeqi commented on a diff in pull request #13471: KAFKA-14868:Remove some forgotten metrics when the replicaManager is closed
hudeqi commented on code in PR #13471: URL: https://github.com/apache/kafka/pull/13471#discussion_r1154278469 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -4485,6 +4485,25 @@ class ReplicaManagerTest { assertTrue(response.usableBytes >= 0) } } + + @Test + def checkRemoveMetricsCountMatchRegisterCount(): Unit = { Review Comment: > I have an idea with which we can avoid changing the KafkaMetricsGroup. Could you please consider the following. > > You can use mockito's `MockConstruction` to mock the KafkaMetricsGroup and count the number of invocations on that mock and later assert on number of invocations of add and remove. As an example, this test could be written as follows (rough example with Java code): > > ``` > var numAddCount = 0; > var numRemoveCount = 0; > try (MockedConstruction mockMetricsGroup = mockConstruction(KafkaMetricsGroup.class, > (mock, context) -> { >doAnswer(invocation -> { > numAddCount++; > }).when(mock).newGauge(anyString()); > > // similarly add mocks for newMeter etc. > > doAnswer(invocation -> { > numRemoveCount++; > }).when(mock).removeMetric(anyString()); > })) { >val rm = new ReplicaManager( > metrics = metrics, > config = config, > time = time, > scheduler = new MockScheduler(time), > logManager = mockLogMgr, > quotaManagers = quotaManager, > metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion), > logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), > alterPartitionManager = alterPartitionManager, > threadNamePrefix = Option(this.getClass.getName)) > >rm.shutdown() >assertEquals(numAddCount, numRemoveCount) > } > ``` Sorry, I tried my best to write this unit test according to this method, but since newGauge has a return value, there is no way to use doAnswer. It seems that KafkaMetricsGroup must be changed? @divijvaidya -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
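On the `doAnswer` question: Mockito's `Answer` returns a value from `answer(InvocationOnMock)`. In principle, then, a counting answer for a non-void method such as `newGauge` can simply return a placeholder. The quoted rough example would also need something like an `AtomicInteger`, because a Java lambda cannot increment a captured local variable.

The sketch below shows the same counting idea without Mockito. It uses a JDK dynamic proxy (`java.lang.reflect.Proxy`) as the fake. `MetricsGroup`, `Component` and the gauge names are hypothetical stand-ins, not Kafka's `KafkaMetricsGroup` or `ReplicaManager`.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class MetricsCountSketch {

    // Hypothetical stand-in for a metrics group (NOT Kafka's KafkaMetricsGroup API).
    public interface MetricsGroup {
        Object newGauge(String name); // non-void, like a real gauge factory
        void removeMetric(String name);
    }

    // Hypothetical component: registers gauges when built, removes them on shutdown.
    public static class Component {
        private static final List<String> NAMES =
                List.of("LeaderCount", "PartitionCount", "OfflineReplicaCount");
        private final MetricsGroup metrics;

        public Component(MetricsGroup metrics) {
            this.metrics = metrics;
            NAMES.forEach(metrics::newGauge);
        }

        public void shutdown() {
            NAMES.forEach(metrics::removeMetric);
        }
    }

    // Counts calls; AtomicInteger sidesteps the "effectively final" rule for captured locals.
    public static class CountingHandler implements InvocationHandler {
        public final AtomicInteger adds = new AtomicInteger();
        public final AtomicInteger removes = new AtomicInteger();

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            switch (method.getName()) {
                case "newGauge":
                    adds.incrementAndGet();
                    return new Object(); // a non-void method must return something
                case "removeMetric":
                    removes.incrementAndGet();
                    return null; // ignored for void methods
                default:
                    throw new UnsupportedOperationException(method.getName());
            }
        }
    }

    // Builds the component against a counting fake, shuts it down, and returns {adds, removes}.
    public static int[] runLifecycle() {
        CountingHandler handler = new CountingHandler();
        MetricsGroup metrics = (MetricsGroup) Proxy.newProxyInstance(
                MetricsGroup.class.getClassLoader(), new Class<?>[] {MetricsGroup.class}, handler);
        new Component(metrics).shutdown();
        return new int[] {handler.adds.get(), handler.removes.get()};
    }

    public static void main(String[] args) {
        int[] counts = runLifecycle();
        System.out.println("added=" + counts[0] + " removed=" + counts[1]);
    }
}
```

The test then reduces to asserting that both counters match after `shutdown()`. The same shape would apply to a Mockito answer that increments a counter and returns a dummy value.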
[GitHub] [kafka] hudeqi commented on a diff in pull request #13471: KAFKA-14868:Remove some forgotten metrics when the replicaManager is closed
hudeqi commented on code in PR #13471: URL: https://github.com/apache/kafka/pull/13471#discussion_r1154277604 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -4485,6 +4485,25 @@ class ReplicaManagerTest { assertTrue(response.usableBytes >= 0) } } + + @Test + def checkRemoveMetricsCountMatchRegisterCount(): Unit = { Review Comment: > > I have an idea with which we can avoid changing the KafkaMetricsGroup. Could you please consider the following. > > You can use mockito's `MockConstruction` to mock the KafkaMetricsGroup and count the number of invocations on that mock and later assert on number of invocations of add and remove. As an example, this test could be written as follows (rough example with Java code): > > ``` > var numAddCount = 0; > var numRemoveCount = 0; > try (MockedConstruction mockMetricsGroup = mockConstruction(KafkaMetricsGroup.class, > (mock, context) -> { >doAnswer(invocation -> { > numAddCount++; > }).when(mock).newGauge(anyString()); > > // similarly add mocks for newMeter etc. > > doAnswer(invocation -> { > numRemoveCount++; > }).when(mock).removeMetric(anyString()); > })) { >val rm = new ReplicaManager( > metrics = metrics, > config = config, > time = time, > scheduler = new MockScheduler(time), > logManager = mockLogMgr, > quotaManagers = quotaManager, > metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion), > logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), > alterPartitionManager = alterPartitionManager, > threadNamePrefix = Option(this.getClass.getName)) > >rm.shutdown() >assertEquals(numAddCount, numRemoveCount) > } > ``` Sorry, I tried my best to write a single test according to this method, but since newGauge has a return value, there is no way to use doAnswer. It seems that KafkaMetricsGroup must be changed? @divijvaidya
[GitHub] [kafka] dajac merged pull request #13408: KAFKA-14617: Update AlterPartitionRequest and enable Kraft controller to reject stale request.
dajac merged PR #13408: URL: https://github.com/apache/kafka/pull/13408
[GitHub] [kafka] dajac commented on pull request #13408: KAFKA-14617: Update AlterPartitionRequest and enable Kraft controller to reject stale request.
dajac commented on PR #13408: URL: https://github.com/apache/kafka/pull/13408#issuecomment-1491621526 Failed tests are not related. Merging to trunk.
[jira] [Commented] (KAFKA-6007) Connect can't validate against transforms in plugins.path
[ https://issues.apache.org/jira/browse/KAFKA-6007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17707213#comment-17707213 ]

Bert Roos commented on KAFKA-6007:
--

[~gharris1727] I created a reproduction, but in the end it showed that this issue is resolved. You can find the reproduction here: https://github.com/Bert-R/kafka-6007-repro. Apparently, something else was preventing my SMT from loading. Based on that, I believe this issue can be set to Resolved.

> Connect can't validate against transforms in plugins.path
>
> Key: KAFKA-6007
> URL: https://issues.apache.org/jira/browse/KAFKA-6007
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 0.11.0.0, 0.11.0.1, 1.0.0
> Reporter: Stephane Maarek
> Assignee: Konstantine Karantasis
> Priority: Major
>
> Kafka Connect can't validate a custom transformation if it is placed in the plugins path.
> Here's the output I get on the validate call:
> {code:java}
> Invalid value com.mycorp.kafka.transforms.impl.FlattenSinkRecord for configuration transforms.Flat.type: Class com.mycorp.kafka.transforms.impl.FlattenSinkRecord could not be found.
> Invalid value null for configuration transforms.Flat.type: Not a Transformation
> "recommended_values": [
>   "com.mycorp.kafka.transforms.Flatten$Key",
>   "com.mycorp.kafka.transforms.Flatten$Value",
>   "com.mycorp.kafka.transforms.impl.FlattenSinkRecord",
>   "org.apache.kafka.connect.transforms.Cast$Key",
>   "org.apache.kafka.connect.transforms.Cast$Value",
>   "org.apache.kafka.connect.transforms.ExtractField$Key",
>   "org.apache.kafka.connect.transforms.ExtractField$Value",
>   "org.apache.kafka.connect.transforms.Flatten$Key",
>   "org.apache.kafka.connect.transforms.Flatten$Value",
>   "org.apache.kafka.connect.transforms.HoistField$Key",
>   "org.apache.kafka.connect.transforms.HoistField$Value",
>   "org.apache.kafka.connect.transforms.InsertField$Key",
>   "org.apache.kafka.connect.transforms.InsertField$Value",
>   "org.apache.kafka.connect.transforms.MaskField$Key",
>   "org.apache.kafka.connect.transforms.MaskField$Value",
>   "org.apache.kafka.connect.transforms.RegexRouter",
>   "org.apache.kafka.connect.transforms.ReplaceField$Key",
>   "org.apache.kafka.connect.transforms.ReplaceField$Value",
>   "org.apache.kafka.connect.transforms.SetSchemaMetadata$Key",
>   "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
>   "org.apache.kafka.connect.transforms.TimestampConverter$Key",
>   "org.apache.kafka.connect.transforms.TimestampConverter$Value",
>   "org.apache.kafka.connect.transforms.TimestampRouter",
>   "org.apache.kafka.connect.transforms.ValueToKey"],
> {code}
> As you can see, the class appears in the recommended values (!) but can't be picked up on the validate call.
> I believe it's because the recommender implements class discovery using plugins:
> https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L194
> But the class inference itself doesn't:
> https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L199
> (I'm not an expert in class loading though, just a guess... Unsure how to fix)
> A quick fix is to add the transformations to the classpath itself, but that defeats the point a bit.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
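The reporter's guess is that the recommender and the type check resolve class names through different class loaders. The sketch below resolves a name against an ordered list of loaders with `Class.forName(name, false, loader)`. The `resolve` helper is hypothetical and is not how Connect's `ConnectorConfig` or its plugin isolation actually performs the lookup. In a plugin setup, an isolated plugin loader would come first in the list; with only the application loader, a class that lives solely in a plugin directory is not found.

```java
import java.util.List;
import java.util.Optional;

public class PluginClassResolution {

    // Tries each loader in order and returns the first match, without initializing the class.
    // Hypothetical helper: Connect's real lookup goes through its own plugin machinery.
    public static Optional<Class<?>> resolve(String className, List<ClassLoader> loaders) {
        for (ClassLoader loader : loaders) {
            try {
                return Optional.of(Class.forName(className, false, loader));
            } catch (ClassNotFoundException e) {
                // Not visible to this loader; fall through to the next one.
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        ClassLoader appLoader = PluginClassResolution.class.getClassLoader();
        // Only the application loader here; a plugin loader would normally precede it.
        List<ClassLoader> loaders = List.of(appLoader);
        System.out.println(resolve("java.lang.String", loaders).isPresent());
        System.out.println(resolve("com.mycorp.kafka.transforms.impl.FlattenSinkRecord", loaders).isPresent());
    }
}
```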
[GitHub] [kafka] dajac commented on a diff in pull request #13432: KAFKA-14821 Implement the listOffsets API with AdminApiDriver
dajac commented on code in PR #13432: URL: https://github.com/apache/kafka/pull/13432#discussion_r1154222435 ## clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java: ## @@ -260,12 +261,18 @@ public void onFailure( .filter(future.lookupKeys()::contains) .collect(Collectors.toSet()); retryLookup(keysToUnmap); +} else if (t instanceof UnsupportedVersionException) { +Map unrecoverableFailures = +handler.handleUnsupportedVersionException( +(UnsupportedVersionException) t, +spec.keys, +spec.scope instanceof FulfillmentScope); Review Comment: If the scope is NOT a FulfillmentScope, I think that we don't have to `completeExceptionally` but should rather `retryLookup`, as we do in the previous branch. Am I wrong? If this is true, it may be better to completely separate the two cases. For instance, we could also add `handleUnsupportedVersionException` to the lookup strategy in the future.
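The reviewer's suggestion can be sketched as two separate code paths:

- A lookup-scoped failure retries the lookup, as the stale-mapping branch does.
- A fulfillment-scoped failure lets the handler decide which keys are unrecoverable.

This is a minimal model only. `Scope`, `Outcome` and the handler function are hypothetical stand-ins, not `AdminApiDriver`'s real `FulfillmentScope` or `handleUnsupportedVersionException` signatures.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

public class UnsupportedVersionRouting {

    // Hypothetical stand-ins for the driver's two request scopes.
    public enum Scope { LOOKUP, FULFILLMENT }

    // Records what the driver would do with each key; a real driver would update its own state.
    public static final class Outcome {
        public final Set<String> retriedLookups = new HashSet<>();
        public final Set<String> failed = new HashSet<>();
    }

    // LOOKUP scope: behave like the stale-mapping branch and retry the lookup.
    // FULFILLMENT scope: ask the (hypothetical) handler which keys are unrecoverable and fail
    // only those; what happens to the remaining keys is left to the handler and not modeled here.
    public static Outcome onUnsupportedVersion(
            Scope scope,
            Set<String> keys,
            Function<Set<String>, Map<String, Throwable>> fulfillmentHandler) {
        Outcome outcome = new Outcome();
        if (scope == Scope.LOOKUP) {
            outcome.retriedLookups.addAll(keys);
        } else {
            outcome.failed.addAll(fulfillmentHandler.apply(keys).keySet());
        }
        return outcome;
    }

    public static void main(String[] args) {
        Set<String> keys = Set.of("topic-0", "topic-1");
        Function<Set<String>, Map<String, Throwable>> handler =
                ks -> Map.<String, Throwable>of("topic-1", new RuntimeException("version too old"));
        System.out.println("lookup scope retries: " + onUnsupportedVersion(Scope.LOOKUP, keys, handler).retriedLookups);
        System.out.println("fulfillment scope fails: " + onUnsupportedVersion(Scope.FULFILLMENT, keys, handler).failed);
    }
}
```

Keeping the branches separate also leaves room for a lookup-strategy hook later, as the comment suggests.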
[GitHub] [kafka] urbandan commented on a diff in pull request #13458: KAFKA-14838: Add flow/connector/task/role information to MM2 Kafka cl…
urbandan commented on code in PR #13458: URL: https://github.com/apache/kafka/pull/13458#discussion_r1154222170 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java: ## @@ -200,6 +200,8 @@ public Map workerConfig(SourceAndTarget sourceAndTarget) { // fill in reasonable defaults props.putIfAbsent(GROUP_ID_CONFIG, sourceAndTarget.source() + "-mm2"); +String groupId = props.get(GROUP_ID_CONFIG); +props.putIfAbsent(CommonClientConfigs.CLIENT_ID_CONFIG, groupId + "|" + sourceAndTarget); Review Comment: Thanks for the clarification. I'm not going to add the group ID here, but I will still try to set the client ID based on the flow. In MM2, the group ID only contains the source alias, so with specific setups (e.g. a star architecture) the group ID is not unique within a single process. DistributedHerder logs do contain the client ID, and I have found it useful in many situations (e.g. tracking rebalances/assignments of a specific flow). Thanks for the hint about the shared admin; I will fix that, too.
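The sketch below illustrates why the default group ID alone is ambiguous in a star topology. It assumes the defaults are filled with `putIfAbsent`, as in the diff. It also assumes a `source->target` client ID format; the format the PR finally uses is not shown here. `workerDefaults` is a hypothetical helper, while `group.id` and `client.id` are the standard client config keys.

```java
import java.util.HashMap;
import java.util.Map;

public class FlowClientIds {

    // Fills in defaults with putIfAbsent, so user-supplied values always win.
    // The "source->target" client ID format is an assumption, not necessarily the PR's format.
    public static Map<String, String> workerDefaults(String source, String target, Map<String, String> userProps) {
        Map<String, String> props = new HashMap<>(userProps);
        props.putIfAbsent("group.id", source + "-mm2");
        props.putIfAbsent("client.id", source + "->" + target);
        return props;
    }

    public static void main(String[] args) {
        // Star topology: one source replicated to two targets within the same process.
        Map<String, String> toB = workerDefaults("A", "B", Map.of());
        Map<String, String> toC = workerDefaults("A", "C", Map.of());
        System.out.println(toB.get("group.id") + " vs " + toC.get("group.id"));   // same group ID
        System.out.println(toB.get("client.id") + " vs " + toC.get("client.id")); // distinct client IDs
    }
}
```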
[GitHub] [kafka] showuon commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint
showuon commented on code in PR #13456: URL: https://github.com/apache/kafka/pull/13456#discussion_r1154193013 ## storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java: ## @@ -383,11 +396,11 @@ public void clear() { // Visible for testing public List epochEntries() { -lock.writeLock().lock(); +lock.readLock().lock(); Review Comment: Opened another PR https://github.com/apache/kafka/pull/13483 to fix this bug.
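The change rests on a standard `ReentrantReadWriteLock` pattern: mutations take the write lock, and a read-only accessor such as `epochEntries()` only needs the shared read lock. The sketch below models that idea. `EpochCacheSketch` and its methods are hypothetical and are not `LeaderEpochFileCache` itself. `contentionWhileReading` shows that, while one thread holds the read lock, another reader can enter but a writer cannot.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class EpochCacheSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final List<Long> startOffsets = new ArrayList<>();

    // Mutations take the exclusive write lock.
    public void append(long startOffset) {
        lock.writeLock().lock();
        try {
            startOffsets.add(startOffset);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // A read-only accessor only needs the shared read lock. It returns a copy so callers
    // never iterate the live list after the lock has been released.
    public List<Long> entries() {
        lock.readLock().lock();
        try {
            return new ArrayList<>(startOffsets);
        } finally {
            lock.readLock().unlock();
        }
    }

    // Tries a non-blocking acquire of the given lock from a separate thread.
    private static boolean tryFromOtherThread(Lock target) {
        AtomicBoolean acquired = new AtomicBoolean(false);
        Thread other = new Thread(() -> {
            if (target.tryLock()) {
                acquired.set(true);
                target.unlock();
            }
        });
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return acquired.get();
    }

    // While this thread holds the read lock: {can another reader enter?, can a writer enter?}
    public boolean[] contentionWhileReading() {
        lock.readLock().lock();
        try {
            return new boolean[] {
                tryFromOtherThread(lock.readLock()),
                tryFromOtherThread(lock.writeLock())
            };
        } finally {
            lock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        EpochCacheSketch cache = new EpochCacheSketch();
        cache.append(0L);
        cache.append(4L);
        boolean[] result = cache.contentionWhileReading();
        System.out.println(cache.entries() + " reader=" + result[0] + " writer=" + result[1]);
    }
}
```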
[GitHub] [kafka] showuon commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint
showuon commented on code in PR #13456: URL: https://github.com/apache/kafka/pull/13456#discussion_r1154178753 ## core/src/main/scala/kafka/log/remote/RemoteLogManager.scala: ## @@ -219,6 +221,29 @@ class RemoteLogManager(rlmConfig: RemoteLogManagerConfig, } } + /** + * Returns the in memory leader epoch checkpoint by truncating with the given start[exclusive] and end[inclusive] offset + * + * @param log The actual log from where to take the leader-epoch checkpoint + * @param startOffset The start offset of the checkpoint file (exclusive in the truncation). + *If start offset is 6, then it will retain an entry at offset 6. + * @param endOffset The end offset of the checkpoint file (inclusive in the truncation) + *If end offset is 100, then it will remove the entries greater than or equal to 100. + * @return the truncated leader epoch checkpoint + */ + private[remote] def getLeaderEpochCheckpoint(log: UnifiedLog, startOffset: Long, endOffset: Long): InMemoryLeaderEpochCheckpoint = { Review Comment: Will remove it to avoid confusion.
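The Javadoc's two examples (start offset 6, end offset 100) can be checked with a small model. The sketch rests on two assumptions. First, entries are sorted by start offset. Second, truncating from the start keeps the latest epoch that begins at or before the start offset, moved up to that offset, which is how "it will retain an entry at offset 6" reads. `Entry` and both methods are hypothetical and are not `UnifiedLog` or `LeaderEpochFileCache` APIs.

```java
import java.util.ArrayList;
import java.util.List;

public class EpochTruncationSketch {

    // Hypothetical (epoch, startOffset) pair; it mirrors the idea of EpochEntry, not its API.
    public static final class Entry {
        public final int epoch;
        public final long startOffset;

        public Entry(int epoch, long startOffset) {
            this.epoch = epoch;
            this.startOffset = startOffset;
        }

        @Override
        public String toString() {
            return "(" + epoch + "," + startOffset + ")";
        }
    }

    // "End offset is inclusive in the truncation": drop every entry starting at or after endOffset.
    public static List<Entry> truncateFromEnd(List<Entry> entries, long endOffset) {
        List<Entry> kept = new ArrayList<>();
        for (Entry e : entries) {
            if (e.startOffset < endOffset) {
                kept.add(e);
            }
        }
        return kept;
    }

    // "Start offset is exclusive in the truncation": drop entries starting at or before startOffset,
    // but keep the latest of them, moved up to startOffset, so the epoch covering it survives.
    public static List<Entry> truncateFromStart(List<Entry> entries, long startOffset) {
        List<Entry> kept = new ArrayList<>();
        Entry lastAtOrBefore = null;
        for (Entry e : entries) {
            if (e.startOffset <= startOffset) {
                lastAtOrBefore = e;
            } else {
                kept.add(e);
            }
        }
        if (lastAtOrBefore != null) {
            kept.add(0, new Entry(lastAtOrBefore.epoch, startOffset));
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Entry> epochs = List.of(new Entry(0, 0), new Entry(1, 4), new Entry(2, 10),
                new Entry(3, 100), new Entry(4, 150));
        // Same numbers as the Javadoc: start offset 6, end offset 100.
        System.out.println(truncateFromEnd(truncateFromStart(epochs, 6), 100)); // prints [(1,6), (2,10)]
    }
}
```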
[GitHub] [kafka] showuon commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint
showuon commented on code in PR #13456: URL: https://github.com/apache/kafka/pull/13456#discussion_r1154178160 ## storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java: ## @@ -148,6 +151,16 @@ private List removeWhileMatching(Iterator removeWhileMatching(Iterator
[GitHub] [kafka] showuon commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint
showuon commented on code in PR #13456: URL: https://github.com/apache/kafka/pull/13456#discussion_r1154172866 ## storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java: ## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.checkpoint; + +import org.apache.kafka.server.common.CheckpointFile; +import org.apache.kafka.storage.internals.log.EpochEntry; + +import java.io.BufferedWriter; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * This class stores a list of EpochEntry(LeaderEpoch + Offsets) to memory + */ +public class InMemoryLeaderEpochCheckpoint implements LeaderEpochCheckpoint { Review Comment: @divijvaidya, sorry for not being clear and for wasting your time trying to understand this PR. My bad! The motivation for introducing `InMemoryLeaderEpochCheckpoint` is to allow the remote log manager to create the `RemoteLogSegmentMetadata` (RLSM) with the correct leader epoch info for a specific segment.
To do that, we need to rely on the `LeaderEpochCheckpointCache` to truncate from the start and the end to get the epoch info. However, we don't want to truncate the epochs in the cache itself (which would eventually be written to the checkpoint file). So we introduce this `InMemoryLeaderEpochCheckpoint` to feed into `LeaderEpochCheckpointCache`; when we truncate the epochs for the RLSM, we can do it in memory without affecting the checkpoint file and without interacting with the file system. Does that make sense? I'll update the PR description and remove the methods that are not currently used.
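The idea described above can be sketched as a checkpoint implementation that keeps its entries only in memory. A cache built on top of it can then be truncated freely without writing a file. `Checkpoint`, `InMemoryCheckpoint`, `Entry` and `toCheckpointBytes` are hypothetical names. The text layout in `toCheckpointBytes` (a version line, a count line, then one `epoch offset` line per entry) is an assumption about the checkpoint file format; the imports in the diff suggest serialization but do not confirm this layout.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class InMemoryCheckpointSketch {

    // Hypothetical (epoch, startOffset) pair standing in for EpochEntry.
    public static final class Entry {
        public final int epoch;
        public final long startOffset;

        public Entry(int epoch, long startOffset) {
            this.epoch = epoch;
            this.startOffset = startOffset;
        }
    }

    // Hypothetical checkpoint abstraction: the cache persists via write() and reloads via read().
    public interface Checkpoint {
        void write(Collection<Entry> entries);
        List<Entry> read();
    }

    // Keeps entries on the heap only, so truncating a cache backed by it never touches a file.
    public static final class InMemoryCheckpoint implements Checkpoint {
        private List<Entry> entries = Collections.emptyList();

        @Override
        public synchronized void write(Collection<Entry> newEntries) {
            entries = new ArrayList<>(newEntries); // defensive copy of the caller's collection
        }

        @Override
        public synchronized List<Entry> read() {
            return Collections.unmodifiableList(entries);
        }

        // Hypothetical helper: renders the entries in an assumed checkpoint-file text layout
        // (version line, entry count line, then one "epoch startOffset" line per entry).
        public synchronized ByteBuffer toCheckpointBytes() {
            StringBuilder sb = new StringBuilder();
            sb.append(0).append('\n').append(entries.size()).append('\n');
            for (Entry e : entries) {
                sb.append(e.epoch).append(' ').append(e.startOffset).append('\n');
            }
            return ByteBuffer.wrap(sb.toString().getBytes(StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) {
        InMemoryCheckpoint checkpoint = new InMemoryCheckpoint();
        checkpoint.write(List.of(new Entry(0, 0L), new Entry(1, 4L)));
        System.out.print(StandardCharsets.UTF_8.decode(checkpoint.toCheckpointBytes()));
    }
}
```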
[GitHub] [kafka] showuon commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint
showuon commented on code in PR #13456: URL: https://github.com/apache/kafka/pull/13456#discussion_r1154172866 ## storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java: ## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.checkpoint; + +import org.apache.kafka.server.common.CheckpointFile; +import org.apache.kafka.storage.internals.log.EpochEntry; + +import java.io.BufferedWriter; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * This class stores a list of EpochEntry(LeaderEpoch + Offsets) to memory + */ +public class InMemoryLeaderEpochCheckpoint implements LeaderEpochCheckpoint { Review Comment: @divijvaidya, sorry for not being clear and for wasting your time trying to understand this PR. My bad! The motivation for introducing `InMemoryLeaderEpochCheckpoint` is to allow the remote log manager to create the `RemoteLogSegmentMetadata` (RLSM) with the correct leader epoch info for a specific segment.
To do that, we need to rely on the `LeaderEpochCheckpointCache` to truncate from the start and the end to get the epoch info. However, we don't want to truncate the epochs in the cache itself (which would eventually be written to the checkpoint file). So we introduce this `InMemoryLeaderEpochCheckpoint` to feed into `LeaderEpochCheckpointCache`; when we truncate the epochs for the RLSM, we can do it in memory without affecting the checkpoint file and without interacting with the file system. I'll update the PR description and remove the methods that are not currently used.
[GitHub] [kafka] dajac opened a new pull request, #13484: KAFKA-14016: Revoke more partitions than expected in Cooperative rebalance
dajac opened a new pull request, #13484: URL: https://github.com/apache/kafka/pull/13484 WIP ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] showuon commented on pull request #13483: MINOR: use readlock for read epochs
showuon commented on PR #13483: URL: https://github.com/apache/kafka/pull/13483#issuecomment-1491435742 @divijvaidya @satishd, a quick fix for the lock.
[GitHub] [kafka] showuon opened a new pull request, #13483: MINOR: use readlock for read epochs
showuon opened a new pull request, #13483: URL: https://github.com/apache/kafka/pull/13483 Use the read lock when reading epochs. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] showuon merged pull request #13482: Switch to SplittableRandom in ProducerPerformance utility
showuon merged PR #13482: URL: https://github.com/apache/kafka/pull/13482
[GitHub] [kafka] showuon commented on pull request #13482: Switch to SplittableRandom in ProducerPerformance utility
showuon commented on PR #13482: URL: https://github.com/apache/kafka/pull/13482#issuecomment-1491401582 Failed tests are unrelated:
```
Build / JDK 11 and Scala 2.13 / kafka.security.authorizer.AuthorizerTest.testAddAclsOnPrefixedResource(String).quorum=kraft
Build / JDK 11 and Scala 2.13 / kafka.server.metadata.BrokerMetadataPublisherTest.testExceptionInUpdateCoordinator()
Build / JDK 11 and Scala 2.13 / kafka.server.metadata.BrokerMetadataPublisherTest.testExceptionInUpdateCoordinator()
Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testRestartReplication()
Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector
Build / JDK 8 and Scala 2.12 / kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed(String).quorum=kraft
Build / JDK 8 and Scala 2.12 / kafka.server.metadata.BrokerMetadataPublisherTest.testExceptionInUpdateCoordinator()
Build / JDK 8 and Scala 2.12 / kafka.server.metadata.BrokerMetadataPublisherTest.testExceptionInUpdateCoordinator()
Build / JDK 17 and Scala 2.13 / kafka.security.authorizer.AuthorizerTest.testAclsFilter(String).quorum=kraft
Build / JDK 17 and Scala 2.13 / kafka.server.metadata.BrokerMetadataPublisherTest.testExceptionInUpdateCoordinator()
```