Re: [PR] MINOR: Add public documentation for metrics introduced in KIP-963 [kafka]
clolov commented on PR #15131: URL: https://github.com/apache/kafka/pull/15131#issuecomment-1878268696 Heya @showuon and thanks for the review! I will make the changes in a couple of hours -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16016: Add docker wrapper in core and remove docker utility script [kafka]
VedarthConfluent commented on code in PR #15048: URL: https://github.com/apache/kafka/pull/15048#discussion_r1442527460 ## core/src/main/scala/kafka/docker/KafkaDockerWrapper.scala: ## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.docker + +import kafka.tools.StorageTool +import kafka.utils.Exit + +import java.nio.charset.StandardCharsets +import java.nio.file.{Files, Paths, StandardCopyOption, StandardOpenOption} + +object KafkaDockerWrapper { + def main(args: Array[String]): Unit = { +if (args.length == 0) { + throw new RuntimeException(s"Error: No operation input provided. " + +s"Please provide a valid operation: 'setup'.") +} +val operation = args.head +val arguments = args.tail + +operation match { + case "setup" => +if (arguments.length != 3) { + val errMsg = "not enough arguments passed. 
Usage: " + +"setup , " + System.err.println(errMsg) + Exit.exit(1, Some(errMsg)) +} +val defaultConfigsDir = arguments(0) +val mountedConfigsDir = arguments(1) +val finalConfigsDir = arguments(2) +try { + prepareConfigs(defaultConfigsDir, mountedConfigsDir, finalConfigsDir) +} catch { + case e: Throwable => +val errMsg = s"error while preparing configs: ${e.getMessage}" +System.err.println(errMsg) +Exit.exit(1, Some(errMsg)) +} + +val formatCmd = formatStorageCmd(finalConfigsDir, envVars) +StorageTool.main(formatCmd) + case _ => +throw new RuntimeException(s"Unknown operation $operation. " + + s"Please provide a valid operation: 'setup'.") +} + } + + import Constants._ + + private def formatStorageCmd(configsDir: String, env: Map[String, String]): Array[String] = { Review Comment: The way storage tool is written, we will need the logic that's present in the main function. We can create a new function that contains entirety of the main function logic and call that from both storage tool main and our code. But that will be kind of same as the present solution, but with just one extra method. Proper solution will require thorough refactoring of the storage tool, which I think is out of scope for this change. Please let us know your thoughts on this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16016: Add docker wrapper in core and remove docker utility script [kafka]
VedarthConfluent commented on code in PR #15048: URL: https://github.com/apache/kafka/pull/15048#discussion_r1442527460 ## core/src/main/scala/kafka/docker/KafkaDockerWrapper.scala: ## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.docker + +import kafka.tools.StorageTool +import kafka.utils.Exit + +import java.nio.charset.StandardCharsets +import java.nio.file.{Files, Paths, StandardCopyOption, StandardOpenOption} + +object KafkaDockerWrapper { + def main(args: Array[String]): Unit = { +if (args.length == 0) { + throw new RuntimeException(s"Error: No operation input provided. " + +s"Please provide a valid operation: 'setup'.") +} +val operation = args.head +val arguments = args.tail + +operation match { + case "setup" => +if (arguments.length != 3) { + val errMsg = "not enough arguments passed. 
Usage: " + +"setup , " + System.err.println(errMsg) + Exit.exit(1, Some(errMsg)) +} +val defaultConfigsDir = arguments(0) +val mountedConfigsDir = arguments(1) +val finalConfigsDir = arguments(2) +try { + prepareConfigs(defaultConfigsDir, mountedConfigsDir, finalConfigsDir) +} catch { + case e: Throwable => +val errMsg = s"error while preparing configs: ${e.getMessage}" +System.err.println(errMsg) +Exit.exit(1, Some(errMsg)) +} + +val formatCmd = formatStorageCmd(finalConfigsDir, envVars) +StorageTool.main(formatCmd) + case _ => +throw new RuntimeException(s"Unknown operation $operation. " + + s"Please provide a valid operation: 'setup'.") +} + } + + import Constants._ + + private def formatStorageCmd(configsDir: String, env: Map[String, String]): Array[String] = { Review Comment: The way storage tool is written, we will need the logic that's present in the main function. We can create a new function that contains entirety of the main function logic and call that from both storage tool main and our code. But that will be kind of same as the present solution, but with just one extra method. Proper solution will require thorough refactoring of the storage tool, which I think is out of scope for this change. Please let us know your thoughts on this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15341) Enabling TS for a topic during rolling restart causes problems
[ https://issues.apache.org/jira/browse/KAFKA-15341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17803405#comment-17803405 ] Phuc Hong Tran commented on KAFKA-15341: [~divijvaidya], broker feature is just the IBP version, so we just need to specify the version that supports tiered storage, and fence brokers accordingly (I think the quorum already has the functionality to block out brokers which don't satisfy the cluster's IBP). The other part is that we need to update broker registration request data to include tiered storage enable status, which may require a KIP. What do you think? > Enabling TS for a topic during rolling restart causes problems > -- > > Key: KAFKA-15341 > URL: https://issues.apache.org/jira/browse/KAFKA-15341 > Project: Kafka > Issue Type: Bug >Reporter: Divij Vaidya >Assignee: Phuc Hong Tran >Priority: Major > Labels: KIP-405 > Fix For: 3.8.0 > > > When we are in a rolling restart to enable TS at system level, some brokers > have TS enabled on them and some don't. We send an alter config call to > enable TS for a topic, it hits a broker which has TS enabled, this broker > forwards it to the controller and controller will send the config update to > all brokers. When another broker which doesn't have TS enabled (because it > hasn't undergone the restart yet) gets this config change, it "should" fail > to apply it. But failing now is too late since alterConfig has already > succeeded since controller->broker config propagation is done async. > With this JIRA, we want to have controller check if TS is enabled on all > brokers before applying alter config to turn on TS for a topic. > Context: https://github.com/apache/kafka/pull/14176#discussion_r1291265129 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16059: close more kafkaApis instances [kafka]
showuon commented on PR #15132: URL: https://github.com/apache/kafka/pull/15132#issuecomment-1878091765 @divijvaidya , this should be the last fix for kafkaApisTest. (Hope so!) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16059: close more kafkaApis instances [kafka]
showuon commented on code in PR #15132: URL: https://github.com/apache/kafka/pull/15132#discussion_r1442473158 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -2061,20 +2071,24 @@ class KafkaApisTest extends Logging { responseCallback.capture(), ArgumentMatchers.eq(requestLocal) )).thenAnswer(_ => responseCallback.getValue.apply(InitProducerIdResult(producerId, epoch, Errors.PRODUCER_FENCED))) - kafkaApis = createKafkaApis() - kafkaApis.handleInitProducerIdRequest(request, requestLocal) - - verify(requestChannel).sendResponse( -ArgumentMatchers.eq(request), -capturedResponse.capture(), -ArgumentMatchers.eq(None) - ) - val response = capturedResponse.getValue + val kafkaApis = createKafkaApis() Review Comment: This instance is created inside a for loop. We need to close it 1 by 1. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16059: close more kafkaApis instances [kafka]
showuon commented on code in PR #15132: URL: https://github.com/apache/kafka/pull/15132#discussion_r1442472935 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -1669,12 +1671,16 @@ class KafkaApisTest extends Logging { val request = buildRequest(offsetCommitRequest) when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), any[Long])).thenReturn(0) - kafkaApis = createKafkaApis() - kafkaApis.handleOffsetCommitRequest(request, RequestLocal.withThreadConfinedCaching) - - val response = verifyNoThrottling[OffsetCommitResponse](request) - assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, - Errors.forCode(response.data.topics().get(0).partitions().get(0).errorCode)) + val kafkaApis = createKafkaApis() Review Comment: This kafkaApis instance is created inside a sub-function. We need to close it 1 by 1. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16059: close more kafkaApis instances [kafka]
showuon opened a new pull request, #15132: URL: https://github.com/apache/kafka/pull/15132 I thought we've closed all kafkaApis instances in KafkaApisTest, but there are more... ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Move Raft io thread implementation to Java [kafka]
hachikuji commented on code in PR #15119: URL: https://github.com/apache/kafka/pull/15119#discussion_r1442435868 ## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClientDriver.java: ## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.server.fault.FaultHandler; +import org.apache.kafka.server.util.ShutdownableThread; +import org.slf4j.Logger; + +import java.util.concurrent.CompletableFuture; + + +/** + * A single-threaded driver for {@link KafkaRaftClient}. 
+ * + * @param See {@link KafkaRaftClient} + */ +public class KafkaRaftClientDriver extends ShutdownableThread { +private final Logger log; +private final KafkaRaftClient client; +private final FaultHandler fatalFaultHandler; + +public KafkaRaftClientDriver( +KafkaRaftClient client, +String threadNamePrefix, +FaultHandler fatalFaultHandler, +LogContext logContext +) { +super(threadNamePrefix + "-io-thread", false); +this.client = client; +this.fatalFaultHandler = fatalFaultHandler; +this.log = logContext.logger(KafkaRaftClientDriver.class); +} + +@Override +public void doWork() { +try { +client.poll(); +} catch (Throwable t) { +throw fatalFaultHandler.handleFault("Unexpected error in raft IO thread", t); +} +} + +@Override +public boolean initiateShutdown() { +if (super.initiateShutdown()) { +client.shutdown(5000).whenComplete((na, exception) -> { +if (exception != null) { +log.error("Graceful shutdown of RaftClient failed", exception); +} else { +log.info("Completed graceful shutdown of RaftClient"); +} +}); +return true; +} else { +return false; +} +} + +/** + * Shutdown the thread. In addition to stopping any utilized threads, this will + * close the {@link KafkaRaftClient} instance. + */ +@Override +public void shutdown() throws InterruptedException { +try { +super.shutdown(); +} finally { +client.close(); Review Comment: I added some comments in the latest commit. Let me know if they address the comment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Move Raft io thread implementation to Java [kafka]
hachikuji commented on code in PR #15119: URL: https://github.com/apache/kafka/pull/15119#discussion_r1442426943 ## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClientDriver.java: ## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.server.fault.FaultHandler; +import org.apache.kafka.server.util.ShutdownableThread; +import org.slf4j.Logger; + +import java.util.concurrent.CompletableFuture; + + +/** + * A single-threaded driver for {@link KafkaRaftClient}. 
+ * + * @param See {@link KafkaRaftClient} + */ +public class KafkaRaftClientDriver extends ShutdownableThread { +private final Logger log; +private final KafkaRaftClient client; +private final FaultHandler fatalFaultHandler; + +public KafkaRaftClientDriver( +KafkaRaftClient client, +String threadNamePrefix, +FaultHandler fatalFaultHandler, +LogContext logContext +) { +super(threadNamePrefix + "-io-thread", false); +this.client = client; +this.fatalFaultHandler = fatalFaultHandler; +this.log = logContext.logger(KafkaRaftClientDriver.class); +} + +@Override +public void doWork() { +try { +client.poll(); +} catch (Throwable t) { +throw fatalFaultHandler.handleFault("Unexpected error in raft IO thread", t); +} +} + +@Override +public boolean initiateShutdown() { +if (super.initiateShutdown()) { +client.shutdown(5000).whenComplete((na, exception) -> { +if (exception != null) { +log.error("Graceful shutdown of RaftClient failed", exception); +} else { +log.info("Completed graceful shutdown of RaftClient"); +} +}); +return true; +} else { +return false; +} +} + +/** + * Shutdown the thread. In addition to stopping any utilized threads, this will + * close the {@link KafkaRaftClient} instance. + */ +@Override +public void shutdown() throws InterruptedException { +try { +super.shutdown(); +} finally { +client.close(); Review Comment: Sounds good. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-16079) Fix leak in LocalLeaderEndPointTest/FinalizedFeatureChangeListenerTest/KafkaApisTest/ReplicaManagerConcurrencyTest
[ https://issues.apache.org/jira/browse/KAFKA-16079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen resolved KAFKA-16079. --- Fix Version/s: 3.7.0 Resolution: Fixed > Fix leak in > LocalLeaderEndPointTest/FinalizedFeatureChangeListenerTest/KafkaApisTest/ReplicaManagerConcurrencyTest > -- > > Key: KAFKA-16079 > URL: https://issues.apache.org/jira/browse/KAFKA-16079 > Project: Kafka > Issue Type: Sub-task >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > Fix For: 3.7.0 > > > Fix leak in > LocalLeaderEndPointTest/FinalizedFeatureChangeListenerTest/KafkaApisTest/ReplicaManagerConcurrencyTest -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16079: fix threads leak threads in LocalLeaderEndPointTest and other tests [kafka]
showuon merged PR #15122: URL: https://github.com/apache/kafka/pull/15122 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16079: fix threads leak threads in LocalLeaderEndPointTest and other tests [kafka]
showuon commented on PR #15122: URL: https://github.com/apache/kafka/pull/15122#issuecomment-1877995119 Failed tests are unrelated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Move Raft io thread implementation to Java [kafka]
jsancio commented on code in PR #15119: URL: https://github.com/apache/kafka/pull/15119#discussion_r1442414085 ## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClientDriver.java: ## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.server.fault.FaultHandler; +import org.apache.kafka.server.util.ShutdownableThread; +import org.slf4j.Logger; + +import java.util.concurrent.CompletableFuture; + + +/** + * A single-threaded driver for {@link KafkaRaftClient}. 
+ * + * @param See {@link KafkaRaftClient} + */ +public class KafkaRaftClientDriver extends ShutdownableThread { +private final Logger log; +private final KafkaRaftClient client; +private final FaultHandler fatalFaultHandler; + +public KafkaRaftClientDriver( +KafkaRaftClient client, +String threadNamePrefix, +FaultHandler fatalFaultHandler, +LogContext logContext +) { +super(threadNamePrefix + "-io-thread", false); +this.client = client; +this.fatalFaultHandler = fatalFaultHandler; +this.log = logContext.logger(KafkaRaftClientDriver.class); +} + +@Override +public void doWork() { +try { +client.poll(); +} catch (Throwable t) { +throw fatalFaultHandler.handleFault("Unexpected error in raft IO thread", t); +} +} + +@Override +public boolean initiateShutdown() { +if (super.initiateShutdown()) { +client.shutdown(5000).whenComplete((na, exception) -> { +if (exception != null) { +log.error("Graceful shutdown of RaftClient failed", exception); +} else { +log.info("Completed graceful shutdown of RaftClient"); +} +}); +return true; +} else { +return false; +} +} + +/** + * Shutdown the thread. In addition to stopping any utilized threads, this will + * close the {@link KafkaRaftClient} instance. + */ +@Override +public void shutdown() throws InterruptedException { +try { +super.shutdown(); +} finally { +client.close(); Review Comment: Okay. How about documenting this decision in the type's Java Doc and the private field `KafkaRaftClient`? That way future readers understand that this is the exception and not the rule. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]
satishd commented on code in PR #14034: URL: https://github.com/apache/kafka/pull/14034#discussion_r1442403275 ## gradle/spotbugs-exclude.xml: ## @@ -162,8 +162,7 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read on the compiler to fail if the code is changed to call a method that throws Exception. Given that, this bug pattern doesn't make sense for Scala code. --> - - + Review Comment: Right, it is not needed, made the respective changes in the code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]
satishd commented on code in PR #14034: URL: https://github.com/apache/kafka/pull/14034#discussion_r1440213705 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -2265,11 +2268,12 @@ object UnifiedLog extends Logging { def deleteProducerSnapshots(): Unit = { LocalLog.maybeHandleIOException(logDirFailureChannel, parentDir, -s"Error while deleting producer state snapshots for $topicPartition in dir $parentDir") { +s"Error while deleting producer state snapshots for $topicPartition in dir $parentDir", { snapshotsToDelete.foreach { snapshot => snapshot.deleteIfExists() } - } + return; Review Comment: It was expecting a return for inline declaration, it is throwing a type mismatch error without that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]
satishd commented on code in PR #14034: URL: https://github.com/apache/kafka/pull/14034#discussion_r1442402745 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1245,7 +1246,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, } private[log] def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long): List[AbortedTxn] = { Review Comment: I thought these could be addressed when `UnifiedLog` is refactored and moved to storage module. ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1245,7 +1246,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, } private[log] def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long): List[AbortedTxn] = { Review Comment: I thought these could be addressed when `UnifiedLog` is refactored and moved to storage module. ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -2265,11 +2268,12 @@ object UnifiedLog extends Logging { def deleteProducerSnapshots(): Unit = { LocalLog.maybeHandleIOException(logDirFailureChannel, parentDir, -s"Error while deleting producer state snapshots for $topicPartition in dir $parentDir") { +s"Error while deleting producer state snapshots for $topicPartition in dir $parentDir", { snapshotsToDelete.foreach { snapshot => snapshot.deleteIfExists() } - } + return; Review Comment: It was expecting a return for inline declaration, it is throwing a type mismatch error without that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]
satishd commented on code in PR #14034: URL: https://github.com/apache/kafka/pull/14034#discussion_r1442402573 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1441,9 +1442,9 @@ class UnifiedLog(@volatile var logStartOffset: Long, * (if there is one). It returns true iff the segment is deletable. * @return the segments ready to be deleted */ - private[log] def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = { -def isSegmentEligibleForDeletion(nextSegmentOpt: Option[LogSegment], upperBoundOffset: Long): Boolean = { - val allowDeletionDueToLogStartOffsetIncremented = nextSegmentOpt.isDefined && logStartOffset >= nextSegmentOpt.get.baseOffset + private[log] def deletableSegments(predicate: (LogSegment, Optional[LogSegment]) => Boolean): Iterable[LogSegment] = { Review Comment: I thought these could be addressed when `UnifiedLog` is refactored and moved to storage module. ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1441,9 +1442,9 @@ class UnifiedLog(@volatile var logStartOffset: Long, * (if there is one). It returns true iff the segment is deletable. * @return the segments ready to be deleted */ - private[log] def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = { -def isSegmentEligibleForDeletion(nextSegmentOpt: Option[LogSegment], upperBoundOffset: Long): Boolean = { - val allowDeletionDueToLogStartOffsetIncremented = nextSegmentOpt.isDefined && logStartOffset >= nextSegmentOpt.get.baseOffset + private[log] def deletableSegments(predicate: (LogSegment, Optional[LogSegment]) => Boolean): Iterable[LogSegment] = { Review Comment: I thought these could be addressed when `UnifiedLog` is refactored and moved to storage module. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]
satishd commented on code in PR #14034: URL: https://github.com/apache/kafka/pull/14034#discussion_r1442402667 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1511,7 +1512,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, } localLog.checkIfMemoryMappedBufferClosed() // remove the segments for lookups -localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, reason) +localLog.removeAndDeleteSegments(segmentsToDelete.toList.asJava, true, reason) Review Comment: I thought these could be addressed when `UnifiedLog` is refactored and moved to storage module. ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1511,7 +1512,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, } localLog.checkIfMemoryMappedBufferClosed() // remove the segments for lookups -localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, reason) +localLog.removeAndDeleteSegments(segmentsToDelete.toList.asJava, true, reason) Review Comment: I thought these could be addressed when `UnifiedLog` is refactored and moved to storage module. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]
satishd commented on code in PR #14034: URL: https://github.com/apache/kafka/pull/14034#discussion_r1442402484 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java: ## @@ -0,0 +1,1146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.KafkaStorageException; +import org.apache.kafka.common.errors.OffsetOutOfRangeException; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.record.FileLogInputStream; +import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.RecordVersion; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.Scheduler; +import org.slf4j.Logger; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.Set; +import java.util.StringJoiner; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * An append-only log for storing messages locally. The log is a sequence of LogSegments, each with a base offset. + * New log segments are created according to a configurable policy that controls the size in bytes or time interval + * for a given segment. + * + * NOTE: this class is not thread-safe. 
+ */ +public class LocalLog { + +/** + * a file that is scheduled to be deleted + */ +public static final String DELETED_FILE_SUFFIX = LogFileUtils.DELETED_FILE_SUFFIX; + +/** + * A temporary file that is being used for log cleaning + */ +public static final String CLEANED_FILE_SUFFIX = ".cleaned"; + +/** + * A temporary file used when swapping files into the log + */ +public static final String SWAP_FILE_SUFFIX = ".swap"; + +/** + * a directory that is scheduled to be deleted + */ +public static final String DELETE_DIR_SUFFIX = "-delete"; + +/** + * a directory that is used for future partition + */ +public static final String FUTURE_DIR_SUFFIX = "-future"; +public static final String STRAY_DIR_SUFFIX = "-stray"; + +public static final Pattern DELETE_DIR_PATTERN = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + DELETE_DIR_SUFFIX); +public static final Pattern FUTURE_DIR_PATTERN = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + FUTURE_DIR_SUFFIX); +public static final Pattern STRAY_DIR_PATTERN = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + STRAY_DIR_SUFFIX); + +public static final long UNKNOWN_OFFSET = -1L; + +private final Logger logger; + +private final LogSegments segments; +private final Scheduler scheduler; +private final Time time; +private final TopicPartition topicPartition; +private final LogDirFailureChannel logDirFailureChannel; + +// Last time the log was flushed +private final AtomicLong lastFlushedTime; + +private volatile File dir; +private volatile LogConfig config; +private volatile long recoveryPoint; +private volatile LogOffsetMetadata nextOffsetMetadata; + +// Cache value of parent directory to avoid allocations in hot paths like ReplicaManager.checkpointHighWatermarks +private volatile String parentDir; +// The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers() +// After memory mapped buffer is closed, no disk IO operation should be performed for this log. 
+private volatile boolean isMemoryMappedBufferClosed = false; + +/** + * Creates a new LocalLog instance. +
Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]
satishd commented on code in PR #14034: URL: https://github.com/apache/kafka/pull/14034#discussion_r1442402411 ## checkstyle/import-control-core.xml: ## @@ -37,6 +37,8 @@ + + Review Comment: Right, it is not needed. I guess it was needed when that class was kept inside the core module. ## checkstyle/import-control-core.xml: ## @@ -37,6 +37,8 @@ + + Review Comment: Right, it is not needed. I guess it was needed when that class was kept inside the core module. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16073) Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion
[ https://issues.apache.org/jira/browse/KAFKA-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-16073: --- Fix Version/s: 3.8.0 > Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed > localLogStartOffset Update During Segment Deletion > > > Key: KAFKA-16073 > URL: https://issues.apache.org/jira/browse/KAFKA-16073 > Project: Kafka > Issue Type: Bug > Components: core, Tiered-Storage >Affects Versions: 3.6.1 >Reporter: hzh0425 >Assignee: hzh0425 >Priority: Major > Labels: KIP-405, kip-405, tiered-storage > Fix For: 3.6.1, 3.8.0 > > > The identified bug in Apache Kafka's tiered storage feature involves a > delayed update of {{localLogStartOffset}} in the > {{UnifiedLog.deleteSegments}} method, impacting consumer fetch operations. > When segments are deleted from the log's memory state, the > {{localLogStartOffset}} isn't promptly updated. Concurrently, > {{ReplicaManager.handleOffsetOutOfRangeError}} checks if a consumer's fetch > offset is less than the {{localLogStartOffset}}. If it's greater, Kafka > erroneously sends an {{OffsetOutOfRangeException}} to the consumer. > In a specific concurrent scenario, imagine sequential offsets: {{offset1 < > offset2 < offset3}}. A client requests data at {{offset2}}. While a > background deletion process removes segments from memory, it hasn't yet > updated the {{localLogStartOffset}} from {{offset1}} to {{offset3}}. > Consequently, when the fetch offset ({{offset2}}) is evaluated against > the stale {{offset1}} in {{ReplicaManager.handleOffsetOutOfRangeError}}, > it incorrectly triggers an {{OffsetOutOfRangeException}}. This issue > arises from the out-of-sync update of {{localLogStartOffset}}, leading to > incorrect handling of consumer fetch requests and potential data access > errors. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Move Raft io thread implementation to Java [kafka]
hachikuji commented on code in PR #15119: URL: https://github.com/apache/kafka/pull/15119#discussion_r1442350821 ## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClientDriver.java: ## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.server.fault.FaultHandler; +import org.apache.kafka.server.util.ShutdownableThread; +import org.slf4j.Logger; + +import java.util.concurrent.CompletableFuture; + + +/** + * A single-threaded driver for {@link KafkaRaftClient}. 
+ * + * @param See {@link KafkaRaftClient} + */ +public class KafkaRaftClientDriver extends ShutdownableThread { +private final Logger log; +private final KafkaRaftClient client; +private final FaultHandler fatalFaultHandler; + +public KafkaRaftClientDriver( +KafkaRaftClient client, +String threadNamePrefix, +FaultHandler fatalFaultHandler, +LogContext logContext +) { +super(threadNamePrefix + "-io-thread", false); +this.client = client; +this.fatalFaultHandler = fatalFaultHandler; +this.log = logContext.logger(KafkaRaftClientDriver.class); +} + +@Override +public void doWork() { +try { +client.poll(); +} catch (Throwable t) { +throw fatalFaultHandler.handleFault("Unexpected error in raft IO thread", t); +} +} + +@Override +public boolean initiateShutdown() { +if (super.initiateShutdown()) { +client.shutdown(5000).whenComplete((na, exception) -> { +if (exception != null) { +log.error("Graceful shutdown of RaftClient failed", exception); +} else { +log.info("Completed graceful shutdown of RaftClient"); +} +}); +return true; +} else { +return false; +} +} + +/** + * Shutdown the thread. In addition to stopping any utilized threads, this will + * close the {@link KafkaRaftClient} instance. + */ +@Override +public void shutdown() throws InterruptedException { +try { +super.shutdown(); +} finally { +client.close(); Review Comment: Yes, I debated it. It is nice to have the driver own shutdown since it is in control of the poll loop. Given that, it seemed safer for it to also close the client once it knows that shutdown is complete. In a future patch, my thought was to have the driver own creation of the client as well. We can replace `RaftManager` with a builder which constructs the client in the context of the driver. That would establish clear ownership and resolve the issue. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Move Raft io thread implementation to Java [kafka]
ijuma commented on code in PR #15119: URL: https://github.com/apache/kafka/pull/15119#discussion_r1440877890 ## checkstyle/import-control.xml: ## @@ -443,6 +443,7 @@ + Review Comment: Should this be a line above? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Move Raft io thread implementation to Java [kafka]
jsancio commented on code in PR #15119: URL: https://github.com/apache/kafka/pull/15119#discussion_r1442310315 ## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClientDriver.java: ## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.server.fault.FaultHandler; +import org.apache.kafka.server.util.ShutdownableThread; +import org.slf4j.Logger; + +import java.util.concurrent.CompletableFuture; + + +/** + * A single-threaded driver for {@link KafkaRaftClient}. 
+ * + * @param See {@link KafkaRaftClient} + */ +public class KafkaRaftClientDriver extends ShutdownableThread { +private final Logger log; +private final KafkaRaftClient client; +private final FaultHandler fatalFaultHandler; + +public KafkaRaftClientDriver( +KafkaRaftClient client, +String threadNamePrefix, +FaultHandler fatalFaultHandler, +LogContext logContext +) { +super(threadNamePrefix + "-io-thread", false); +this.client = client; +this.fatalFaultHandler = fatalFaultHandler; +this.log = logContext.logger(KafkaRaftClientDriver.class); +} + +@Override +public void doWork() { +try { +client.poll(); +} catch (Throwable t) { +throw fatalFaultHandler.handleFault("Unexpected error in raft IO thread", t); +} +} + +@Override +public boolean initiateShutdown() { +if (super.initiateShutdown()) { +client.shutdown(5000).whenComplete((na, exception) -> { +if (exception != null) { +log.error("Graceful shutdown of RaftClient failed", exception); +} else { +log.info("Completed graceful shutdown of RaftClient"); +} +}); +return true; +} else { +return false; +} +} + +/** + * Shutdown the thread. In addition to stopping any utilized threads, this will + * close the {@link KafkaRaftClient} instance. 
+ */ +@Override +public void shutdown() throws InterruptedException { +try { +super.shutdown(); +} finally { +client.close(); +} +} + +@Override +public boolean isRunning() { +return client.isRunning() && !isThreadFailed(); +} + +public CompletableFuture handleRequest( +RequestHeader header, +ApiMessage request, +long createdTimeMs +) { +RaftRequest.Inbound inboundRequest = new RaftRequest.Inbound( +header.correlationId(), +request, +createdTimeMs +); + +client.handle(inboundRequest); + +return inboundRequest.completion.thenApply(response -> response.data); Review Comment: How about: ```java return inboundRequest.completion.thenApply(RaftMessage::data)); ``` ## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientDriverTest.java: ## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.server.fault.MockFaultHandler; +import org.junit.jupiter.api.T
Re: [PR] MINOR: Fixing shadow jar publish [kafka]
apoorvmittal10 commented on PR #15127: URL: https://github.com/apache/kafka/pull/15127#issuecomment-1877809786 > I tested the publish again and confirmed it works! Awesome! > Who is a good person to ask about this? Maybe some of the original reviewers of the shadow PR? Maybe @ijuma or @xvrl can help here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Add reviewers GitHub action [kafka]
mumrah commented on code in PR #15115: URL: https://github.com/apache/kafka/pull/15115#discussion_r1442270635 ## .github/workflows/pr_reviews.yml: ## @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: Adding Reviewers + +on: + pull_request_review: +types: [submitted] + +jobs: + add_reviewers: +runs-on: ubuntu-latest +steps: +- uses: actions/checkout@v3 +- name: Add Reviewers + run: | +user_json=$(gh api users/${{ github.event.review.user.login }}) +user_name=$(echo "$user_json" | jq -r '.name') +user_email=$(echo "$user_json" | jq -r '.email') +if [[ "${{ github.event.pull_request.body }}" =~ ^.*Reviewers:\ .*${user_name}.*$ ]]; then +echo "Reviewer already added: ${user_name} <${user_email}>" +elif [[ "${{ github.event.pull_request.body }}" =~ ^.*Reviewers:\ .*$ ]]; then + pr_body="${{ github.event.pull_request.body }}, ${user_name} <${user_email}>" + gh pr edit ${{ github.event.pull_request.number }} --body "${pr_body}" + echo "Added reviewer: ${user_name} <${user_email}>" Review Comment: Tacking on additional reviewers like this seems brittle. Could we instead use the API to read the list of approvers and replace the whole "Reviewers:" line? 
https://docs.github.com/en/rest/pulls/reviews?apiVersion=2022-11-28 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Prevent java.lang.UnsupportedOperationException in MockAdminClient [kafka]
divijvaidya merged PR #14955: URL: https://github.com/apache/kafka/pull/14955 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Prevent java.lang.UnsupportedOperationException in MockAdminClient [kafka]
divijvaidya commented on PR #14955: URL: https://github.com/apache/kafka/pull/14955#issuecomment-1877769128 JDK 8 tests are all successful and other failing tests don't look related -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14133: Migrate remaining mocks in StoreChangelogReaderTest to Mockito [kafka]
divijvaidya merged PR #15125: URL: https://github.com/apache/kafka/pull/15125 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14133: Migrate remaining mocks in StoreChangelogReaderTest to Mockito [kafka]
divijvaidya commented on PR #15125: URL: https://github.com/apache/kafka/pull/15125#issuecomment-1877746515 The modified test is successful https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15125/1/testReport/org.apache.kafka.streams.processor.internals/StoreChangelogReaderTest/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Small cleanups in Connect [kafka]
divijvaidya commented on code in PR #15128: URL: https://github.com/apache/kafka/pull/15128#discussion_r1442237669 ## connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java: ## @@ -119,16 +117,16 @@ private boolean isInternalRequest(ContainerRequestContext requestContext) { public static class BasicAuthCallBackHandler implements CallbackHandler { -private String username; -private String password; +private final String username; Review Comment: I have fixed all such "field not being final warnings" in https://github.com/apache/kafka/pull/15072 Please check when you get a chance. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: New year code clean up - misc [kafka]
divijvaidya commented on code in PR #15071: URL: https://github.com/apache/kafka/pull/15071#discussion_r1442225284 ## trogdor/src/main/java/org/apache/kafka/trogdor/workload/TopicsSpec.java: ## @@ -67,8 +67,7 @@ public void set(String name, PartitionsSpec value) { } public TopicsSpec immutableCopy() { -HashMap mapCopy = new HashMap<>(); -mapCopy.putAll(map); +HashMap mapCopy = new HashMap<>(map); Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: New year code clean up - misc [kafka]
divijvaidya commented on code in PR #15071: URL: https://github.com/apache/kafka/pull/15071#discussion_r1442224965 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -1079,8 +1077,7 @@ private void clearAll() { } private List drainPendingCommits() { -ArrayList res = new ArrayList<>(); - res.addAll(unsentOffsetCommits.stream().map(OffsetCommitRequestState::toUnsentRequest).collect(Collectors.toList())); +ArrayList res = new ArrayList<>(unsentOffsetCommits.stream().map(OffsetCommitRequestState::toUnsentRequest).collect(Collectors.toList())); Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Minor: Add KIP-923 to upgrade-guide.html and dsl-api.html [kafka]
wcarlson5 commented on PR #14725: URL: https://github.com/apache/kafka/pull/14725#issuecomment-1877692107 @mjsax Finally got around to your comments. I think we need to pick this back to 3.6 and 3.7 as well -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Minor: Add KIP-923 to upgrade-guide.html and dsl-api.html [kafka]
wcarlson5 commented on code in PR #14725: URL: https://github.com/apache/kafka/pull/14725#discussion_r1442198505 ## docs/streams/developer-guide/dsl-api.html: ## @@ -2849,6 +2851,12 @@ KTable-KTable Foreign-Key When the table is versioned, the table record to join with is determined by performing a timestamped lookup, i.e., the table record which is joined will be the latest-by-timestamp record with timestamp less than or equal to the stream record timestamp. If the stream record timestamp is older than the table's history retention, then the record is dropped. +To use the Grace Period, the table needs to be versioned. Review Comment: I could go either way. Let's uncapitalize it for now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Add public documentation for metrics introduced in KIP-963 [kafka]
clolov opened a new pull request, #15131: URL: https://github.com/apache/kafka/pull/15131 Adding the public documentation for metrics introduced in [KIP-963](https://cwiki.apache.org/confluence/display/KAFKA/KIP-963%3A+Additional+metrics+in+Tiered+Storage) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16051: Fixed deadlock in StandaloneHerder [kafka]
gharris1727 commented on code in PR #15080: URL: https://github.com/apache/kafka/pull/15080#discussion_r1442146732 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ## @@ -1001,6 +1001,48 @@ public void testResetConnectorOffsets() throws Exception { assertEquals(msg, resetOffsetsCallback.get(1000, TimeUnit.MILLISECONDS)); } +@Test() +public void testRequestTaskReconfigurationDoesNotDeadlock() throws Exception { +connector = mock(BogusSourceConnector.class); +expectAdd(SourceSink.SOURCE); + +// Start the connector +Map config = connectorConfig(SourceSink.SOURCE); +Connector connectorMock = mock(SourceConnector.class); +expectConfigValidation(connectorMock, true, config); + +herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback); + +// Wait on connector to start +Herder.Created connectorInfo = createCallback.get(1000L, TimeUnit.MILLISECONDS); +assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result()); + +// Updated task with new config +Map updatedTaskConfig = taskConfig(SourceSink.SOURCE); +updatedTaskConfig.put("dummy-task-property", "yes"); +when(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, config, true))) +.thenReturn(Collections.singletonList(updatedTaskConfig)); + +// Expect that tasks will be stopped and started again + when(worker.connectorNames()).thenReturn(Collections.singleton(CONNECTOR_NAME)); +doNothing().when(worker).stopAndAwaitTasks(singletonList(new ConnectorTaskId(CONNECTOR_NAME, 0))); +when(worker.startSourceTask(eq(new ConnectorTaskId(CONNECTOR_NAME, 0)), any(), eq(connectorConfig(SourceSink.SOURCE)), eq(updatedTaskConfig), eq(herder), eq(TargetState.STARTED))).thenReturn(true); + +// Set new connector config +Map newConfig = connectorConfig(SourceSink.SOURCE); +newConfig.put("dummy-task-property", "yes"); +herder.putConnectorConfig(CONNECTOR_NAME, newConfig, true, createCallback); Review Comment: This re-uses the createCallback, 
which should already be resolved from the earlier request. I think we need a new future to wait for, like testPutConnectorConfig. When I made that change, the callback wasn't completing because this test was missing a mock for `Worker#startConnector`. There's a mocking idiom used elsewhere in the test that you can copy: https://github.com/apache/kafka/blob/e6f2624c48ceab032811693e1013b70c6ee16c74/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java#L250-L255 After that is fixed, I think some of the other assertions change slightly, since the second put isn't actually executing to completion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16084) Simplify and deduplicate StandaloneHerderTest mocking
Greg Harris created KAFKA-16084: --- Summary: Simplify and deduplicate StandaloneHerderTest mocking Key: KAFKA-16084 URL: https://issues.apache.org/jira/browse/KAFKA-16084 Project: Kafka Issue Type: Test Components: connect Reporter: Greg Harris The StandaloneHerderTest has some cruft that can be cleaned up. What I've found: * The `connector` field is written in nearly every test, but only read by one test, and looks to be nearly irrelevant. * `expectConfigValidation` has two ways of specifying consecutive validations. 1. The boolean shouldCreateConnector which is true in the first invocation and false in subsequent invocations. 2. By passing multiple configurations via varargs. * The class uses a mix of Mock annotations and mock(Class) invocations * The test doesn't stop the thread pool created inside the herder and might leak threads * Mocking for Worker#startConnector is 6 lines which are duplicated 8 times throughout the test * Some waits are 1000 ms and others are 1000 s, and could be pulled out to constants or a util method -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-16083: Exclude throttle time when expiring inflight requests on a connection [kafka]
ditac opened a new pull request, #15130: URL: https://github.com/apache/kafka/pull/15130 When expiring inflight requests, the network client does not take throttle time into account. If a connection has multiple inflight requests (default of 5) and each request is throttled then some of the requests can be incorrectly marked as expired. Subsequently the connection is closed and the client establishes a new connection to the broker. This behavior leads to unnecessary connections to the broker, causes connection storms, and increases latencies. ### Testing Unit tests ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Quotav2handler [DRAFT] [kafka]
nickgarvey closed pull request #15129: Quotav2handler [DRAFT] URL: https://github.com/apache/kafka/pull/15129 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] Quotav2handler [DRAFT] [kafka]
nickgarvey opened a new pull request, #15129: URL: https://github.com/apache/kafka/pull/15129 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16082) JBOD: Possible dataloss when moving leader partition
[ https://issues.apache.org/jira/browse/KAFKA-16082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-16082: Fix Version/s: 3.7.0 > JBOD: Possible dataloss when moving leader partition > > > Key: KAFKA-16082 > URL: https://issues.apache.org/jira/browse/KAFKA-16082 > Project: Kafka > Issue Type: Bug > Components: jbod >Affects Versions: 3.7.0 >Reporter: Proven Provenzano >Assignee: Gaurav Narula >Priority: Blocker > Fix For: 3.7.0 > > > There is a possible dataloss scenario > when using JBOD, > when moving the partition leader log from one directory to another on the > same broker, > when after the destination log has caught up to the source log and after the > broker has sent an update to the partition assignment > if the broker accepts and commits a new record for the partition and then the > broker restarts and the original partition leader log is lost > then the destination log would not contain the new record. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16082) JBOD: Possible dataloss when moving leader partition
[ https://issues.apache.org/jira/browse/KAFKA-16082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-16082: Fix Version/s: (was: 3.7.0) > JBOD: Possible dataloss when moving leader partition > > > Key: KAFKA-16082 > URL: https://issues.apache.org/jira/browse/KAFKA-16082 > Project: Kafka > Issue Type: Bug > Components: jbod >Affects Versions: 3.7.0 >Reporter: Proven Provenzano >Assignee: Gaurav Narula >Priority: Blocker > > There is a possible dataloss scenario > when using JBOD, > when moving the partition leader log from one directory to another on the > same broker, > when after the destination log has caught up to the source log and after the > broker has sent an update to the partition assignment > if the broker accepts and commits a new record for the partition and then the > broker restarts and the original partition leader log is lost > then the destination log would not contain the new record. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
iit2009060 commented on PR #15060: URL: https://github.com/apache/kafka/pull/15060#issuecomment-1877542832 > @iit2009060 You may have mistyped as controller but it does not have any role in the fetch path here. Remote fetch is executed through a specific purgatory with DelayedRemoteFetch and read operation/callback is executed in a specific thread pool. It returns empty records when there is no data for a specific offset in the remote storage. Yes @satishd correct . I am still going through the code but need few inputs 1. If we send records empty , How it determine the next offset to be fetched. Because in LogOffSetMetadata we always return the fetchOffset request. Or it keep requesting the fetchOffSet request ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
iit2009060 commented on PR #15060: URL: https://github.com/apache/kafka/pull/15060#issuecomment-1877528171 > Thanks @iit2009060 for the PR. > > Let us say there are two segments in remote storage and subsequents segments in local storage. remote-seg-10[10, 20], remote-seg-21[21, 30] : offsets 25 to 30 are compacted. local-seg-31[31, 40] > > When a fetch request comes for offsets with in [25, 30] then it should move to the local segment as those offsets might have been compacted earlier. Did you also cover this scenario in this PR? @satishd I am trying to reproduce the case when last offsets of the segment earlier than the active segment is compacted away. I tried locally but not able to reproduce the above scenario ? There always exist a last offset in the segment earlier than the active segment. As per the article https://towardsdatascience.com/log-compacted-topics-in-apache-kafka-b1aa1e4665a7 All records in the active segment are never compacted. @satishd Do you have a step in mind to regenerate the above scenario ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16077: Streams with state updater fails to close task upon fencing [kafka]
cadonna commented on PR #15117: URL: https://github.com/apache/kafka/pull/15117#issuecomment-1877517561 @lucasbru What do you think about my previous question? > Should we also add an integration test that tests the scenario? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14404) Fix & update docs on client configs controlled by Streams
[ https://issues.apache.org/jira/browse/KAFKA-14404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17803252#comment-17803252 ] Ayoub Omari commented on KAFKA-14404: - Hi [~ableegoldman] & [~sujayopensource] I see that this ticket is still open since some time now. Can I take it up ? > Fix & update docs on client configs controlled by Streams > - > > Key: KAFKA-14404 > URL: https://issues.apache.org/jira/browse/KAFKA-14404 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Sujay Hegde >Priority: Major > Labels: docs, newbie > > There are a handful of client configs that can't be set by Streams users for > various reasons, such as the group id, but we seem to have missed a few of > them in the documentation > [here|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]: > the partitioner assignor (Consumer) and partitioner (Producer). > This section of the docs also just needs to be cleaned up in general as there > is overlap between the [Default > Values|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#default-values] > and [Parameters controlled by Kafka > Streams|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26] > sections, and the table of contents is messed up presumably due to an issue > with the section headers. > We should separate these with one section covering (only) configs where > Streams sets a different default but this can still be overridden by the > user, and the other section covering the configs that Streams hardcodes and > users can never override. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16083) Exclude throttle time when expiring inflight requests on a connection
Adithya Chandra created KAFKA-16083: --- Summary: Exclude throttle time when expiring inflight requests on a connection Key: KAFKA-16083 URL: https://issues.apache.org/jira/browse/KAFKA-16083 Project: Kafka Issue Type: Bug Components: clients Reporter: Adithya Chandra When expiring inflight requests, the network client does not take throttle time into account. If a connection has multiple inflight requests (default of 5) and each request is throttled then some of the requests can be incorrectly marked as expired. Subsequently the connection is closed and the client establishes a new connection to the broker. This behavior leads to unnecessary connections to the broker, leads to connection storms and increases latencies. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]
philipnee commented on PR #15000: URL: https://github.com/apache/kafka/pull/15000#issuecomment-1877478293 hi @lucasbru - thanks. i personally prefer to make consumer interceptor thread safe because firstly, (i think) it simplifies the code quite a bit and secondly, it is possible to miss the interceptor invocation if user fail to poll/close after a commit is sent. latter is a bigger problem i think. do you have an idea of how to fix this? `When would we not trigger the interceptor in this case?` i think we always want to trigger onCommit interceptor if provided. so I think after sending the last commit sync on close, we just need to clear the queue, close the network thread, and invoke the callbacks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: New year code clean up - misc [kafka]
mimaison commented on code in PR #15071: URL: https://github.com/apache/kafka/pull/15071#discussion_r1442027962 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -1079,8 +1077,7 @@ private void clearAll() { } private List drainPendingCommits() { -ArrayList res = new ArrayList<>(); - res.addAll(unsentOffsetCommits.stream().map(OffsetCommitRequestState::toUnsentRequest).collect(Collectors.toList())); +ArrayList res = new ArrayList<>(unsentOffsetCommits.stream().map(OffsetCommitRequestState::toUnsentRequest).collect(Collectors.toList())); Review Comment: We can use `List` on the left ## trogdor/src/main/java/org/apache/kafka/trogdor/workload/TopicsSpec.java: ## @@ -67,8 +67,7 @@ public void set(String name, PartitionsSpec value) { } public TopicsSpec immutableCopy() { -HashMap mapCopy = new HashMap<>(); -mapCopy.putAll(map); +HashMap mapCopy = new HashMap<>(map); Review Comment: Can we use `Map` on the left side? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16081) Limit number of ssl connections in brokers
[ https://issues.apache.org/jira/browse/KAFKA-16081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison updated KAFKA-16081: --- Labels: need-kip (was: ) > Limit number of ssl connections in brokers > -- > > Key: KAFKA-16081 > URL: https://issues.apache.org/jira/browse/KAFKA-16081 > Project: Kafka > Issue Type: New Feature >Reporter: Jimmy Wang >Assignee: Jimmy Wang >Priority: Major > Labels: need-kip > > In Kafka, an SSL connection occupies approximately 100KB of memory, while a > plaintext connection occupies around 250 bytes, resulting in a memory > footprint ratio of approximately 400:1. Therefore, there should be a > limitation for SSL connections. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16081 Limit number of ssl connections in brokers [kafka]
mimaison commented on PR #15126: URL: https://github.com/apache/kafka/pull/15126#issuecomment-1877414085 Thanks for the PR. Since this is adding new configurations, this change requires a KIP to be voted by the community. See the [Kafka Improvement Proposals](https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals) page on the wiki for details about the KIP process. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Small cleanups in Connect [kafka]
mimaison opened a new pull request, #15128: URL: https://github.com/apache/kafka/pull/15128 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]
gharris1727 commented on code in PR #14995: URL: https://github.com/apache/kafka/pull/14995#discussion_r1441959148 ## clients/src/test/java/org/apache/kafka/common/config/provider/FileConfigProviderTest.java: ## @@ -98,8 +117,91 @@ public void testServiceLoaderDiscovery() { public static class TestFileConfigProvider extends FileConfigProvider { @Override -protected Reader reader(String path) throws IOException { +protected Reader reader(Path path) throws IOException { return new StringReader("testKey=testResult\ntestKey2=testResult2"); } } + +@Test +public void testAllowedDirPath() { +Map configs = new HashMap<>(); +configs.put(ALLOWED_PATHS_CONFIG, dir); +configProvider.configure(configs); + +ConfigData configData = configProvider.get(dirFile); +Map result = new HashMap<>(); +result.put("testKey", "testResult"); +result.put("testKey2", "testResult2"); +assertEquals(result, configData.data()); +assertNull(configData.ttl()); +} + +@Test +public void testAllowedFilePath() { +Map configs = new HashMap<>(); +configs.put(ALLOWED_PATHS_CONFIG, dirFile); +configProvider.configure(configs); + +ConfigData configData = configProvider.get(dirFile); +Map result = new HashMap<>(); +result.put("testKey", "testResult"); +result.put("testKey2", "testResult2"); +assertEquals(result, configData.data()); +assertNull(configData.ttl()); +} + +@Test +public void testMultipleAllowedPaths() { +Map configs = new HashMap<>(); +configs.put(ALLOWED_PATHS_CONFIG, dir + "," + siblingDir); +configProvider.configure(configs); + +Map result = new HashMap<>(); +result.put("testKey", "testResult"); +result.put("testKey2", "testResult2"); + +ConfigData configData = configProvider.get(dirFile); +assertEquals(result, configData.data()); +assertNull(configData.ttl()); + +configData = configProvider.get(siblingDirFile); +assertEquals(result, configData.data()); +assertNull(configData.ttl()); +} + +@Test +public void testNotAllowedDirPath() { +Map configs = new HashMap<>(); 
+configs.put(ALLOWED_PATHS_CONFIG, dir); +configProvider.configure(configs); + +ConfigData configData = configProvider.get(siblingDirFile); +assertTrue(configData.data().isEmpty()); +assertNull(configData.ttl()); +} + +@Test +public void testNotAllowedFilePath() throws IOException { +Map configs = new HashMap<>(); +configs.put(ALLOWED_PATHS_CONFIG, dirFile); +configProvider.configure(configs); + +//another file under the same directory +Path dirFile2 = Files.createFile(Paths.get(dir, "dirFile2")); +ConfigData configData = configProvider.get(dirFile2.toString()); +assertTrue(configData.data().isEmpty()); +assertNull(configData.ttl()); +} + +@Test +public void testNoTraversal() { +Map configs = new HashMap<>(); +configs.put(ALLOWED_PATHS_CONFIG, dirFile); +configProvider.configure(configs); + +// Check we can't escape outside the path directory +ConfigData configData = configProvider.get(dirFile + Paths.get("/../siblingdir/siblingdirFile")); Review Comment: > I did however deduplicate the implementation classes by adding ConfigProviderUtils class with the common methods. Rather than making these static utilities and have the caller store the List, you can keep the List as an instance variable in the shared class. This ensures that the List is handled completely by the validation logic and not changed by the config provider implementations. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15062: Adding ppc64le build stage [kafka]
mimaison commented on PR #13817: URL: https://github.com/apache/kafka/pull/13817#issuecomment-1877357358 Sorry for the delay. I tried to build this but it fails in the CI with the following error: ``` WorkflowScript: 91: unexpected token: % @ line 91, column 82. InMillis() / 1000 / 60 / 60 / % 24 ``` See https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14717/6/pipeline -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Fixing shadow jar publish [kafka]
apoorvmittal10 opened a new pull request, #15127: URL: https://github.com/apache/kafka/pull/15127 ## What The PR fixes the publishing of `kafka-clients` artifact to remote maven. The `kafka-clients` jar is recently been shadowed which does publish the artifacts to local maven successfully but errors when publishing to remote maven. The issue triggers only with `publishMavenJavaPublicationToMavenRepository` and `signing` but `publishToMavenLocal` works correctly with earlier changes. Generating signed `asc` files error out for shadowed release artifacts as the module name (`clients`) differs from the artifact name (`kafka-clients`). The fix is basically to explicitly define `artifact` of `shadowJar` to `signing` and `publish` plugin. `project.shadow.component(mavenJava)` previously outputs the name as `client--all.jar` though the `classifier` and `archivesBaseName` are already defined correctly in `:clients` and `shadowJar` construction. Below is another way where the artifact details can be overridden post construction in `afterEvaluate` to let `publish` plugin know about the correct `artifactId` but it doesn't solve the `signing` plugin issue where `signing` plugin is unaware of `kafka-client-.jar` and skips signing. For `signing` plugin the `artifact` should be explicitly defined as well. ``` project.shadow.component(mavenJava) afterEvaluate { pom.withXml { asNode().dependencies.dependency.findAll() { def projectName = it.artifactId.text() if (['clients'].contains(projectName)) { it.artifactId*.value = 'kafka-clients' } } } } ``` ## Tests: - I created person `gpg` secrets locally for signing and private maven repo to upload the artifacts. I ran the release pipeline pointing to custom repo and can successfully push the `kafka-clients` artifacts. ![Screenshot 2024-01-04 at 3 46 06 PM](https://github.com/apache/kafka/assets/2861565/e8f71eb9-905f-4838-afa1-99929d613e2d) - Testing shaded dependencies generated successfully. 
I can see no difference in the previous and release shaded jars, and can also see shaded dependencies in the jar. ![Screenshot 2024-01-04 at 3 50 11 PM](https://github.com/apache/kafka/assets/2861565/e334e51c-ec00-419a-97a1-62c0315bfefe) ## Query: - I compared the 3.6.1 kafka release artifacts [here](https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.6.1/) with generated artifacts in my personal repo, and can find `module` metadata artifact is missing. Debugged the issue and it seems `gradle publish` - `from components.java` actually registers both `jar` and `module` for publish which is not there in case of `shadowJar` hence the question is that do we require to publish `module` metadata information or are we good? cc: @stanislavkozlovski @xvrl @AndrewJSchofield ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
satishd commented on PR #15060: URL: https://github.com/apache/kafka/pull/15060#issuecomment-1877333095 @iit2009060 You may have mistyped as controller but it does not have any role in the fetch path here. Remote fetch is executed through a specific purgatory with DelayedRemoteFetch and read operation/callback is executed in a specific thread pool. It returns empty records when there is no data for a specific offset in the remote storage. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]
mimaison commented on code in PR #14995: URL: https://github.com/apache/kafka/pull/14995#discussion_r1441931015 ## clients/src/main/java/org/apache/kafka/common/config/internals/ConfigProviderUtils.java: ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.config.internals; + +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class ConfigProviderUtils { + +/** + * Reads the given {@code configValue} and creates a list of paths. + * @param configValue allowed.paths config value which is a string containing comma separated list of paths + * @return List of paths or null if the {@code configValue} is null or empty string. + */ +public static List configureAllowedPaths(String configValue) { +if (configValue != null && !configValue.isEmpty()) { +List allowedPaths = new ArrayList<>(); +Arrays.stream(configValue.split(",")).forEach(b -> allowedPaths.add(Paths.get(b).normalize())); Review Comment: You still need to normalize them to remove `.` and `..` in the middle of the path (for example `/tmp/../path`) but I think it would be preferable to ensure allowed paths are absolute. 
Should we also ensure they exist or are there use cases where the paths might be created later? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16003: Always create the /config/topics ZNode even for topics w… [kafka]
mimaison commented on PR #15022: URL: https://github.com/apache/kafka/pull/15022#issuecomment-1877305934 @mumrah can you take a look? Thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16051: Fixed deadlock in StandaloneHerder [kafka]
developster commented on PR #15080: URL: https://github.com/apache/kafka/pull/15080#issuecomment-1877282686 @gharris1727 , I've added the test. It first starts the connector, then updates both the connector and task at the same time. The reason for this is that task reconfiguration will skip if the connector is not yet started. Hope it looks right. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16082) JBOD: Possible dataloss when moving leader partition
[ https://issues.apache.org/jira/browse/KAFKA-16082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17803205#comment-17803205 ] Gaurav Narula commented on KAFKA-16082: --- We tried to analyse the failure scenarios further, keeping the existing design for inter-broker replica movement and here's a summary of our findings: Consider a partition `tp0` being moved from `dir1` to `dir2`. To recap, the current design (i) Waits for the future replica (in `dir2`) to catch up (ii) Sends an RPC to the controller to mark `dir2` as the log dir for the partition. (iii) On getting a successful response back from the controller, we wait for the future replica to catch up once again. Note that we hold locks when the compare LEOs for current and future replicas. (iv) When caught up, we promote the future replica by renaming the directory in `dir2` atomically and get rid of the `-future` suffix. We also update broker local caches to denote that the partition resides in `dir2` and no future replica exists. Finally, we atomically rename the directory in `dir1` and add a `-delete` suffix for it to be cleaned later. Let's consider the following failure categories: 1. Log directory failure during (iv) This can further be broken down into two scenarios: (a) `dir2` fails This would result in an atomic rename of directory in `dir2` to fail and a KafkaStorageException to be propagated up in ReplicaAlterLogDirsThread and the thread to abort. Eventually, `ReplicaManager::maybeUpdateTopicAssignment` will be run while handling the metadata update from the controller. The function will correct the assignment to `dir1`. (b) `dir1` fails This would result in an atomic rename of directory in `dir1` to fail and a KafkaStorageException to be propagates up in ReplicaAlterLogDirsThread. Since the renaming of future replica and the caches are up to date, `ReplicaManager::maybeUpdateTopicAssignment` will be a no-op. 
However, when the broker is restarted, it will fail during startup as two log dirs will exist for the partition in `dir1` and `dir2`. The error message is clear here in suggesting the partition must be removed from the directory which failed recently (`dir1`) 2. Log directory failure during (iii) This would result in `replicaAlterLogDirsManager.removeFetcherForPartitions` being invoked. Eventually, `ReplicaManager::maybeUpdateTopicAssignment` will be run while handling the metadata update from the controller. The function will correct the assignment to `dir1`. 3. Broker crashes during (iii) and starts with an empty `dir1` Broker will catch up with the metadata from the controller, realises `dir2` should own `tp0`. It ignores the failed future replica in `dir2` and creates a new future replica in `dir2`, streaming the logs from the new leader. This is safe as long as the new leader was in-sync prior to getting elected. What we overlooked earlier was the fact that `ReplicaManager::maybeUpdateTopicAssignment` tries to reconcile the broker's state with the controller, causing it to converge eventually. So far, we're unable to come up with a scenario with data loss. The bug we have so far is the failure to remove an abandoned future directory in scenario (3) which seems to be more benign. I'm curious to hear what others think about these scenarios and possibly others that they come across? Perhaps someone who's worked closely in Partition.scala can pitch in? 
> JBOD: Possible dataloss when moving leader partition > > > Key: KAFKA-16082 > URL: https://issues.apache.org/jira/browse/KAFKA-16082 > Project: Kafka > Issue Type: Bug > Components: jbod >Affects Versions: 3.7.0 >Reporter: Proven Provenzano >Assignee: Gaurav Narula >Priority: Blocker > Fix For: 3.7.0 > > > There is a possible dataloss scenario > when using JBOD, > when moving the partition leader log from one directory to another on the > same broker, > when after the destination log has caught up to the source log and after the > broker has sent an update to the partition assignment > if the broker accepts and commits a new record for the partition and then the > broker restarts and the original partition leader log is lost > then the destination log would not contain the new record. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16077: Streams with state updater fails to close task upon fencing [kafka]
lucasbru commented on code in PR #15117: URL: https://github.com/apache/kafka/pull/15117#discussion_r1441867884 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -1164,9 +1243,12 @@ public void shouldHandleMultipleRemovedTasksFromStateUpdater() { final StreamTask taskToUpdateInputPartitions = statefulTask(taskId03, taskId03ChangelogPartitions) .inState(State.RESTORING) .withInputPartitions(taskId03Partitions).build(); +final StreamTask taskToCloseReviveAndUpdateInputPartitions = statefulTask(taskId04, taskId04ChangelogPartitions) +.inState(State.RESTORING) +.withInputPartitions(taskId04Partitions).build(); Review Comment: Right, I only fixed the other one. I think I recently changed my IDE to use 8 character continuation indentation, because somebody complained about it in a client PR. Changing it back. Honestly, I wish we'd just have automatic formatting in kafka. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16077: Streams with state updater fails to close task upon fencing [kafka]
cadonna commented on code in PR #15117: URL: https://github.com/apache/kafka/pull/15117#discussion_r1441848969 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -1164,9 +1243,12 @@ public void shouldHandleMultipleRemovedTasksFromStateUpdater() { final StreamTask taskToUpdateInputPartitions = statefulTask(taskId03, taskId03ChangelogPartitions) .inState(State.RESTORING) .withInputPartitions(taskId03Partitions).build(); +final StreamTask taskToCloseReviveAndUpdateInputPartitions = statefulTask(taskId04, taskId04ChangelogPartitions) +.inState(State.RESTORING) +.withInputPartitions(taskId04Partitions).build(); Review Comment: I think there was a similar indentation issue on line 1713 that you fixed. I also saw one on line 1652. ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -1164,9 +1243,12 @@ public void shouldHandleMultipleRemovedTasksFromStateUpdater() { final StreamTask taskToUpdateInputPartitions = statefulTask(taskId03, taskId03ChangelogPartitions) .inState(State.RESTORING) .withInputPartitions(taskId03Partitions).build(); +final StreamTask taskToCloseReviveAndUpdateInputPartitions = statefulTask(taskId04, taskId04ChangelogPartitions) +.inState(State.RESTORING) +.withInputPartitions(taskId04Partitions).build(); Review Comment: This does not seem done. But nevermind, if it is just this I will of course not block the PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Add reviewers GitHub action [kafka]
viktorsomogyi commented on PR #15115: URL: https://github.com/apache/kafka/pull/15115#issuecomment-1877217071 @mumrah @divijvaidya, the script just edits the PR body, it won't touch the commit itself. I discovered yesterday that this PR also already runs the action, however it failed with something (quotation issue). While fixing it, I discovered that by default the GH token doesn't permit getting user details (in my experiments on my own forks it wasn't an issue because likely it's my own). I worked this out in #15123. The run for this is https://github.com/apache/kafka/actions/runs/7410908203/job/20164210753?pr=15123. It says the following: ``` Run user_json=$(gh api -H "Accept: application/vnd.github+json" -H "X-GitHub-Api-Version: 2022-11-28" users/viktorsomogyi) GraphQL: Resource not accessible by integration (updatePullRequest) Error: Process completed with exit code 1. ``` The first line shouldn't deceive you, it's just the first line of the script. I think the error is that pull requests from forks don't have a write token for pull requests. I tried setting that and everything else to write in #15123 but according to the logs it doesn't apply. Also I found this in the documentation ([here](https://docs.github.com/en/actions/security-guides/automatic-token-authentication#modifying-the-permissions-for-the-github_token)): > You can use the `permissions` key to add and remove read permissions for forked repositories, but typically you can't grant write access. The exception to this behavior is where an admin user has selected the Send write tokens to workflows from pull requests option in the GitHub Actions settings. 
For more information, see "[Managing GitHub Actions settings for a repository](https://docs.github.com/en/repositories/managing-your-repositorys-settings-and-features/enabling-features-for-your-repository/managing-github-actions-settings-for-a-repository#enabling-workflows-for-private-repository-forks)." Do you think we can change this? We'll probably need an admin for the repo, I'm just a simple member and can't do anything basically. I'll try to work this out in my own repo (will ask someone to create PRs against it 😄) and see if I can find the exact setting. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15556: Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect [kafka]
Phuc-Hong-Tran commented on PR #15020: URL: https://github.com/apache/kafka/pull/15020#issuecomment-1877214088 @philipnee, PTAL, thanks in advance. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16082) JBOD: Possible dataloss when moving leader partition
[ https://issues.apache.org/jira/browse/KAFKA-16082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Proven Provenzano updated KAFKA-16082: -- Priority: Blocker (was: Critical) > JBOD: Possible dataloss when moving leader partition > > > Key: KAFKA-16082 > URL: https://issues.apache.org/jira/browse/KAFKA-16082 > Project: Kafka > Issue Type: Bug > Components: jbod >Affects Versions: 3.7.0 >Reporter: Proven Provenzano >Assignee: Gaurav Narula >Priority: Blocker > Fix For: 3.7.0 > > > There is a possible dataloss scenario > when using JBOD, > when moving the partition leader log from one directory to another on the > same broker, > when after the destination log has caught up to the source log and after the > broker has sent an update to the partition assignment > if the broker accepts and commits a new record for the partition and then the > broker restarts and the original partition leader log is lost > then the destination log would not contain the new record. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14588 ConfigEntityName move to server-common [kafka]
ijuma commented on PR #14868: URL: https://github.com/apache/kafka/pull/14868#issuecomment-1877205588 Sounds good. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16082) JBOD: Possible dataloss when moving leader partition
Proven Provenzano created KAFKA-16082: - Summary: JBOD: Possible dataloss when moving leader partition Key: KAFKA-16082 URL: https://issues.apache.org/jira/browse/KAFKA-16082 Project: Kafka Issue Type: Bug Components: jbod Affects Versions: 3.7.0 Reporter: Proven Provenzano Assignee: Gaurav Narula Fix For: 3.7.0 There is a possible dataloss scenario when using JBOD, when moving the partition leader log from one directory to another on the same broker, when after the destination log has caught up to the source log and after the broker has sent an update to the partition assignment if the broker accepts and commits a new record for the partition and then the broker restarts and the original partition leader log is lost then the destination log would not contain the new record. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14588 ConfigEntityName move to server-common [kafka]
mimaison commented on PR #14868: URL: https://github.com/apache/kafka/pull/14868#issuecomment-1877201295 Yes I expect we'll be able to remove that when we remove ZooKeeper. In the meantime, these PRs are helpful to move the commands to the tools module. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]
lucasbru commented on PR #15000: URL: https://github.com/apache/kafka/pull/15000#issuecomment-1877184050 @philipnee Good point about auto-commits, I missed that. It's a pity that auto-commits aren't triggered from the main thread (I wonder if we could do that? Would be another architectural change). I would go for solution num 1 because - Original KIP clearly states that it should only call into the interceptor from a single thread, so I think we'd easily break code if we start requiring them to be thread-safe. Yeah, you could write a KIP, but not sure if that's reasonable here. - When would we not trigger the interceptor in this case? If you are talking about `Consumer.close()`, could we make sure to empty the background event queue after closing the background thread, and then run the invoker? Wdyt? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Add reviewers GitHub action [kafka]
viktorsomogyi commented on code in PR #15115: URL: https://github.com/apache/kafka/pull/15115#discussion_r1441813320 ## .github/workflows/pr_reviews.yml: ## @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: Adding Reviewers + +on: + pull_request_review: +types: [submitted] + +jobs: + add_reviewers: +runs-on: ubuntu-latest +steps: +- uses: actions/checkout@v3 +- name: Add Reviewers + run: | +user_json=$(gh api users/${{ github.event.review.user.login }}) +user_name=$(echo "$user_json" | jq -r '.name') +user_email=$(echo "$user_json" | jq -r '.email') +if [[ "${{ github.event.pull_request.body }}" =~ ^.*Reviewers:\ .*${user_name}.*$ ]]; then +echo "Reviewer already added: ${user_name} <${user_email}>" +elif [[ "${{ github.event.pull_request.body }}" =~ ^.*Reviewers:\ .*$ ]]; then + pr_body="${{ github.event.pull_request.body }}, ${user_name} <${user_email}>" + gh pr edit ${{ github.event.pull_request.number }} --body "${pr_body}" + echo "Added reviewer: ${user_name} <${user_email}>" +else + pr_body="${{ github.event.pull_request.body }} + +Reviewers: ${user_name} <${user_email}>" Review Comment: It's intentional. 
The "Reviewers: " part would have an indentation on the PR as it literally sets the `pr_body` var as the new PR body. In other words the bash string made out of it will contain the indentation (hence the empty line as well). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Add reviewers GitHub action [kafka]
viktorsomogyi commented on code in PR #15115: URL: https://github.com/apache/kafka/pull/15115#discussion_r1441800052 ## .github/workflows/pr_reviews.yml: ## @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: Adding Reviewers + +on: + pull_request_review: +types: [submitted] + +jobs: + add_reviewers: +runs-on: ubuntu-latest +steps: +- uses: actions/checkout@v3 +- name: Add Reviewers + run: | +user_json=$(gh api users/${{ github.event.review.user.login }}) Review Comment: Thanks, I'll add these, good point. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14588 ConfigEntityName move to server-common [kafka]
ijuma commented on PR #14868: URL: https://github.com/apache/kafka/pull/14868#issuecomment-1877150309 You're right. The weird part is actually the fact that `ConfigCommand` relies on this detail instead of using the relevant public apis. I guess it's partly due to the remaining zk code. Seems ok to go with this approach for now and clean it up once we branch for 4.0. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16081 Limit number of ssl connections in brokers [kafka]
JimmyWang6 opened a new pull request, #15126: URL: https://github.com/apache/kafka/pull/15126 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16081) Limit number of ssl connections in brokers
Jimmy Wang created KAFKA-16081: -- Summary: Limit number of ssl connections in brokers Key: KAFKA-16081 URL: https://issues.apache.org/jira/browse/KAFKA-16081 Project: Kafka Issue Type: New Feature Reporter: Jimmy Wang Assignee: Jimmy Wang In Kafka, an SSL connection occupies approximately 100KB of memory, while a plaintext connection occupies around 250 bytes, resulting in a memory footprint ratio of approximately 400:1. Therefore, there should be a limitation for SSL connections. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14588 ConfigEntityName move to server-common [kafka]
nizhikov commented on PR #14868: URL: https://github.com/apache/kafka/pull/14868#issuecomment-1877129273 @ijuma > Is this used by anything outside of tools? AFAICS yes. `DynamicConfig`, `ClientQuotaMetadataManager`, `DynamicConfigPublisher` to name a few. These classes work inside the broker, don't they? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14588 ConfigEntityName move to server-common [kafka]
ijuma commented on PR #14868: URL: https://github.com/apache/kafka/pull/14868#issuecomment-1877112384 It's a bit weird to move stuff like this to `server-common`. Is this used by anything outside of `tools`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15817: Avoid reconnecting to the same IP address (#14813) [kafka]
cadonna merged PR #14980: URL: https://github.com/apache/kafka/pull/14980 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15817: Avoid reconnecting to the same IP address (#14813) [kafka]
cadonna commented on PR #14980: URL: https://github.com/apache/kafka/pull/14980#issuecomment-1877111288 Failed tests are unrelated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14588 ConfigEntityName move to server-common [kafka]
nizhikov commented on PR #14868: URL: https://github.com/apache/kafka/pull/14868#issuecomment-1877093884 Test failure was fixed by #417338ad77d15cad2534d7cff436c8776e717255 I merged latest trunk to PR so this must be OK now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14133: Migrate remaining mocks in StoreChangelogReaderTest to Mockito [kafka]
clolov commented on code in PR #15125: URL: https://github.com/apache/kafka/pull/15125#discussion_r1441743303 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java: ## @@ -1335,9 +1299,6 @@ public void shouldTransitStateBackToActiveRestoringAfterRemovingLastTask() { when(standbyStateManager.taskType()).thenReturn(STANDBY); final StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback, standbyListener); when(standbyStateManager.storeMetadata(tp1)).thenReturn(storeMetadataOne); - EasyMock.expect(storeMetadataOne.changelogPartition()).andReturn(tp1).anyTimes(); -EasyMock.expect(storeMetadataOne.store()).andReturn(store).anyTimes(); -EasyMock.replay(store, storeMetadataOne); Review Comment: According to Mockito these were unused ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java: ## @@ -223,9 +202,6 @@ public void shouldNotRegisterStoreWithoutMetadata() { @Test public void shouldSupportUnregisterChangelogBeforeInitialization() { setupStateManagerMock(); -final Map mockTasks = mock(Map.class); - EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); -EasyMock.replay(mockTasks, store); Review Comment: The mockTasks were reported as unused by Mockito ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java: ## @@ -307,15 +284,16 @@ public void shouldSupportUnregisterChangelogBeforeCompletion() { public void shouldSupportUnregisterChangelogAfterCompletion() { setupStateManagerMock(); setupStoreMetadata(); +setupStore(); +@SuppressWarnings("unchecked") Review Comment: This is required because otherwise mocking a Map throws a tantrum. However, if there are nicer ways to achieve the same I am more than happy to change it -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-14133: Migrate remaining mocks in StoreChangelogReaderTest to Mockito [kafka]
clolov opened a new pull request, #15125: URL: https://github.com/apache/kafka/pull/15125 This pull request takes a similar approach to how TaskManagerTest is being migrated to Mockito mock by mock for easier reviews. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16016: Add docker wrapper in core and remove docker utility script [kafka]
kagarwal06 commented on PR #15048: URL: https://github.com/apache/kafka/pull/15048#issuecomment-1877080323 > I saw that but the action only allows me to pick a branch or tag. Ideally I'd like to run it on the pull request. Hi @mimaison , thanks for the feedback. The dockerisation of Apache Kafka follows the following process - Accepts a signed Kafka tarball link. - Expects the presence of both the `.asc` file and `key` for tarball verification. This verification process is implemented in the [Dockerfile](https://github.com/apache/kafka/blob/trunk/docker/jvm/Dockerfile#L78-L83). The `docker_build_test.py`, utilized in GitHub Actions as well, takes an Apache Kafka tarball URL, supplies it to the Dockerfile, builds the Docker image, and runs tests on it. This approach is valuable during Release Candidate (RC) testing, allowing us to verify the Docker image by providing the Apache Kafka RC link along with the necessary `.asc` file and the `key`. However, in the context of pull requests, especially when changes impact the Apache Kafka codebase and the docker path (as in the current PR), testing requires a signed tarball with the associated `keys` and `.asc` file. Presently, our pipeline is not designed for this use-case. **NOTE:** We have locally tested the above changes by building the Apache Kafka and uploading the tarball to S3 and commenting out the tarball verification code in the Dockerfile. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16077: Streams with state updater fails to close task upon fencing [kafka]
lucasbru commented on code in PR #15117: URL: https://github.com/apache/kafka/pull/15117#discussion_r1441719248 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -950,6 +963,12 @@ private void handleRestoredTasksFromStateUpdater(final long now, closeTaskClean(task, tasksToCloseDirty, taskExceptions); } else if (tasks.removePendingTaskToAddBack(task.id())) { stateUpdater.add(task); +} else if ((inputPartitions = tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(task.id())) != null) { +if (closeTaskClean(task, tasksToCloseDirty, taskExceptions)) { +task.revive(); +task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id())); Review Comment: Done ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -1164,9 +1243,12 @@ public void shouldHandleMultipleRemovedTasksFromStateUpdater() { final StreamTask taskToUpdateInputPartitions = statefulTask(taskId03, taskId03ChangelogPartitions) .inState(State.RESTORING) .withInputPartitions(taskId03Partitions).build(); +final StreamTask taskToCloseReviveAndUpdateInputPartitions = statefulTask(taskId04, taskId04ChangelogPartitions) +.inState(State.RESTORING) +.withInputPartitions(taskId04Partitions).build(); Review Comment: Done ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -1127,6 +1175,36 @@ public void shouldUpdateInputPartitionsOfTasksRemovedFromStateUpdater() { Mockito.verify(stateUpdater).add(task01); } +@Test +public void shouldCloseReviveAndUpdateInputPartitionsOfTasksRemovedFromStateUpdater() { +final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) Review Comment: Done ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -916,6 +923,12 @@ private void handleRemovedTasksFromStateUpdater() { stateUpdater.add(task); } else if (tasks.removePendingTaskToCloseClean(task.id())) { 
closeTaskClean(task, tasksToCloseDirty, taskExceptions); +} else if ((inputPartitions = tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(task.id())) != null) { +if (closeTaskClean(task, tasksToCloseDirty, taskExceptions)) { +task.revive(); +task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id())); +stateUpdater.add(task); Review Comment: Good point! I was supposed to call `addTaskToStateUpdater` here instead. ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -1127,6 +1175,36 @@ public void shouldUpdateInputPartitionsOfTasksRemovedFromStateUpdater() { Mockito.verify(stateUpdater).add(task01); } +@Test +public void shouldCloseReviveAndUpdateInputPartitionsOfTasksRemovedFromStateUpdater() { +final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) +.withInputPartitions(taskId00Partitions) +.inState(State.RESTORING).build(); +final StandbyTask task01 = standbyTask(taskId01, taskId01ChangelogPartitions) +.withInputPartitions(taskId01Partitions) +.inState(State.RUNNING).build(); +when(stateUpdater.hasRemovedTasks()).thenReturn(true); +when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(task00, task01)); +final TasksRegistry tasks = mock(TasksRegistry.class); +when(tasks.removePendingTaskToRecycle(any())).thenReturn(null); + when(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(task00.id())).thenReturn(taskId02Partitions); + when(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(task01.id())).thenReturn(taskId03Partitions); Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14133: Migrate storeMetadata mock in StoreChangelogReaderTest to Mockito [kafka]
clolov commented on PR #15116: URL: https://github.com/apache/kafka/pull/15116#issuecomment-1877054383 Okay @lucasbru, I will look into it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14133: Migrate storeMetadata mock in StoreChangelogReaderTest to Mockito [kafka]
lucasbru commented on PR #15116: URL: https://github.com/apache/kafka/pull/15116#issuecomment-1877052454 @clolov I do believe that the flaky test @divijvaidya mentioned started with another mockito migration PR on 4th of december, #13932. Maybe you could give it a look even if it's not related to this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14133: Migrate storeMetadata mock in StoreChangelogReaderTest to Mockito [kafka]
divijvaidya merged PR #15116: URL: https://github.com/apache/kafka/pull/15116 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14133: Migrate storeMetadata mock in StoreChangelogReaderTest to Mockito [kafka]
divijvaidya commented on PR #15116: URL: https://github.com/apache/kafka/pull/15116#issuecomment-1877045252 My bad! The test failure I noted above is known to be flaky as per https://ge.apache.org/scans/tests?search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.tags=trunk&search.timeZoneId=Europe%2FBerlin&tests.container=org.apache.kafka.streams.processor.internals.StreamThreadTest&tests.test=shouldNotEnforceRebalanceWhenCurrentlyRebalancing%5B0%5D and is not related to this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16016: Add docker wrapper in core and remove docker utility script [kafka]
mimaison commented on PR #15048: URL: https://github.com/apache/kafka/pull/15048#issuecomment-1877012641 I saw that but the action only allows me to pick a branch or tag. Ideally I'd like to run it on the pull request. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14133: Migrate storeMetadata mock in StoreChangelogReaderTest to Mockito [kafka]
divijvaidya commented on PR #15116: URL: https://github.com/apache/kafka/pull/15116#issuecomment-1876978251 @clolov we have test failures related to this PR. Can you please fix them: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15116/1/testReport/org.apache.kafka.streams.processor.internals/StreamThreadTest/Build___JDK_11_and_Scala_2_13___shouldNotEnforceRebalanceWhenCurrentlyRebalancing_0_/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16080) partiton not retention after execute ALTER_REPLICA_LOG_DIRS and LEADER_AND_ISR request at the same time
wangliucheng created KAFKA-16080: Summary: partiton not retention after execute ALTER_REPLICA_LOG_DIRS and LEADER_AND_ISR request at the same time Key: KAFKA-16080 URL: https://issues.apache.org/jira/browse/KAFKA-16080 Project: Kafka Issue Type: Bug Affects Versions: 3.3.2 Reporter: wangliucheng Hi, I found a reproducible problem, when server running a task which is ALTER_REPLICA_LOG_DIRS e.g. test-1 /data01/kafka/log/test01-1 -> /data02/kafka/log/test01-1.xxx-future then 1) thread task kafka-log-retention not work both /data01/kafka/log/test01-1 and /data02/kafka/log/test01-1.xxx-future, The result is that the data will not retention while task is running analysis: The kafka-log-retention thread not work on test01-1 after invoking logManager.abortAndPauseCleaning(topicPartition) 2) If LEADER_AND_ISR is request while ALTER_REPLICA_LOG_DIRS task is running, After the task is end, the data which in /data02/kafka/log/test01-1 will not be deleted all the time analysis: logManager.abortAndPauseCleaning(topicPartition) invoked twice, but resumed only once How to optimize this problem Thanks -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]
tinaselenge commented on code in PR #14995: URL: https://github.com/apache/kafka/pull/14995#discussion_r1441652042 ## clients/src/main/java/org/apache/kafka/common/config/internals/ConfigProviderUtils.java: ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.config.internals; + +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class ConfigProviderUtils { + +/** + * Reads the given {@code configValue} and creates a list of paths. + * @param configValue allowed.paths config value which is a string containing comma separated list of paths + * @return List of paths or null if the {@code configValue} is null or empty string. + */ +public static List configureAllowedPaths(String configValue) { +if (configValue != null && !configValue.isEmpty()) { +List allowedPaths = new ArrayList<>(); +Arrays.stream(configValue.split(",")).forEach(b -> allowedPaths.add(Paths.get(b).normalize())); Review Comment: So instead of normalising the path, we don't allow any traversal? Should we return an exception in that case? 
Or if some paths contain `..` but some don't, do we only configure the ones that are absolute or only configure if all paths are absolute? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16077: Streams with state updater fails to close task upon fencing [kafka]
cadonna commented on code in PR #15117: URL: https://github.com/apache/kafka/pull/15117#discussion_r1441615070 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -1164,9 +1243,12 @@ public void shouldHandleMultipleRemovedTasksFromStateUpdater() { final StreamTask taskToUpdateInputPartitions = statefulTask(taskId03, taskId03ChangelogPartitions) .inState(State.RESTORING) .withInputPartitions(taskId03Partitions).build(); +final StreamTask taskToCloseReviveAndUpdateInputPartitions = statefulTask(taskId04, taskId04ChangelogPartitions) +.inState(State.RESTORING) +.withInputPartitions(taskId04Partitions).build(); Review Comment: nit: ```suggestion .inState(State.RESTORING) .withInputPartitions(taskId04Partitions).build(); ``` ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -1127,6 +1175,36 @@ public void shouldUpdateInputPartitionsOfTasksRemovedFromStateUpdater() { Mockito.verify(stateUpdater).add(task01); } +@Test +public void shouldCloseReviveAndUpdateInputPartitionsOfTasksRemovedFromStateUpdater() { +final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) Review Comment: nit: Could you please use more meaningful names for those task variables? Something like `activeTask` and `standbyTask` would already be better. 
## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -916,6 +923,12 @@ private void handleRemovedTasksFromStateUpdater() { stateUpdater.add(task); } else if (tasks.removePendingTaskToCloseClean(task.id())) { closeTaskClean(task, tasksToCloseDirty, taskExceptions); +} else if ((inputPartitions = tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(task.id())) != null) { +if (closeTaskClean(task, tasksToCloseDirty, taskExceptions)) { +task.revive(); +task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id())); +stateUpdater.add(task); Review Comment: Where is the task initialized? The task is closed cleanly and the call to `revive()` sets the task to `CREATED`. However, the task is never initialized after that. ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -950,6 +963,12 @@ private void handleRestoredTasksFromStateUpdater(final long now, closeTaskClean(task, tasksToCloseDirty, taskExceptions); } else if (tasks.removePendingTaskToAddBack(task.id())) { stateUpdater.add(task); +} else if ((inputPartitions = tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(task.id())) != null) { +if (closeTaskClean(task, tasksToCloseDirty, taskExceptions)) { +task.revive(); +task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id())); Review Comment: My comment above also applies here. 
## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -1127,6 +1175,36 @@ public void shouldUpdateInputPartitionsOfTasksRemovedFromStateUpdater() { Mockito.verify(stateUpdater).add(task01); } +@Test +public void shouldCloseReviveAndUpdateInputPartitionsOfTasksRemovedFromStateUpdater() { +final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) +.withInputPartitions(taskId00Partitions) +.inState(State.RESTORING).build(); +final StandbyTask task01 = standbyTask(taskId01, taskId01ChangelogPartitions) +.withInputPartitions(taskId01Partitions) +.inState(State.RUNNING).build(); +when(stateUpdater.hasRemovedTasks()).thenReturn(true); +when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(task00, task01)); +final TasksRegistry tasks = mock(TasksRegistry.class); +when(tasks.removePendingTaskToRecycle(any())).thenReturn(null); + when(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(task00.id())).thenReturn(taskId02Partitions); + when(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(task01.id())).thenReturn(taskId03Partitions); Review Comment: I thought a standby task can never be added to this pending action. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at:
Re: [PR] KAFKA-16016: Add docker wrapper in core and remove docker utility script [kafka]
VedarthConfluent commented on PR #15048: URL: https://github.com/apache/kafka/pull/15048#issuecomment-1876954982 @mimaison A GitHub Actions workflow has been added for just that! You can check it out [here](https://github.com/apache/kafka/actions/workflows/docker_build_and_test.yml). There is also helpful documentation for building, testing, and releasing the Docker image [here](https://github.com/apache/kafka/blob/trunk/docker/README.md). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16079: fix thread leaks in LocalLeaderEndPointTest and other tests [kafka]
showuon commented on code in PR #15122: URL: https://github.com/apache/kafka/pull/15122#discussion_r1441641932 ## core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala: ## @@ -50,12 +51,15 @@ import scala.collection.{immutable, mutable} import scala.jdk.CollectionConverters._ import scala.util.Random -class ReplicaManagerConcurrencyTest { +class ReplicaManagerConcurrencyTest extends Logging { private val time = new MockTime() Review Comment: Nice! Updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]
tinaselenge commented on code in PR #14846: URL: https://github.com/apache/kafka/pull/14846#discussion_r1441637951 ## core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala: ## @@ -187,164 +197,233 @@ class DeleteTopicTest extends QuorumTestHarness { }.toSet } - @Test - def testIncreasePartitionCountDuringDeleteTopic(): Unit = { -val topic = "test" + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testIncreasePartitionCountDuringDeleteTopic(quorum: String): Unit = { val topicPartition = new TopicPartition(topic, 0) -val brokerConfigs = TestUtils.createBrokerConfigs(4, zkConnect, false) -brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")) -// create brokers -val allServers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) -this.servers = allServers -val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId)) -// create the topic -TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers) -// wait until replica log is created on every broker -TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager.getLog(topicPartition).isDefined), - "Replicas for topic test not created.") -// shutdown a broker to make sure the following topic deletion will be suspended -val leaderIdOpt = zkClient.getLeaderForPartition(topicPartition) -assertTrue(leaderIdOpt.isDefined, "Leader should exist for partition [test,0]") -val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last -follower.shutdown() -// start topic deletion -adminZkClient.deleteTopic(topic) -// make sure deletion of all of the topic's replicas have been tried -ensureControllerExists() -val (controller, controllerId) = getController() -val allReplicasForTopic = getAllReplicasFromAssignment(topic, expectedReplicaAssignment) -TestUtils.waitUntilTrue(() => { - val replicasInDeletionSuccessful = 
controller.kafkaController.controllerContext.replicasInState(topic, ReplicaDeletionSuccessful) - val offlineReplicas = controller.kafkaController.controllerContext.replicasInState(topic, OfflineReplica) - allReplicasForTopic == (replicasInDeletionSuccessful union offlineReplicas) -}, s"Not all replicas for topic $topic are in states of either ReplicaDeletionSuccessful or OfflineReplica") +if (isKRaftTest()) { + val topicPartition = new TopicPartition(topic, 0) + val allBrokers = createTestTopicAndCluster(topic, 4, deleteTopicEnabled = true) + this.brokers = allBrokers + val partitionHostingBrokers = allBrokers.filter(b => expectedReplicaAssignment(0).contains(b.config.brokerId)) + + // wait until replica log is created on every broker + TestUtils.waitUntilTrue(() => partitionHostingBrokers.forall(_.logManager.getLog(topicPartition).isDefined), +"Replicas for topic test not created.") + + // shutdown a broker to make sure the following topic deletion will be suspended + val leaderIdOpt = TestUtils.waitUntilLeaderIsKnown(partitionHostingBrokers, topicPartition) + val follower = partitionHostingBrokers.filter(s => s.config.brokerId != leaderIdOpt).last + follower.shutdown() + // start topic deletion + admin.deleteTopics(Collections.singletonList(topic)).all().get() + + // increase the partition count for topic + val props = new Properties() + props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.plaintextBootstrapServers(partitionHostingBrokers)) + TestUtils.resource(Admin.create(props)) { adminClient => +try { + adminClient.createPartitions(Map(topic -> NewPartitions.increaseTo(2)).asJava).all().get() +} catch { + case _: ExecutionException => +} + } -// increase the partition count for topic -val props = new Properties() -props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.plaintextBootstrapServers(servers)) -TestUtils.resource(Admin.create(props)) { adminClient => - try { -adminClient.createPartitions(Map(topic -> 
NewPartitions.increaseTo(2)).asJava).all().get() - } catch { -case _: ExecutionException => + // bring back the failed broker + follower.startup() + TestUtils.verifyTopicDeletion(null, topic, 2, partitionHostingBrokers) +} else { + val brokerConfigs = TestUtils.createBrokerConfigs(4, zkConnect, false) + brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")) + // create brokers + val allServers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) + this.servers = allServers + val partitionHostingServers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId)) +
[jira] [Updated] (KAFKA-16077) Streams fails to close task after restoration when input partitions are updated
[ https://issues.apache.org/jira/browse/KAFKA-16077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16077: Priority: Blocker (was: Critical) > Streams fails to close task after restoration when input partitions are > updated > --- > > Key: KAFKA-16077 > URL: https://issues.apache.org/jira/browse/KAFKA-16077 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Blocker > > There is a race condition in the state updater that can cause the following: > # We have an active task in the state updater > # We get fenced. We recreate the producer, transactions now uninitialized. > We ask the state updater to give back the task, add a pending action to close > the task clean once it’s handed back > # We get a new assignment with updated input partitions. The task is still > owned by the state updater, so we ask the state updater again to hand it back > and add a pending action to update its input partition > # The task is handed back by the state updater. We update its input > partitions but forget to close it clean (pending action was overwritten) > # Now the task is in an initialized state, but the underlying producer does > not have transactions initialized > This can lead to an exception like this: > {code:java} > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log-org.apache.kafka.streams.errors.StreamsException: > Exception caught in process. 
taskId=1_0, > processor=KSTREAM-SOURCE-05, topic=node-name-repartition, > partition=0, offset=618798, stacktrace=java.lang.IllegalStateException: > TransactionalId stream-soak-test-d647640a-12e5-4e74-a0af-e105d0d0cb67-2: > Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:999) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:985) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:311) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:660) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.StreamsProducer.maybeBeginTransaction(StreamsProducer.java:240) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:258) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:253) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:175) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.doJoin(KStreamKTableJoinProcessor.java:130) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:99) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.ProcessorContext
Re: [PR] KAFKA-12399: Deprecate Log4J Appender [kafka]
mimaison commented on PR #10244: URL: https://github.com/apache/kafka/pull/10244#issuecomment-1876911687 @dongjinleekr Following up on this. If I don't hear back in the next couple of weeks I'll try to complete this task (and keep you as co-author) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-16077) Streams fails to close task after restoration when input partitions are updated
[ https://issues.apache.org/jira/browse/KAFKA-16077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-16077: -- Assignee: Lucas Brutschy > Streams fails to close task after restoration when input partitions are > updated > --- > > Key: KAFKA-16077 > URL: https://issues.apache.org/jira/browse/KAFKA-16077 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Critical > > There is a race condition in the state updater that can cause the following: > # We have an active task in the state updater > # We get fenced. We recreate the producer, transactions now uninitialized. > We ask the state updater to give back the task, add a pending action to close > the task clean once it’s handed back > # We get a new assignment with updated input partitions. The task is still > owned by the state updater, so we ask the state updater again to hand it back > and add a pending action to update its input partition > # The task is handed back by the state updater. We update its input > partitions but forget to close it clean (pending action was overwritten) > # Now the task is in an initialized state, but the underlying producer does > not have transactions initialized > This can lead to an exception like this: > {code:java} > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log-org.apache.kafka.streams.errors.StreamsException: > Exception caught in process. 
taskId=1_0, > processor=KSTREAM-SOURCE-05, topic=node-name-repartition, > partition=0, offset=618798, stacktrace=java.lang.IllegalStateException: > TransactionalId stream-soak-test-d647640a-12e5-4e74-a0af-e105d0d0cb67-2: > Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:999) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:985) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:311) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:660) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.StreamsProducer.maybeBeginTransaction(StreamsProducer.java:240) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:258) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:253) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:175) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.doJoin(KStreamKTableJoinProcessor.java:130) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:99) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl