Re: [PR] MINOR: Add public documentation for metrics introduced in KIP-963 [kafka]

2024-01-04 Thread via GitHub


clolov commented on PR #15131:
URL: https://github.com/apache/kafka/pull/15131#issuecomment-1878268696

   Heya @showuon and thanks for the review! I will make the changes in a couple 
of hours


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16016: Add docker wrapper in core and remove docker utility script [kafka]

2024-01-04 Thread via GitHub


VedarthConfluent commented on code in PR #15048:
URL: https://github.com/apache/kafka/pull/15048#discussion_r1442527460


##
core/src/main/scala/kafka/docker/KafkaDockerWrapper.scala:
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.docker
+
+import kafka.tools.StorageTool
+import kafka.utils.Exit
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Paths, StandardCopyOption, StandardOpenOption}
+
+object KafkaDockerWrapper {
+  def main(args: Array[String]): Unit = {
+    if (args.length == 0) {
+      throw new RuntimeException(s"Error: No operation input provided. " +
+        s"Please provide a valid operation: 'setup'.")
+    }
+    val operation = args.head
+    val arguments = args.tail
+
+    operation match {
+      case "setup" =>
+        if (arguments.length != 3) {
+          val errMsg = "not enough arguments passed. Usage: " +
+            "setup <default_configs_dir>, <mounted_configs_dir>, <final_configs_dir>"
+          System.err.println(errMsg)
+          Exit.exit(1, Some(errMsg))
+        }
+        val defaultConfigsDir = arguments(0)
+        val mountedConfigsDir = arguments(1)
+        val finalConfigsDir = arguments(2)
+        try {
+          prepareConfigs(defaultConfigsDir, mountedConfigsDir, finalConfigsDir)
+        } catch {
+          case e: Throwable =>
+            val errMsg = s"error while preparing configs: ${e.getMessage}"
+            System.err.println(errMsg)
+            Exit.exit(1, Some(errMsg))
+        }
+
+        val formatCmd = formatStorageCmd(finalConfigsDir, envVars)
+        StorageTool.main(formatCmd)
+      case _ =>
+        throw new RuntimeException(s"Unknown operation $operation. " +
+          s"Please provide a valid operation: 'setup'.")
+    }
+  }
+
+  import Constants._
+
+  private def formatStorageCmd(configsDir: String, env: Map[String, String]): Array[String] = {

Review Comment:
   Given the way the storage tool is written, we need the logic that lives in 
its main function. We could create a new function containing the entirety of 
the main function's logic and call it from both the storage tool's main and our 
code, but that would be essentially the same as the present solution with just 
one extra method.
   A proper solution would require a thorough refactoring of the storage tool, 
which I think is out of scope for this change. Please let us know your thoughts 
on this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-15341) Enabling TS for a topic during rolling restart causes problems

2024-01-04 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17803405#comment-17803405
 ] 

Phuc Hong Tran commented on KAFKA-15341:


[~divijvaidya], the broker feature is just the IBP version, so we just need to 
specify the versions that support tiered storage and fence brokers accordingly 
(I think the quorum already has the functionality to block out brokers that 
don't satisfy the cluster's IBP). The other part is that we need to update the 
broker registration request data to include the tiered storage enablement 
status, which may require a KIP. What do you think?

> Enabling TS for a topic during rolling restart causes problems
> --
>
> Key: KAFKA-15341
> URL: https://issues.apache.org/jira/browse/KAFKA-15341
> Project: Kafka
>  Issue Type: Bug
>Reporter: Divij Vaidya
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: KIP-405
> Fix For: 3.8.0
>
>
> When we are in a rolling restart to enable TS at system level, some brokers 
> have TS enabled on them and some don't. We send an alter config call to 
> enable TS for a topic, it hits a broker which has TS enabled, this broker 
> forwards it to the controller and controller will send the config update to 
> all brokers. When another broker which doesn't have TS enabled (because it 
> hasn't undergone the restart yet) gets this config change, it "should" fail 
> to apply it. But failing now is too late, since alterConfig has already 
> succeeded and controller->broker config propagation is done asynchronously.
> With this JIRA, we want to have controller check if TS is enabled on all 
> brokers before applying alter config to turn on TS for a topic.
> Context: https://github.com/apache/kafka/pull/14176#discussion_r1291265129



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16059: close more kafkaApis instances [kafka]

2024-01-04 Thread via GitHub


showuon commented on PR #15132:
URL: https://github.com/apache/kafka/pull/15132#issuecomment-1878091765

   @divijvaidya , this should be the last fix for kafkaApisTest. (Hope so!)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16059: close more kafkaApis instances [kafka]

2024-01-04 Thread via GitHub


showuon commented on code in PR #15132:
URL: https://github.com/apache/kafka/pull/15132#discussion_r1442473158


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -2061,20 +2071,24 @@ class KafkaApisTest extends Logging {
 responseCallback.capture(),
 ArgumentMatchers.eq(requestLocal)
   )).thenAnswer(_ => 
responseCallback.getValue.apply(InitProducerIdResult(producerId, epoch, 
Errors.PRODUCER_FENCED)))
-  kafkaApis = createKafkaApis()
-  kafkaApis.handleInitProducerIdRequest(request, requestLocal)
-
-  verify(requestChannel).sendResponse(
-ArgumentMatchers.eq(request),
-capturedResponse.capture(),
-ArgumentMatchers.eq(None)
-  )
-  val response = capturedResponse.getValue
+  val kafkaApis = createKafkaApis()

Review Comment:
   This instance is created inside a for loop, so we need to close each one 
individually.
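   For anyone following along, the leak-free shape is the usual 
create-and-close-per-iteration pattern. A minimal, runnable Scala sketch, where 
FakeKafkaApis is a stand-in for the real KafkaApis fixture:

    import java.io.Closeable

    final class FakeKafkaApis extends Closeable {
      def handle(): Unit = println("handled request")
      override def close(): Unit = println("closed")
    }

    for (_ <- 1 to 3) {
      val kafkaApis = new FakeKafkaApis
      try kafkaApis.handle()    // exercise the instance
      finally kafkaApis.close() // one close per created instance, even on failure
    }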



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16059: close more kafkaApis instances [kafka]

2024-01-04 Thread via GitHub


showuon commented on code in PR #15132:
URL: https://github.com/apache/kafka/pull/15132#discussion_r1442472935


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -1669,12 +1671,16 @@ class KafkaApisTest extends Logging {
   val request = buildRequest(offsetCommitRequest)
   
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
 any[Long])).thenReturn(0)
-  kafkaApis = createKafkaApis()
-  kafkaApis.handleOffsetCommitRequest(request, 
RequestLocal.withThreadConfinedCaching)
-
-  val response = verifyNoThrottling[OffsetCommitResponse](request)
-  assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
-
Errors.forCode(response.data.topics().get(0).partitions().get(0).errorCode))
+  val kafkaApis = createKafkaApis()

Review Comment:
   This kafkaApis instance is created inside a sub-function, so we need to 
close each one individually.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16059: close more kafkaApis instances [kafka]

2024-01-04 Thread via GitHub


showuon opened a new pull request, #15132:
URL: https://github.com/apache/kafka/pull/15132

   I thought we'd closed all kafkaApis instances in KafkaApisTest, but there 
are more...
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Move Raft io thread implementation to Java [kafka]

2024-01-04 Thread via GitHub


hachikuji commented on code in PR #15119:
URL: https://github.com/apache/kafka/pull/15119#discussion_r1442435868


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClientDriver.java:
##
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.fault.FaultHandler;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.slf4j.Logger;
+
+import java.util.concurrent.CompletableFuture;
+
+
+/**
+ * A single-threaded driver for {@link KafkaRaftClient}.
+ *
+ * @param <T> See {@link KafkaRaftClient}
+ */
+public class KafkaRaftClientDriver<T> extends ShutdownableThread {
+    private final Logger log;
+    private final KafkaRaftClient<T> client;
+    private final FaultHandler fatalFaultHandler;
+
+    public KafkaRaftClientDriver(
+        KafkaRaftClient<T> client,
+        String threadNamePrefix,
+        FaultHandler fatalFaultHandler,
+        LogContext logContext
+    ) {
+        super(threadNamePrefix + "-io-thread", false);
+        this.client = client;
+        this.fatalFaultHandler = fatalFaultHandler;
+        this.log = logContext.logger(KafkaRaftClientDriver.class);
+    }
+
+    @Override
+    public void doWork() {
+        try {
+            client.poll();
+        } catch (Throwable t) {
+            throw fatalFaultHandler.handleFault("Unexpected error in raft IO thread", t);
+        }
+    }
+
+    @Override
+    public boolean initiateShutdown() {
+        if (super.initiateShutdown()) {
+            client.shutdown(5000).whenComplete((na, exception) -> {
+                if (exception != null) {
+                    log.error("Graceful shutdown of RaftClient failed", exception);
+                } else {
+                    log.info("Completed graceful shutdown of RaftClient");
+                }
+            });
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    /**
+     * Shutdown the thread. In addition to stopping any utilized threads, this will
+     * close the {@link KafkaRaftClient} instance.
+     */
+    @Override
+    public void shutdown() throws InterruptedException {
+        try {
+            super.shutdown();
+        } finally {
+            client.close();

Review Comment:
   I added some comments in the latest commit. Let me know if they address the 
comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Move Raft io thread implementation to Java [kafka]

2024-01-04 Thread via GitHub


hachikuji commented on code in PR #15119:
URL: https://github.com/apache/kafka/pull/15119#discussion_r1442426943


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClientDriver.java:
##
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.fault.FaultHandler;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.slf4j.Logger;
+
+import java.util.concurrent.CompletableFuture;
+
+
+/**
+ * A single-threaded driver for {@link KafkaRaftClient}.
+ *
+ * @param <T> See {@link KafkaRaftClient}
+ */
+public class KafkaRaftClientDriver<T> extends ShutdownableThread {
+    private final Logger log;
+    private final KafkaRaftClient<T> client;
+    private final FaultHandler fatalFaultHandler;
+
+    public KafkaRaftClientDriver(
+        KafkaRaftClient<T> client,
+        String threadNamePrefix,
+        FaultHandler fatalFaultHandler,
+        LogContext logContext
+    ) {
+        super(threadNamePrefix + "-io-thread", false);
+        this.client = client;
+        this.fatalFaultHandler = fatalFaultHandler;
+        this.log = logContext.logger(KafkaRaftClientDriver.class);
+    }
+
+    @Override
+    public void doWork() {
+        try {
+            client.poll();
+        } catch (Throwable t) {
+            throw fatalFaultHandler.handleFault("Unexpected error in raft IO thread", t);
+        }
+    }
+
+    @Override
+    public boolean initiateShutdown() {
+        if (super.initiateShutdown()) {
+            client.shutdown(5000).whenComplete((na, exception) -> {
+                if (exception != null) {
+                    log.error("Graceful shutdown of RaftClient failed", exception);
+                } else {
+                    log.info("Completed graceful shutdown of RaftClient");
+                }
+            });
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    /**
+     * Shutdown the thread. In addition to stopping any utilized threads, this will
+     * close the {@link KafkaRaftClient} instance.
+     */
+    @Override
+    public void shutdown() throws InterruptedException {
+        try {
+            super.shutdown();
+        } finally {
+            client.close();

Review Comment:
   Sounds good.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16079) Fix leak in LocalLeaderEndPointTest/FinalizedFeatureChangeListenerTest/KafkaApisTest/ReplicaManagerConcurrencyTest

2024-01-04 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-16079.
---
Fix Version/s: 3.7.0
   Resolution: Fixed

> Fix leak in 
> LocalLeaderEndPointTest/FinalizedFeatureChangeListenerTest/KafkaApisTest/ReplicaManagerConcurrencyTest
> --
>
> Key: KAFKA-16079
> URL: https://issues.apache.org/jira/browse/KAFKA-16079
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.7.0
>
>
> Fix leak in 
> LocalLeaderEndPointTest/FinalizedFeatureChangeListenerTest/KafkaApisTest/ReplicaManagerConcurrencyTest



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16079: fix thread leaks in LocalLeaderEndPointTest and other tests [kafka]

2024-01-04 Thread via GitHub


showuon merged PR #15122:
URL: https://github.com/apache/kafka/pull/15122


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16079: fix thread leaks in LocalLeaderEndPointTest and other tests [kafka]

2024-01-04 Thread via GitHub


showuon commented on PR #15122:
URL: https://github.com/apache/kafka/pull/15122#issuecomment-1877995119

   Failed tests are unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Move Raft io thread implementation to Java [kafka]

2024-01-04 Thread via GitHub


jsancio commented on code in PR #15119:
URL: https://github.com/apache/kafka/pull/15119#discussion_r1442414085


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClientDriver.java:
##
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.fault.FaultHandler;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.slf4j.Logger;
+
+import java.util.concurrent.CompletableFuture;
+
+
+/**
+ * A single-threaded driver for {@link KafkaRaftClient}.
+ *
+ * @param <T> See {@link KafkaRaftClient}
+ */
+public class KafkaRaftClientDriver<T> extends ShutdownableThread {
+    private final Logger log;
+    private final KafkaRaftClient<T> client;
+    private final FaultHandler fatalFaultHandler;
+
+    public KafkaRaftClientDriver(
+        KafkaRaftClient<T> client,
+        String threadNamePrefix,
+        FaultHandler fatalFaultHandler,
+        LogContext logContext
+    ) {
+        super(threadNamePrefix + "-io-thread", false);
+        this.client = client;
+        this.fatalFaultHandler = fatalFaultHandler;
+        this.log = logContext.logger(KafkaRaftClientDriver.class);
+    }
+
+    @Override
+    public void doWork() {
+        try {
+            client.poll();
+        } catch (Throwable t) {
+            throw fatalFaultHandler.handleFault("Unexpected error in raft IO thread", t);
+        }
+    }
+
+    @Override
+    public boolean initiateShutdown() {
+        if (super.initiateShutdown()) {
+            client.shutdown(5000).whenComplete((na, exception) -> {
+                if (exception != null) {
+                    log.error("Graceful shutdown of RaftClient failed", exception);
+                } else {
+                    log.info("Completed graceful shutdown of RaftClient");
+                }
+            });
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    /**
+     * Shutdown the thread. In addition to stopping any utilized threads, this will
+     * close the {@link KafkaRaftClient} instance.
+     */
+    @Override
+    public void shutdown() throws InterruptedException {
+        try {
+            super.shutdown();
+        } finally {
+            client.close();

Review Comment:
   Okay. How about documenting this decision in the type's Javadoc and on the 
private `KafkaRaftClient` field? That way future readers understand that this 
is the exception and not the rule.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-04 Thread via GitHub


satishd commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1442403275


##
gradle/spotbugs-exclude.xml:
##
@@ -162,8 +162,7 @@ For a detailed description of spotbugs bug categories, see 
https://spotbugs.read
  on the compiler to fail if the code is changed to call a method 
that throws Exception.
  Given that, this bug pattern doesn't make sense for Scala code. 
-->
 
-
-
+

Review Comment:
   Right, it is not needed, made the respective changes in the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-04 Thread via GitHub


satishd commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1440213705


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -2265,11 +2268,12 @@ object UnifiedLog extends Logging {
 def deleteProducerSnapshots(): Unit = {
   LocalLog.maybeHandleIOException(logDirFailureChannel,
 parentDir,
-s"Error while deleting producer state snapshots for $topicPartition in 
dir $parentDir") {
+s"Error while deleting producer state snapshots for $topicPartition in 
dir $parentDir", {
 snapshotsToDelete.foreach { snapshot =>
   snapshot.deleteIfExists()
 }
-  }
+  return;

Review Comment:
   It was expecting a return for the inline declaration; it throws a type 
mismatch error without that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-04 Thread via GitHub


satishd commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1442402745


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1245,7 +1246,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   private[log] def collectAbortedTransactions(startOffset: Long, 
upperBoundOffset: Long): List[AbortedTxn] = {

Review Comment:
   I thought these could be addressed when `UnifiedLog` is refactored and moved 
to the storage module.
   





-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-04 Thread via GitHub


satishd commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1442402573


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1441,9 +1442,9 @@ class UnifiedLog(@volatile var logStartOffset: Long,
*  (if there is one). It returns true iff the segment is 
deletable.
* @return the segments ready to be deleted
*/
-  private[log] def deletableSegments(predicate: (LogSegment, 
Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
-def isSegmentEligibleForDeletion(nextSegmentOpt: Option[LogSegment], 
upperBoundOffset: Long): Boolean = {
-  val allowDeletionDueToLogStartOffsetIncremented = 
nextSegmentOpt.isDefined && logStartOffset >= nextSegmentOpt.get.baseOffset
+  private[log] def deletableSegments(predicate: (LogSegment, 
Optional[LogSegment]) => Boolean): Iterable[LogSegment] = {

Review Comment:
   I thought these could be addressed when `UnifiedLog` is refactored and moved 
to the storage module.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-04 Thread via GitHub


satishd commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1442402667


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1511,7 +1512,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 }
 localLog.checkIfMemoryMappedBufferClosed()
 // remove the segments for lookups
-localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, 
reason)
+localLog.removeAndDeleteSegments(segmentsToDelete.toList.asJava,  
true, reason)

Review Comment:
   I thought these could be addressed when `UnifiedLog` is refactored and moved 
to the storage module.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-04 Thread via GitHub


satishd commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1442402484


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java:
##
@@ -0,0 +1,1146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.record.FileLogInputStream;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordVersion;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.Scheduler;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.StringJoiner;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * An append-only log for storing messages locally. The log is a sequence of 
LogSegments, each with a base offset.
+ * New log segments are created according to a configurable policy that 
controls the size in bytes or time interval
+ * for a given segment.
+ *
+ * NOTE: this class is not thread-safe.
+ */
+public class LocalLog {
+
+    /**
+     * a file that is scheduled to be deleted
+     */
+    public static final String DELETED_FILE_SUFFIX = LogFileUtils.DELETED_FILE_SUFFIX;
+
+    /**
+     * A temporary file that is being used for log cleaning
+     */
+    public static final String CLEANED_FILE_SUFFIX = ".cleaned";
+
+    /**
+     * A temporary file used when swapping files into the log
+     */
+    public static final String SWAP_FILE_SUFFIX = ".swap";
+
+    /**
+     * a directory that is scheduled to be deleted
+     */
+    public static final String DELETE_DIR_SUFFIX = "-delete";
+
+    /**
+     * a directory that is used for future partition
+     */
+    public static final String FUTURE_DIR_SUFFIX = "-future";
+    public static final String STRAY_DIR_SUFFIX = "-stray";
+
+    public static final Pattern DELETE_DIR_PATTERN = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + DELETE_DIR_SUFFIX);
+    public static final Pattern FUTURE_DIR_PATTERN = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + FUTURE_DIR_SUFFIX);
+    public static final Pattern STRAY_DIR_PATTERN = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + STRAY_DIR_SUFFIX);
+
+    public static final long UNKNOWN_OFFSET = -1L;
+
+    private final Logger logger;
+
+    private final LogSegments segments;
+    private final Scheduler scheduler;
+    private final Time time;
+    private final TopicPartition topicPartition;
+    private final LogDirFailureChannel logDirFailureChannel;
+
+    // Last time the log was flushed
+    private final AtomicLong lastFlushedTime;
+
+    private volatile File dir;
+    private volatile LogConfig config;
+    private volatile long recoveryPoint;
+    private volatile LogOffsetMetadata nextOffsetMetadata;
+
+    // Cache value of parent directory to avoid allocations in hot paths like ReplicaManager.checkpointHighWatermarks
+    private volatile String parentDir;
+    // The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers()
+    // After memory mapped buffer is closed, no disk IO operation should be performed for this log.
+    private volatile boolean isMemoryMappedBufferClosed = false;
+
+    /**
+     * Creates a new LocalLog instance.

Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-04 Thread via GitHub


satishd commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1442402411


##
checkstyle/import-control-core.xml:
##
@@ -37,6 +37,8 @@
   
   
   
+  
+  

Review Comment:
   Right, it is not needed. I guess it was needed when that class was kept 
inside the core module.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16073) Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion

2024-01-04 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-16073:
---
Fix Version/s: 3.8.0

> Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed 
> localLogStartOffset Update During Segment Deletion
> 
>
> Key: KAFKA-16073
> URL: https://issues.apache.org/jira/browse/KAFKA-16073
> Project: Kafka
>  Issue Type: Bug
>  Components: core, Tiered-Storage
>Affects Versions: 3.6.1
>Reporter: hzh0425
>Assignee: hzh0425
>Priority: Major
>  Labels: KIP-405, kip-405, tiered-storage
> Fix For: 3.6.1, 3.8.0
>
>
> The identified bug in Apache Kafka's tiered storage feature involves a 
> delayed update of localLogStartOffset in the UnifiedLog.deleteSegments 
> method, impacting consumer fetch operations. When segments are deleted from 
> the log's memory state, the localLogStartOffset isn't promptly updated. 
> Concurrently, ReplicaManager.handleOffsetOutOfRangeError checks whether a 
> consumer's fetch offset is less than the localLogStartOffset. If it's 
> greater, Kafka erroneously sends an OffsetOutOfRangeException to the 
> consumer.
> In a specific concurrent scenario, imagine sequential offsets: offset1 < 
> offset2 < offset3. A client requests data at offset2. While a background 
> deletion process removes segments from memory, it hasn't yet updated the 
> localLogStartOffset from offset1 to offset3. Consequently, when the fetch 
> offset (offset2) is evaluated against the stale offset1 in 
> ReplicaManager.handleOffsetOutOfRangeError, it incorrectly triggers an 
> OffsetOutOfRangeException. This issue arises from the out-of-sync update of 
> localLogStartOffset, leading to incorrect handling of consumer fetch 
> requests and potential data access errors.
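
A minimal sketch of the intended ordering (hypothetical names; the actual fix 
in UnifiedLog/ReplicaManager is more involved): publish the new start offset 
before removing segments, so a concurrent fetch never evaluates against a 
stale value.

    import java.util.concurrent.atomic.AtomicLong

    object StartOffsetSketch {
      private val localLogStartOffset = new AtomicLong(0L) // currently offset1

      def deleteLocalSegmentsUpTo(newStartOffset: Long): Unit = {
        // Publish offset3 FIRST...
        localLogStartOffset.set(newStartOffset)
        // ...then drop the segments from the in-memory state. A fetch at
        // offset2 arriving in between is now routed to tiered storage instead
        // of failing with OffsetOutOfRangeException.
      }

      def servedFromRemote(fetchOffset: Long): Boolean =
        fetchOffset < localLogStartOffset.get
    }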



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Move Raft io thread implementation to Java [kafka]

2024-01-04 Thread via GitHub


hachikuji commented on code in PR #15119:
URL: https://github.com/apache/kafka/pull/15119#discussion_r1442350821


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClientDriver.java:
##
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.fault.FaultHandler;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.slf4j.Logger;
+
+import java.util.concurrent.CompletableFuture;
+
+
+/**
+ * A single-threaded driver for {@link KafkaRaftClient}.
+ *
+ * @param <T> See {@link KafkaRaftClient}
+ */
+public class KafkaRaftClientDriver<T> extends ShutdownableThread {
+    private final Logger log;
+    private final KafkaRaftClient<T> client;
+    private final FaultHandler fatalFaultHandler;
+
+    public KafkaRaftClientDriver(
+        KafkaRaftClient<T> client,
+        String threadNamePrefix,
+        FaultHandler fatalFaultHandler,
+        LogContext logContext
+    ) {
+        super(threadNamePrefix + "-io-thread", false);
+        this.client = client;
+        this.fatalFaultHandler = fatalFaultHandler;
+        this.log = logContext.logger(KafkaRaftClientDriver.class);
+    }
+
+    @Override
+    public void doWork() {
+        try {
+            client.poll();
+        } catch (Throwable t) {
+            throw fatalFaultHandler.handleFault("Unexpected error in raft IO thread", t);
+        }
+    }
+
+    @Override
+    public boolean initiateShutdown() {
+        if (super.initiateShutdown()) {
+            client.shutdown(5000).whenComplete((na, exception) -> {
+                if (exception != null) {
+                    log.error("Graceful shutdown of RaftClient failed", exception);
+                } else {
+                    log.info("Completed graceful shutdown of RaftClient");
+                }
+            });
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    /**
+     * Shutdown the thread. In addition to stopping any utilized threads, this will
+     * close the {@link KafkaRaftClient} instance.
+     */
+    @Override
+    public void shutdown() throws InterruptedException {
+        try {
+            super.shutdown();
+        } finally {
+            client.close();

Review Comment:
   Yes, I debated it. It is nice to have the driver own shutdown since it is in 
control of the poll loop. Given that, it seemed safer for it to also close the 
client once it knows that shutdown is complete.
   
   In a future patch, my thought was to have the driver own creation of the 
client as well. We can replace `RaftManager` with a builder which constructs 
the client in the context of the driver. That would establish clear ownership 
and resolve the issue.
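   Roughly, the ownership model being proposed looks like the following 
illustrative Scala sketch (not the raft module's actual API): the driver 
constructs the client it later closes, so the lifecycle has a single owner.

    import java.io.Closeable

    final class Client extends Closeable {
      def poll(): Unit = println("poll")
      override def close(): Unit = println("client closed")
    }

    // Builder-style factory: the client is created in the context of the
    // driver, so close() has one unambiguous owner, the driver itself.
    final class Driver private (client: Client) {
      def runOnce(): Unit = client.poll()
      def shutdown(): Unit = client.close()
    }

    object Driver {
      def build(): Driver = new Driver(new Client)
    }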



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Move Raft io thread implementation to Java [kafka]

2024-01-04 Thread via GitHub


ijuma commented on code in PR #15119:
URL: https://github.com/apache/kafka/pull/15119#discussion_r1440877890


##
checkstyle/import-control.xml:
##
@@ -443,6 +443,7 @@
 
 
 
+

Review Comment:
   Should this be a line above?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Move Raft io thread implementation to Java [kafka]

2024-01-04 Thread via GitHub


jsancio commented on code in PR #15119:
URL: https://github.com/apache/kafka/pull/15119#discussion_r1442310315


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClientDriver.java:
##
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.fault.FaultHandler;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.slf4j.Logger;
+
+import java.util.concurrent.CompletableFuture;
+
+
+/**
+ * A single-threaded driver for {@link KafkaRaftClient}.
+ *
+ * @param <T> See {@link KafkaRaftClient}
+ */
+public class KafkaRaftClientDriver<T> extends ShutdownableThread {
+    private final Logger log;
+    private final KafkaRaftClient<T> client;
+    private final FaultHandler fatalFaultHandler;
+
+    public KafkaRaftClientDriver(
+        KafkaRaftClient<T> client,
+        String threadNamePrefix,
+        FaultHandler fatalFaultHandler,
+        LogContext logContext
+    ) {
+        super(threadNamePrefix + "-io-thread", false);
+        this.client = client;
+        this.fatalFaultHandler = fatalFaultHandler;
+        this.log = logContext.logger(KafkaRaftClientDriver.class);
+    }
+
+    @Override
+    public void doWork() {
+        try {
+            client.poll();
+        } catch (Throwable t) {
+            throw fatalFaultHandler.handleFault("Unexpected error in raft IO thread", t);
+        }
+    }
+
+    @Override
+    public boolean initiateShutdown() {
+        if (super.initiateShutdown()) {
+            client.shutdown(5000).whenComplete((na, exception) -> {
+                if (exception != null) {
+                    log.error("Graceful shutdown of RaftClient failed", exception);
+                } else {
+                    log.info("Completed graceful shutdown of RaftClient");
+                }
+            });
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    /**
+     * Shutdown the thread. In addition to stopping any utilized threads, this will
+     * close the {@link KafkaRaftClient} instance.
+     */
+    @Override
+    public void shutdown() throws InterruptedException {
+        try {
+            super.shutdown();
+        } finally {
+            client.close();
+        }
+    }
+
+    @Override
+    public boolean isRunning() {
+        return client.isRunning() && !isThreadFailed();
+    }
+
+    public CompletableFuture<ApiMessage> handleRequest(
+        RequestHeader header,
+        ApiMessage request,
+        long createdTimeMs
+    ) {
+        RaftRequest.Inbound inboundRequest = new RaftRequest.Inbound(
+            header.correlationId(),
+            request,
+            createdTimeMs
+        );
+
+        client.handle(inboundRequest);
+
+        return inboundRequest.completion.thenApply(response -> response.data);

Review Comment:
   How about:
   ```java
   return inboundRequest.completion.thenApply(RaftMessage::data));
   ```



##
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientDriverTest.java:
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.fault.MockFaultHandler;
+import org.junit.jupiter.api.T

Re: [PR] MINOR: Fixing shadow jar publish [kafka]

2024-01-04 Thread via GitHub


apoorvmittal10 commented on PR #15127:
URL: https://github.com/apache/kafka/pull/15127#issuecomment-1877809786

   > I tested the publish again and confirmed it works!
   
   Awesome!
   
   > Who is a good person to ask about this? Maybe some of the original 
reviewers of the shadow PR?
   
   Maybe @ijuma or @xvrl can help here.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add reviewers GitHub action [kafka]

2024-01-04 Thread via GitHub


mumrah commented on code in PR #15115:
URL: https://github.com/apache/kafka/pull/15115#discussion_r1442270635


##
.github/workflows/pr_reviews.yml:
##
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: Adding Reviewers
+
+on:
+  pull_request_review:
+    types: [submitted]
+
+jobs:
+  add_reviewers:
+    runs-on: ubuntu-latest
+    steps:
+    - uses: actions/checkout@v3
+    - name: Add Reviewers
+      run: |
+        user_json=$(gh api users/${{ github.event.review.user.login }})
+        user_name=$(echo "$user_json" | jq -r '.name')
+        user_email=$(echo "$user_json" | jq -r '.email')
+        if [[ "${{ github.event.pull_request.body }}" =~ ^.*Reviewers:\ .*${user_name}.*$ ]]; then
+            echo "Reviewer already added: ${user_name} <${user_email}>"
+        elif [[ "${{ github.event.pull_request.body }}" =~ ^.*Reviewers:\ .*$ ]]; then
+          pr_body="${{ github.event.pull_request.body }}, ${user_name} <${user_email}>"
+          gh pr edit ${{ github.event.pull_request.number }} --body "${pr_body}"
+          echo "Added reviewer: ${user_name} <${user_email}>"

Review Comment:
   Tacking on additional reviewers like this seems brittle. Could we instead 
use the API to read the list of approvers and replace the whole "Reviewers:" 
line?
   
   https://docs.github.com/en/rest/pulls/reviews?apiVersion=2022-11-28
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Prevent java.lang.UnsupportedOperationException in MockAdminClient [kafka]

2024-01-04 Thread via GitHub


divijvaidya merged PR #14955:
URL: https://github.com/apache/kafka/pull/14955


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Prevent java.lang.UnsupportedOperationException in MockAdminClient [kafka]

2024-01-04 Thread via GitHub


divijvaidya commented on PR #14955:
URL: https://github.com/apache/kafka/pull/14955#issuecomment-1877769128

   JDK 8 tests are all successful and other failing tests don't look related


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Migrate remaining mocks in StoreChangelogReaderTest to Mockito [kafka]

2024-01-04 Thread via GitHub


divijvaidya merged PR #15125:
URL: https://github.com/apache/kafka/pull/15125


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Migrate remaining mocks in StoreChangelogReaderTest to Mockito [kafka]

2024-01-04 Thread via GitHub


divijvaidya commented on PR #15125:
URL: https://github.com/apache/kafka/pull/15125#issuecomment-1877746515

   The modified test is successful 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15125/1/testReport/org.apache.kafka.streams.processor.internals/StoreChangelogReaderTest/
 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Small cleanups in Connect [kafka]

2024-01-04 Thread via GitHub


divijvaidya commented on code in PR #15128:
URL: https://github.com/apache/kafka/pull/15128#discussion_r1442237669


##
connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java:
##
@@ -119,16 +117,16 @@ private boolean isInternalRequest(ContainerRequestContext 
requestContext) {
 
 public static class BasicAuthCallBackHandler implements CallbackHandler {
 
-private String username;
-private String password;
+private final String username;

Review Comment:
   I have fixed all such "field not being final warnings" in 
https://github.com/apache/kafka/pull/15072
   
   Please check when you get a chance.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: New year code clean up - misc [kafka]

2024-01-04 Thread via GitHub


divijvaidya commented on code in PR #15071:
URL: https://github.com/apache/kafka/pull/15071#discussion_r1442225284


##
trogdor/src/main/java/org/apache/kafka/trogdor/workload/TopicsSpec.java:
##
@@ -67,8 +67,7 @@ public void set(String name, PartitionsSpec value) {
 }
 
 public TopicsSpec immutableCopy() {
-        HashMap<String, PartitionsSpec> mapCopy = new HashMap<>();
-        mapCopy.putAll(map);
+        HashMap<String, PartitionsSpec> mapCopy = new HashMap<>(map);

Review Comment:
   done






Re: [PR] MINOR: New year code clean up - misc [kafka]

2024-01-04 Thread via GitHub


divijvaidya commented on code in PR #15071:
URL: https://github.com/apache/kafka/pull/15071#discussion_r1442224965


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -1079,8 +1077,7 @@ private void clearAll() {
 }
 
 private List<NetworkClientDelegate.UnsentRequest> drainPendingCommits() {
-        ArrayList<NetworkClientDelegate.UnsentRequest> res = new ArrayList<>();
-        res.addAll(unsentOffsetCommits.stream().map(OffsetCommitRequestState::toUnsentRequest).collect(Collectors.toList()));
+        ArrayList<NetworkClientDelegate.UnsentRequest> res = new ArrayList<>(unsentOffsetCommits.stream().map(OffsetCommitRequestState::toUnsentRequest).collect(Collectors.toList()));

Review Comment:
   done






Re: [PR] Minor: Add KIP-923 to upgrade-guide.html and dsl-api.html [kafka]

2024-01-04 Thread via GitHub


wcarlson5 commented on PR #14725:
URL: https://github.com/apache/kafka/pull/14725#issuecomment-1877692107

   @mjsax Finally got around to your comments. I think we need to cherry-pick 
this back to 3.6 and 3.7 as well.





Re: [PR] Minor: Add KIP-923 to upgrade-guide.html and dsl-api.html [kafka]

2024-01-04 Thread via GitHub


wcarlson5 commented on code in PR #14725:
URL: https://github.com/apache/kafka/pull/14725#discussion_r1442198505


##
docs/streams/developer-guide/dsl-api.html:
##
@@ -2849,6 +2851,12 @@ KTable-KTable Foreign-Key
 When the table is versioned, the table record to join with is determined by performing a timestamped lookup, i.e., the table record which is joined will be the latest-by-timestamp record with timestamp less than or equal to the stream record timestamp. If the stream record timestamp is older than the table's history retention, then the record is dropped.
+To use the Grace Period, the table needs to be versioned.

Review Comment:
   I could go either way. Let's un-cap it for now.






[PR] MINOR: Add public documentation for metrics introduced in KIP-963 [kafka]

2024-01-04 Thread via GitHub


clolov opened a new pull request, #15131:
URL: https://github.com/apache/kafka/pull/15131

   Adding the public documentation for metrics introduced in 
[KIP-963](https://cwiki.apache.org/confluence/display/KAFKA/KIP-963%3A+Additional+metrics+in+Tiered+Storage)





Re: [PR] KAFKA-16051: Fixed deadlock in StandaloneHerder [kafka]

2024-01-04 Thread via GitHub


gharris1727 commented on code in PR #15080:
URL: https://github.com/apache/kafka/pull/15080#discussion_r1442146732


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -1001,6 +1001,48 @@ public void testResetConnectorOffsets() throws Exception {
         assertEquals(msg, resetOffsetsCallback.get(1000, TimeUnit.MILLISECONDS));
     }
 
+    @Test()
+    public void testRequestTaskReconfigurationDoesNotDeadlock() throws Exception {
+        connector = mock(BogusSourceConnector.class);
+        expectAdd(SourceSink.SOURCE);
+
+        // Start the connector
+        Map<String, String> config = connectorConfig(SourceSink.SOURCE);
+        Connector connectorMock = mock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, config);
+
+        herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback);
+
+        // Wait on connector to start
+        Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.MILLISECONDS);
+        assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
+
+        // Updated task with new config
+        Map<String, String> updatedTaskConfig = taskConfig(SourceSink.SOURCE);
+        updatedTaskConfig.put("dummy-task-property", "yes");
+        when(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, config, true)))
+                .thenReturn(Collections.singletonList(updatedTaskConfig));
+
+        // Expect that tasks will be stopped and started again
+        when(worker.connectorNames()).thenReturn(Collections.singleton(CONNECTOR_NAME));
+        doNothing().when(worker).stopAndAwaitTasks(singletonList(new ConnectorTaskId(CONNECTOR_NAME, 0)));
+        when(worker.startSourceTask(eq(new ConnectorTaskId(CONNECTOR_NAME, 0)), any(), eq(connectorConfig(SourceSink.SOURCE)), eq(updatedTaskConfig), eq(herder), eq(TargetState.STARTED))).thenReturn(true);
+
+        // Set new connector config
+        Map<String, String> newConfig = connectorConfig(SourceSink.SOURCE);
+        newConfig.put("dummy-task-property", "yes");
+        herder.putConnectorConfig(CONNECTOR_NAME, newConfig, true, createCallback);

Review Comment:
   This re-uses the createCallback, which should already be resolved from the 
earlier request. I think we need a new future to wait for, like 
testPutConnectorConfig. 
   
   When I made that change, the callback wasn't completing because this test was 
missing a mock for `Worker#startConnector`. There's a mocking idiom used 
elsewhere in the test that you can copy: 
https://github.com/apache/kafka/blob/e6f2624c48ceab032811693e1013b70c6ee16c74/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java#L250-L255
 
   
   After that is fixed, I think some of the other assertions change slightly, 
since the second put isn't actually executing to completion.
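   
   A rough sketch of the fresh-future idea (`reconfigureCallback` is a 
hypothetical name; this assumes the test's existing `FutureCallback` and 
`Herder.Created` helpers):
   
   ```
   // Use a fresh future for the second put instead of re-using createCallback,
   // which is already resolved by the first request.
   FutureCallback<Herder.Created<ConnectorInfo>> reconfigureCallback = new FutureCallback<>();
   herder.putConnectorConfig(CONNECTOR_NAME, newConfig, true, reconfigureCallback);
   // Completes only once Worker#startConnector is mocked to invoke its onStart callback.
   reconfigureCallback.get(1000L, TimeUnit.MILLISECONDS);
   ```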






[jira] [Created] (KAFKA-16084) Simplify and deduplicate StandaloneHerderTest mocking

2024-01-04 Thread Greg Harris (Jira)
Greg Harris created KAFKA-16084:
---

 Summary: Simplify and deduplicate StandaloneHerderTest mocking
 Key: KAFKA-16084
 URL: https://issues.apache.org/jira/browse/KAFKA-16084
 Project: Kafka
  Issue Type: Test
  Components: connect
Reporter: Greg Harris


The StandaloneHerderTest has some cruft that can be cleaned up. What I've found:

* The `connector` field is written in nearly every test, but only read by one 
test, and looks to be nearly irrelevant.
* `expectConfigValidation` has two ways of specifying consecutive validations. 
1. The boolean shouldCreateConnector which is true in the first invocation and 
false in subsequent invocations. 2. by passing multiple configurations via 
varargs.
* The class uses a mix of Mock annotations and mock(Class) invocations
* The test doesn't stop the thread pool created inside the herder and might 
leak threads
* Mocking for Worker#startConnector is 6 lines which are duplicated 8 times 
throughout the test
* Some waits are 1000 ms and others are 1000 s; these could be pulled out to 
constants or a util method (see the sketch below)
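
For the last point, a rough sketch of what pulling those into a constant could 
look like (names are hypothetical):

```
// Hypothetical constant replacing the mixed 1000-ms / 1000-s magic numbers:
private static final long CALLBACK_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(1);

// usage in a test:
createCallback.get(CALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS);
```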





[PR] KAFKA-16083: Exclude throttle time when expiring inflight requests on a connection [kafka]

2024-01-04 Thread via GitHub


ditac opened a new pull request, #15130:
URL: https://github.com/apache/kafka/pull/15130

   When expiring inflight requests, the network client does not take throttle 
time into account. If a connection has multiple inflight requests (default of 
5) and each request is throttled, then some of the requests can be incorrectly 
marked as expired. Subsequently, the connection is closed and the client 
establishes a new connection to the broker. This behavior leads to unnecessary 
connections to the broker, causes connection storms, and increases latencies.
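   
   A rough sketch of the intended check (illustrative only, not the actual 
NetworkClient code; names are made up):
   
   ```
   // A request should only count as expired for time spent beyond the request
   // timeout *plus* any broker-imposed throttle time observed on the connection.
   boolean isExpired(long nowMs, long sendTimeMs, long requestTimeoutMs, long throttleTimeMs) {
       return nowMs - sendTimeMs > requestTimeoutMs + throttleTimeMs;
   }
   ```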
   
   ### Testing
   Unit tests
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] Quotav2handler [DRAFT] [kafka]

2024-01-04 Thread via GitHub


nickgarvey closed pull request #15129: Quotav2handler [DRAFT]
URL: https://github.com/apache/kafka/pull/15129





[PR] Quotav2handler [DRAFT] [kafka]

2024-01-04 Thread via GitHub


nickgarvey opened a new pull request, #15129:
URL: https://github.com/apache/kafka/pull/15129

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Updated] (KAFKA-16082) JBOD: Possible dataloss when moving leader partition

2024-01-04 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-16082:

Fix Version/s: 3.7.0

> JBOD: Possible dataloss when moving leader partition
> 
>
> Key: KAFKA-16082
> URL: https://issues.apache.org/jira/browse/KAFKA-16082
> Project: Kafka
>  Issue Type: Bug
>  Components: jbod
>Affects Versions: 3.7.0
>Reporter: Proven Provenzano
>Assignee: Gaurav Narula
>Priority: Blocker
> Fix For: 3.7.0
>
>
> There is a possible dataloss scenario
> when using JBOD,
> when moving the partition leader log from one directory to another on the 
> same broker,
> when after the destination log has caught up to the source log and after the 
> broker has sent an update to the partition assignment
> if the broker accepts and commits a new record for the partition and then the 
> broker restarts and the original partition leader log is lost
> then the destination log would not contain the new record.





[jira] [Updated] (KAFKA-16082) JBOD: Possible dataloss when moving leader partition

2024-01-04 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-16082:

Fix Version/s: (was: 3.7.0)

> JBOD: Possible dataloss when moving leader partition
> 
>
> Key: KAFKA-16082
> URL: https://issues.apache.org/jira/browse/KAFKA-16082
> Project: Kafka
>  Issue Type: Bug
>  Components: jbod
>Affects Versions: 3.7.0
>Reporter: Proven Provenzano
>Assignee: Gaurav Narula
>Priority: Blocker
>
> There is a possible dataloss scenario
> when using JBOD,
> when moving the partition leader log from one directory to another on the 
> same broker,
> when after the destination log has caught up to the source log and after the 
> broker has sent an update to the partition assignment
> if the broker accepts and commits a new record for the partition and then the 
> broker restarts and the original partition leader log is lost
> then the destination log would not contain the new record.





Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-04 Thread via GitHub


iit2009060 commented on PR #15060:
URL: https://github.com/apache/kafka/pull/15060#issuecomment-1877542832

   > @iit2009060 You may have mistyped as controller but it does not have any 
role in the fetch path here. Remote fetch is executed through a specific 
purgatory with DelayedRemoteFetch and read operation/callback is executed in a 
specific thread pool. It returns empty records when there is no data for a 
specific offset in the remote storage.
   
   Yes @satishd, correct. I am still going through the code but need a few 
inputs:
   1. If we send empty records, how does the client determine the next offset 
to fetch? In LogOffsetMetadata we always return the fetchOffset of the request, 
so does it keep requesting the same fetchOffset?





Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-04 Thread via GitHub


iit2009060 commented on PR #15060:
URL: https://github.com/apache/kafka/pull/15060#issuecomment-1877528171

   > Thanks @iit2009060 for the PR.
   > 
   > Let us say there are two segments in remote storage and subsequent 
segments in local storage. remote-seg-10[10, 20], remote-seg-21[21, 30] : 
offsets 25 to 30 are compacted. local-seg-31[31, 40]
   > 
   > When a fetch request comes for offsets with in [25, 30] then it should 
move to the local segment as those offsets might have been compacted earlier. 
Did you also cover this scenario in this PR?
   
   @satishd I am trying to reproduce the case where the last offsets of a 
segment earlier than the active segment are compacted away. 
   I tried locally but was not able to reproduce the above scenario: 
   there always exists a last offset in the segment earlier than the active 
segment. 
   As per the article 
   
https://towardsdatascience.com/log-compacted-topics-in-apache-kafka-b1aa1e4665a7
   records in the active segment are never compacted. 
   @satishd Do you have steps in mind to regenerate the above scenario?





Re: [PR] KAFKA-16077: Streams with state updater fails to close task upon fencing [kafka]

2024-01-04 Thread via GitHub


cadonna commented on PR #15117:
URL: https://github.com/apache/kafka/pull/15117#issuecomment-1877517561

   @lucasbru What do you think about my previous question?
   
   > Should we also add an integration test that tests the scenario?





[jira] [Commented] (KAFKA-14404) Fix & update docs on client configs controlled by Streams

2024-01-04 Thread Ayoub Omari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17803252#comment-17803252
 ] 

Ayoub Omari commented on KAFKA-14404:
-

Hi [~ableegoldman] & [~sujayopensource] 

I see that this ticket has been open for some time now.

May I take it up?

> Fix & update docs on client configs controlled by Streams
> -
>
> Key: KAFKA-14404
> URL: https://issues.apache.org/jira/browse/KAFKA-14404
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sujay Hegde
>Priority: Major
>  Labels: docs, newbie
>
> There are a handful of client configs that can't be set by Streams users for 
> various reasons, such as the group id, but we seem to have missed a few of 
> them in the documentation 
> [here|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]:
>  the partition assignor (Consumer) and partitioner (Producer).
> This section of the docs also just needs to be cleaned up in general as there 
> is overlap between the [Default 
> Values|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#default-values]
>  and [Parameters controlled by Kafka 
> Streams|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]
>  sections, and the table of contents is messed up presumably due to an issue 
> with the section headers.
> We should separate these with one section covering (only) configs where 
> Streams sets a different default but this can still be overridden by the 
> user, and the other section covering the configs that Streams hardcodes and 
> users can never override.





[jira] [Created] (KAFKA-16083) Exclude throttle time when expiring inflight requests on a connection

2024-01-04 Thread Adithya Chandra (Jira)
Adithya Chandra created KAFKA-16083:
---

 Summary: Exclude throttle time when expiring inflight requests on 
a connection
 Key: KAFKA-16083
 URL: https://issues.apache.org/jira/browse/KAFKA-16083
 Project: Kafka
  Issue Type: Bug
  Components: clients
Reporter: Adithya Chandra


When expiring inflight requests, the network client does not take throttle time 
into account. If a connection has multiple inflight requests (default of 5) and 
each request is throttled, then some of the requests can be incorrectly marked 
as expired. Subsequently, the connection is closed and the client establishes a 
new connection to the broker. This behavior leads to unnecessary connections to 
the broker, causes connection storms, and increases latencies.





Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]

2024-01-04 Thread via GitHub


philipnee commented on PR #15000:
URL: https://github.com/apache/kafka/pull/15000#issuecomment-1877478293

   Hi @lucasbru - thanks. I personally prefer to make the consumer interceptor 
thread safe because, firstly, (I think) it simplifies the code quite a bit and, 
secondly, it is possible to miss the interceptor invocation if the user fails 
to poll/close after a commit is sent. The latter is the bigger problem, I 
think. Do you have an idea of how to fix this?
   
   `When would we not trigger the interceptor in this case?` I think we always 
want to trigger the onCommit interceptor if one is provided. So after sending 
the last commit sync on close, we just need to clear the queue, close the 
network thread, and invoke the callbacks.
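   
   A rough sketch of that close ordering (method names are hypothetical, not 
the real consumer API):
   
   ```
   // Hypothetical sequence, for discussion only:
   void close(Duration timeout) {
       commitSyncAllConsumed(timeout);   // send the last commit sync
       networkThread.close();            // shut down the background thread
       drainBackgroundEventQueue();      // pick up any pending onCommit events
       invoker.runAll();                 // fire interceptor callbacks on this thread
   }
   ```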
   





Re: [PR] MINOR: New year code clean up - misc [kafka]

2024-01-04 Thread via GitHub


mimaison commented on code in PR #15071:
URL: https://github.com/apache/kafka/pull/15071#discussion_r1442027962


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -1079,8 +1077,7 @@ private void clearAll() {
 }
 
 private List<NetworkClientDelegate.UnsentRequest> drainPendingCommits() {
-        ArrayList<NetworkClientDelegate.UnsentRequest> res = new ArrayList<>();
-        res.addAll(unsentOffsetCommits.stream().map(OffsetCommitRequestState::toUnsentRequest).collect(Collectors.toList()));
+        ArrayList<NetworkClientDelegate.UnsentRequest> res = new ArrayList<>(unsentOffsetCommits.stream().map(OffsetCommitRequestState::toUnsentRequest).collect(Collectors.toList()));

Review Comment:
   We can use `List` on the left
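   
   i.e. something like (assuming the element type is 
`NetworkClientDelegate.UnsentRequest`):
   
   ```
   List<NetworkClientDelegate.UnsentRequest> res = new ArrayList<>(
       unsentOffsetCommits.stream().map(OffsetCommitRequestState::toUnsentRequest).collect(Collectors.toList()));
   ```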



##
trogdor/src/main/java/org/apache/kafka/trogdor/workload/TopicsSpec.java:
##
@@ -67,8 +67,7 @@ public void set(String name, PartitionsSpec value) {
 }
 
 public TopicsSpec immutableCopy() {
-        HashMap<String, PartitionsSpec> mapCopy = new HashMap<>();
-        mapCopy.putAll(map);
+        HashMap<String, PartitionsSpec> mapCopy = new HashMap<>(map);

Review Comment:
   Can we use `Map` on the left side?
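   
   i.e. something like:
   
   ```
   Map<String, PartitionsSpec> mapCopy = new HashMap<>(map);
   ```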






[jira] [Updated] (KAFKA-16081) Limit number of ssl connections in brokers

2024-01-04 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-16081:
---
Labels: need-kip  (was: )

> Limit number of ssl connections in brokers
> --
>
> Key: KAFKA-16081
> URL: https://issues.apache.org/jira/browse/KAFKA-16081
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Jimmy Wang
>Assignee: Jimmy Wang
>Priority: Major
>  Labels: need-kip
>
> In Kafka, an SSL connection occupies approximately 100KB of memory, while a 
> plaintext connection occupies around 250 bytes, resulting in a memory 
> footprint ratio of approximately 400:1. Therefore, there should be a 
> limitation for SSL connections.





Re: [PR] KAFKA-16081 Limit number of ssl connections in brokers [kafka]

2024-01-04 Thread via GitHub


mimaison commented on PR #15126:
URL: https://github.com/apache/kafka/pull/15126#issuecomment-1877414085

   Thanks for the PR.
   
   Since this is adding new configurations, this change requires a KIP to be 
voted by the community. See the [Kafka Improvement 
Proposals](https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals)
 page on the wiki for details about the KIP process.





[PR] MINOR: Small cleanups in Connect [kafka]

2024-01-04 Thread via GitHub


mimaison opened a new pull request, #15128:
URL: https://github.com/apache/kafka/pull/15128

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-04 Thread via GitHub


gharris1727 commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1441959148


##
clients/src/test/java/org/apache/kafka/common/config/provider/FileConfigProviderTest.java:
##
@@ -98,8 +117,91 @@ public void testServiceLoaderDiscovery() {
     public static class TestFileConfigProvider extends FileConfigProvider {
 
         @Override
-        protected Reader reader(String path) throws IOException {
+        protected Reader reader(Path path) throws IOException {
             return new StringReader("testKey=testResult\ntestKey2=testResult2");
         }
     }
+
+    @Test
+    public void testAllowedDirPath() {
+        Map<String, String> configs = new HashMap<>();
+        configs.put(ALLOWED_PATHS_CONFIG, dir);
+        configProvider.configure(configs);
+
+        ConfigData configData = configProvider.get(dirFile);
+        Map<String, String> result = new HashMap<>();
+        result.put("testKey", "testResult");
+        result.put("testKey2", "testResult2");
+        assertEquals(result, configData.data());
+        assertNull(configData.ttl());
+    }
+
+    @Test
+    public void testAllowedFilePath() {
+        Map<String, String> configs = new HashMap<>();
+        configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+        configProvider.configure(configs);
+
+        ConfigData configData = configProvider.get(dirFile);
+        Map<String, String> result = new HashMap<>();
+        result.put("testKey", "testResult");
+        result.put("testKey2", "testResult2");
+        assertEquals(result, configData.data());
+        assertNull(configData.ttl());
+    }
+
+    @Test
+    public void testMultipleAllowedPaths() {
+        Map<String, String> configs = new HashMap<>();
+        configs.put(ALLOWED_PATHS_CONFIG, dir + "," + siblingDir);
+        configProvider.configure(configs);
+
+        Map<String, String> result = new HashMap<>();
+        result.put("testKey", "testResult");
+        result.put("testKey2", "testResult2");
+
+        ConfigData configData = configProvider.get(dirFile);
+        assertEquals(result, configData.data());
+        assertNull(configData.ttl());
+
+        configData = configProvider.get(siblingDirFile);
+        assertEquals(result, configData.data());
+        assertNull(configData.ttl());
+    }
+
+    @Test
+    public void testNotAllowedDirPath() {
+        Map<String, String> configs = new HashMap<>();
+        configs.put(ALLOWED_PATHS_CONFIG, dir);
+        configProvider.configure(configs);
+
+        ConfigData configData = configProvider.get(siblingDirFile);
+        assertTrue(configData.data().isEmpty());
+        assertNull(configData.ttl());
+    }
+
+    @Test
+    public void testNotAllowedFilePath() throws IOException {
+        Map<String, String> configs = new HashMap<>();
+        configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+        configProvider.configure(configs);
+
+        // another file under the same directory
+        Path dirFile2 = Files.createFile(Paths.get(dir, "dirFile2"));
+        ConfigData configData = configProvider.get(dirFile2.toString());
+        assertTrue(configData.data().isEmpty());
+        assertNull(configData.ttl());
+    }
+
+    @Test
+    public void testNoTraversal() {
+        Map<String, String> configs = new HashMap<>();
+        configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+        configProvider.configure(configs);
+
+        // Check we can't escape outside the path directory
+        ConfigData configData = configProvider.get(dirFile + Paths.get("/../siblingdir/siblingdirFile"));

Review Comment:
   > I did however deduplicate the implementation classes by adding 
ConfigProviderUtils class with the common methods.
   
   Rather than making these static utilities and having the caller store the 
List<Path>, you can keep the List<Path> as an instance variable in the shared 
class. This ensures that the List is handled completely by the validation 
logic and not changed by the config provider implementations.
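   
   A rough shape of that idea (class and method names are hypothetical):
   
   ```
   import java.nio.file.Path;
   import java.nio.file.Paths;
   import java.util.ArrayList;
   import java.util.List;
   
   // Hypothetical shared class: it owns the parsed allow-list, so the
   // provider implementations cannot mutate or bypass it.
   public class AllowedPaths {
       private final List<Path> allowedPaths;
   
       public AllowedPaths(String configValue) {
           if (configValue == null || configValue.isEmpty()) {
               allowedPaths = null; // no restriction configured
           } else {
               allowedPaths = new ArrayList<>();
               for (String entry : configValue.split(","))
                   allowedPaths.add(Paths.get(entry).toAbsolutePath().normalize());
           }
       }
   
       /** Returns the normalized path if it is allowed, or null otherwise. */
       public Path parseUntrustedPath(String path) {
           Path normalized = Paths.get(path).toAbsolutePath().normalize();
           if (allowedPaths == null)
               return normalized;
           for (Path allowed : allowedPaths) {
               if (normalized.startsWith(allowed))
                   return normalized;
           }
           return null;
       }
   }
   ```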






Re: [PR] KAFKA-15062: Adding ppc64le build stage [kafka]

2024-01-04 Thread via GitHub


mimaison commented on PR #13817:
URL: https://github.com/apache/kafka/pull/13817#issuecomment-1877357358

   Sorry for the delay. I tried to build this but it fails in the CI with the 
following error:
   ```
   WorkflowScript: 91: unexpected token: % @ line 91, column 82.
  InMillis() / 1000 / 60 / 60 / % 24
   ```
   See 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14717/6/pipeline





[PR] MINOR: Fixing shadow jar publish [kafka]

2024-01-04 Thread via GitHub


apoorvmittal10 opened a new pull request, #15127:
URL: https://github.com/apache/kafka/pull/15127

   ## What
   The PR fixes the publishing of the `kafka-clients` artifact to remote maven. 
The `kafka-clients` jar has recently been shadowed, which publishes the 
artifacts to local maven successfully but errors out when publishing to remote 
maven. The issue triggers only with 
`publishMavenJavaPublicationToMavenRepository` and `signing`; 
`publishToMavenLocal` works correctly with the earlier changes. Generating 
signed `asc` files errors out for shadowed release artifacts because the module 
name (`clients`) differs from the artifact name (`kafka-clients`).
   
   The fix is basically to explicitly define the `artifact` of `shadowJar` for 
the `signing` and `publish` plugins. `project.shadow.component(mavenJava)` 
previously outputs the name as `client--all.jar` though the 
`classifier` and `archivesBaseName` are already defined correctly in the 
`:clients` and `shadowJar` construction. 
   
   Below is another way where the artifact details can be overridden post 
construction in `afterEvaluate` to let the `publish` plugin know about the 
correct `artifactId`, but it doesn't solve the `signing` plugin issue where the 
`signing` plugin is unaware of `kafka-client-.jar` and skips signing. For the 
`signing` plugin the `artifact` should be explicitly defined as well.
   
   ```
   project.shadow.component(mavenJava)
   
   afterEvaluate {
 pom.withXml {
   asNode().dependencies.dependency.findAll() {
 def projectName = it.artifactId.text()
 if (['clients'].contains(projectName)) {
   it.artifactId*.value = 'kafka-clients'
 }
   }
 }
   }
   ```  
   
   ## Tests:
   - I created personal `gpg` secrets locally for signing and a private maven 
repo to upload the artifacts to. I ran the release pipeline pointing to the 
custom repo and can successfully push the `kafka-clients` artifacts.
   
   ![Screenshot 2024-01-04 at 3 46 06 
PM](https://github.com/apache/kafka/assets/2861565/e8f71eb9-905f-4838-afa1-99929d613e2d)
   
   - The shaded dependencies were generated successfully. I can see no 
difference between the previous and current shaded jars, and can also see the 
shaded dependencies in the jar.
   
   ![Screenshot 2024-01-04 at 3 50 11 
PM](https://github.com/apache/kafka/assets/2861565/e334e51c-ec00-419a-97a1-62c0315bfefe)
   
   ## Query:
   
   - I compared the 3.6.1 Kafka release artifacts 
[here](https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.6.1/) 
with the generated artifacts in my personal repo, and found that the `module` 
metadata artifact is missing. I debugged the issue and it seems `gradle 
publish` - `from components.java` - actually registers both `jar` and `module` 
for publishing, which is not the case for `shadowJar`. Hence the question: do 
we need to publish the `module` metadata information, or are we good?
   
   cc: @stanislavkozlovski @xvrl @AndrewJSchofield
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-04 Thread via GitHub


satishd commented on PR #15060:
URL: https://github.com/apache/kafka/pull/15060#issuecomment-1877333095

   @iit2009060 You may have mistyped as controller but it does not have any 
role in the fetch path here. 
   Remote fetch is executed through a specific purgatory with 
DelayedRemoteFetch and read operation/callback is executed in a specific thread 
pool. It returns empty records when there is no data for a specific offset in 
the remote storage.





Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-04 Thread via GitHub


mimaison commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1441931015


##
clients/src/main/java/org/apache/kafka/common/config/internals/ConfigProviderUtils.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.internals;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class ConfigProviderUtils {
+
+    /**
+     * Reads the given {@code configValue} and creates a list of paths.
+     * @param configValue allowed.paths config value which is a string containing a comma separated list of paths
+     * @return List of paths, or null if the {@code configValue} is null or an empty string.
+     */
+    public static List<Path> configureAllowedPaths(String configValue) {
+        if (configValue != null && !configValue.isEmpty()) {
+            List<Path> allowedPaths = new ArrayList<>();
+            Arrays.stream(configValue.split(",")).forEach(b -> allowedPaths.add(Paths.get(b).normalize()));

Review Comment:
   You still need to normalize them to remove `.` and `..` in the middle of the 
path (for example `/tmp/../path`) but I think it would be preferable to ensure 
allowed paths are absolute. Should we also ensure they exist or are there use 
cases where the paths might be created later?
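   
   For example (plain JDK behaviour):
   
   ```
   Path p = Paths.get("/tmp/../path").normalize(); // -> /path
   boolean absolute = p.isAbsolute();              // enforce absolute allow-list entries
   ```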






Re: [PR] KAFKA-16003: Always create the /config/topics ZNode even for topics w… [kafka]

2024-01-04 Thread via GitHub


mimaison commented on PR #15022:
URL: https://github.com/apache/kafka/pull/15022#issuecomment-1877305934

   @mumrah can you take a look? Thanks





Re: [PR] KAFKA-16051: Fixed deadlock in StandaloneHerder [kafka]

2024-01-04 Thread via GitHub


developster commented on PR #15080:
URL: https://github.com/apache/kafka/pull/15080#issuecomment-1877282686

   @gharris1727, I've added the test. It first starts the connector, then 
updates both the connector and the task at the same time. The reason for this 
is that task reconfiguration is skipped if the connector is not yet started. 
Hope it looks right.





[jira] [Commented] (KAFKA-16082) JBOD: Possible dataloss when moving leader partition

2024-01-04 Thread Gaurav Narula (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17803205#comment-17803205
 ] 

Gaurav Narula commented on KAFKA-16082:
---

We tried to analyse the failure scenarios further, keeping the existing design 
for inter-broker replica movement and here's a summary of our findings:

Consider a partition `tp0` being moved from `dir1` to `dir2`. To recap, the 
current design

(i) Waits for the future replica (in `dir2`) to catch up
(ii) Sends an RPC to the controller to mark `dir2` as the log dir for the 
partition.
(iii) On getting a successful response back from the controller, we wait for 
the future replica to catch up once again. Note that we hold locks when we 
compare the LEOs of the current and future replicas.
(iv) When caught up, we promote the future replica by renaming the directory in 
`dir2` atomically and get rid of the `-future` suffix. We also update broker 
local caches to denote that the partition resides in `dir2` and no future 
replica exists. Finally, we atomically rename the directory in `dir1` and add a 
`-delete` suffix for it to be cleaned later.

Let's consider the following failure categories:

1. Log directory failure during (iv)

This can further be broken down into two scenarios:

(a) `dir2` fails

This would result in an atomic rename of directory in `dir2` to fail and a 
KafkaStorageException to be propagated up in ReplicaAlterLogDirsThread and the 
thread to abort. Eventually,  `ReplicaManager::maybeUpdateTopicAssignment` will 
be run while handling the metadata update from the controller. The function 
will correct the assignment to `dir1`.

(b) `dir1`  fails

This would result in an atomic rename of directory in `dir1` to fail and a 
KafkaStorageException to be propagated up in ReplicaAlterLogDirsThread. Since 
the renaming of future replica and the caches are up to date, 
`ReplicaManager::maybeUpdateTopicAssignment` will be a no-op. However, when the 
broker is restarted, it will fail during startup as two log dirs will exist for 
the partition in `dir1` and `dir2`. The error message is clear here in 
suggesting the partition must be removed from the directory which failed 
recently  (`dir1`)

2. Log directory failure during (iii)

This would result in `replicaAlterLogDirsManager.removeFetcherForPartitions` 
being invoked. Eventually,  `ReplicaManager::maybeUpdateTopicAssignment` will 
be run while handling the metadata update from the controller. The function 
will correct the assignment to `dir1`.

3. Broker crashes during (iii) and starts with an empty `dir1`

Broker will catch up with the metadata from the controller, realises `dir2` 
should own `tp0`. It ignores the failed future replica in `dir2` and creates a 
new future replica in `dir2`, streaming the logs from the new leader. This is 
safe as long as the new leader was in-sync prior to getting elected.

What we overlooked earlier was the fact that 
`ReplicaManager::maybeUpdateTopicAssignment` tries to reconcile the broker's 
state with the controller, causing it to converge eventually. So far, we're 
unable to come up with a scenario with data loss. The bug we have so far is the 
failure to remove an abandoned future directory in scenario (3) which seems to 
be more benign.

I'm curious to hear what others think about these scenarios, and about any 
others that they come across. Perhaps someone who's worked closely on 
Partition.scala can pitch in?

> JBOD: Possible dataloss when moving leader partition
> 
>
> Key: KAFKA-16082
> URL: https://issues.apache.org/jira/browse/KAFKA-16082
> Project: Kafka
>  Issue Type: Bug
>  Components: jbod
>Affects Versions: 3.7.0
>Reporter: Proven Provenzano
>Assignee: Gaurav Narula
>Priority: Blocker
> Fix For: 3.7.0
>
>
> There is a possible dataloss scenario
> when using JBOD,
> when moving the partition leader log from one directory to another on the 
> same broker,
> when after the destination log has caught up to the source log and after the 
> broker has sent an update to the partition assignment
> if the broker accepts and commits a new record for the partition and then the 
> broker restarts and the original partition leader log is lost
> then the destination log would not contain the new record.





Re: [PR] KAFKA-16077: Streams with state updater fails to close task upon fencing [kafka]

2024-01-04 Thread via GitHub


lucasbru commented on code in PR #15117:
URL: https://github.com/apache/kafka/pull/15117#discussion_r1441867884


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -1164,9 +1243,12 @@ public void shouldHandleMultipleRemovedTasksFromStateUpdater() {
         final StreamTask taskToUpdateInputPartitions = statefulTask(taskId03, taskId03ChangelogPartitions)
             .inState(State.RESTORING)
             .withInputPartitions(taskId03Partitions).build();
+        final StreamTask taskToCloseReviveAndUpdateInputPartitions = statefulTask(taskId04, taskId04ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId04Partitions).build();

Review Comment:
   Right, I only fixed the other one. I think I recently changed my IDE to use 
8-character continuation indentation because somebody complained about it in a 
client PR. Changing it back. Honestly, I wish we'd just have automatic 
formatting in Kafka.






Re: [PR] KAFKA-16077: Streams with state updater fails to close task upon fencing [kafka]

2024-01-04 Thread via GitHub


cadonna commented on code in PR #15117:
URL: https://github.com/apache/kafka/pull/15117#discussion_r1441848969


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -1164,9 +1243,12 @@ public void shouldHandleMultipleRemovedTasksFromStateUpdater() {
         final StreamTask taskToUpdateInputPartitions = statefulTask(taskId03, taskId03ChangelogPartitions)
             .inState(State.RESTORING)
             .withInputPartitions(taskId03Partitions).build();
+        final StreamTask taskToCloseReviveAndUpdateInputPartitions = statefulTask(taskId04, taskId04ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId04Partitions).build();

Review Comment:
   I think there was a similar indentation issue on line 1713 that you fixed. I 
also saw one on line 1652.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -1164,9 +1243,12 @@ public void shouldHandleMultipleRemovedTasksFromStateUpdater() {
         final StreamTask taskToUpdateInputPartitions = statefulTask(taskId03, taskId03ChangelogPartitions)
             .inState(State.RESTORING)
             .withInputPartitions(taskId03Partitions).build();
+        final StreamTask taskToCloseReviveAndUpdateInputPartitions = statefulTask(taskId04, taskId04ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId04Partitions).build();

Review Comment:
   This does not seem done. But nevermind, if it is just this I will of course 
not block the PR.






Re: [PR] MINOR: Add reviewers GitHub action [kafka]

2024-01-04 Thread via GitHub


viktorsomogyi commented on PR #15115:
URL: https://github.com/apache/kafka/pull/15115#issuecomment-1877217071

   @mumrah @divijvaidya, the script just edits the PR body, it won't touch the 
commit itself.
   
   I discovered yesterday that this PR also already runs the action, however it 
failed with something (quotation issue). While fixing it, I discovered that by 
default the GH token doesn't permit getting user details (in my experiments on 
my own forks it wasn't an issue because likely it's my own). I worked this out 
in #15123. The run for this is 
https://github.com/apache/kafka/actions/runs/7410908203/job/20164210753?pr=15123.
   
   It says the following:
   ```
   Run user_json=$(gh api -H "Accept: application/vnd.github+json" -H 
"X-GitHub-Api-Version: 2022-11-28" users/viktorsomogyi)
 
   GraphQL: Resource not accessible by integration (updatePullRequest)
   Error: Process completed with exit code 1.
   ```
   The first line shouldn't deceive you, it's just the first line of the 
script. I think the error is that pull requests from forks don't have a write 
token for pull requests. I tried setting that and everything else to write in 
#15123 but according to the logs it doesn't apply.
   Also I found this in the documentation 
([here](https://docs.github.com/en/actions/security-guides/automatic-token-authentication#modifying-the-permissions-for-the-github_token)):
   
   > You can use the `permissions` key to add and remove read permissions for 
forked repositories, but typically you can't grant write access. The exception 
to this behavior is where an admin user has selected the Send write tokens to 
workflows from pull requests option in the GitHub Actions settings. For more 
information, see "[Managing GitHub Actions settings for a 
repository](https://docs.github.com/en/repositories/managing-your-repositorys-settings-and-features/enabling-features-for-your-repository/managing-github-actions-settings-for-a-repository#enabling-workflows-for-private-repository-forks)."
   
   Do you think we can change this? We'll probably need an admin for the repo, 
I'm just a simple member and can't do anything basically. I'll try to work this 
out in my own repo (will ask someone to create PRs against it 😄) and see if I 
can find the exact setting.





Re: [PR] KAFKA-15556: Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect [kafka]

2024-01-04 Thread via GitHub


Phuc-Hong-Tran commented on PR #15020:
URL: https://github.com/apache/kafka/pull/15020#issuecomment-1877214088

   @philipnee, PTAL, thanks in advance.





[jira] [Updated] (KAFKA-16082) JBOD: Possible dataloss when moving leader partition

2024-01-04 Thread Proven Provenzano (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Proven Provenzano updated KAFKA-16082:
--
Priority: Blocker  (was: Critical)

> JBOD: Possible dataloss when moving leader partition
> 
>
> Key: KAFKA-16082
> URL: https://issues.apache.org/jira/browse/KAFKA-16082
> Project: Kafka
>  Issue Type: Bug
>  Components: jbod
>Affects Versions: 3.7.0
>Reporter: Proven Provenzano
>Assignee: Gaurav Narula
>Priority: Blocker
> Fix For: 3.7.0
>
>
> There is a possible dataloss scenario
> when using JBOD,
> when moving the partition leader log from one directory to another on the 
> same broker,
> when after the destination log has caught up to the source log and after the 
> broker has sent an update to the partition assignment
> if the broker accepts and commits a new record for the partition and then the 
> broker restarts and the original partition leader log is lost
> then the destination log would not contain the new record.





Re: [PR] KAFKA-14588 ConfigEntityName move to server-common [kafka]

2024-01-04 Thread via GitHub


ijuma commented on PR #14868:
URL: https://github.com/apache/kafka/pull/14868#issuecomment-1877205588

   Sounds good.





[jira] [Created] (KAFKA-16082) JBOD: Possible dataloss when moving leader partition

2024-01-04 Thread Proven Provenzano (Jira)
Proven Provenzano created KAFKA-16082:
-

 Summary: JBOD: Possible dataloss when moving leader partition
 Key: KAFKA-16082
 URL: https://issues.apache.org/jira/browse/KAFKA-16082
 Project: Kafka
  Issue Type: Bug
  Components: jbod
Affects Versions: 3.7.0
Reporter: Proven Provenzano
Assignee: Gaurav Narula
 Fix For: 3.7.0


There is a possible dataloss scenario

when using JBOD,

when moving the partition leader log from one directory to another on the same 
broker,

when after the destination log has caught up to the source log and after the 
broker has sent an update to the partition assignment

if the broker accepts and commits a new record for the partition and then the 
broker restarts and the original partition leader log is lost

then the destination log would not contain the new record.





Re: [PR] KAFKA-14588 ConfigEntityName move to server-common [kafka]

2024-01-04 Thread via GitHub


mimaison commented on PR #14868:
URL: https://github.com/apache/kafka/pull/14868#issuecomment-1877201295

   Yes I expect we'll be able to remove that when we remove ZooKeeper. In the 
meantime, these PRs are helpful to move the commands to the tools module.





Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]

2024-01-04 Thread via GitHub


lucasbru commented on PR #15000:
URL: https://github.com/apache/kafka/pull/15000#issuecomment-1877184050

   @philipnee Good point about auto-commits, I missed that. It's a pity that 
auto-commits aren't triggered from the main thread (I wonder if we could do 
that? Would be another architectural change). 
   
   I would go for solution num 1 because
- Original KIP clearly states that it should only call into the interceptor 
from a single thread, so I think we'd easily break code if we start requiring 
them to be thread-safe. Yeah, you could write a KIP, but not sure if that's 
reasonable here.
- When would we not trigger the interceptor in this case? If you are 
talking about `Consumer.close()`, could we make sure to empty the background 
event queue after closing the background thread, and then run the invoker?
   
   Wdyt?





Re: [PR] MINOR: Add reviewers GitHub action [kafka]

2024-01-04 Thread via GitHub


viktorsomogyi commented on code in PR #15115:
URL: https://github.com/apache/kafka/pull/15115#discussion_r1441813320


##
.github/workflows/pr_reviews.yml:
##
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: Adding Reviewers
+
+on:
+  pull_request_review:
+    types: [submitted]
+
+jobs:
+  add_reviewers:
+    runs-on: ubuntu-latest
+    steps:
+    - uses: actions/checkout@v3
+    - name: Add Reviewers
+      run: |
+        user_json=$(gh api users/${{ github.event.review.user.login }})
+        user_name=$(echo "$user_json" | jq -r '.name')
+        user_email=$(echo "$user_json" | jq -r '.email')
+        if [[ "${{ github.event.pull_request.body }}" =~ ^.*Reviewers:\ .*${user_name}.*$ ]]; then
+          echo "Reviewer already added: ${user_name} <${user_email}>"
+        elif [[ "${{ github.event.pull_request.body }}" =~ ^.*Reviewers:\ .*$ ]]; then
+          pr_body="${{ github.event.pull_request.body }}, ${user_name} <${user_email}>"
+          gh pr edit ${{ github.event.pull_request.number }} --body "${pr_body}"
+          echo "Added reviewer: ${user_name} <${user_email}>"
+        else
+          pr_body="${{ github.event.pull_request.body }}
+
+Reviewers: ${user_name} <${user_email}>"

Review Comment:
   It's intentional. The "Reviewers: " part would otherwise have an indentation 
on the PR, as it literally sets the `pr_body` var as the new PR body. In other 
words, the bash string made out of it would contain the indentation (hence the 
empty line as well).






Re: [PR] MINOR: Add reviewers GitHub action [kafka]

2024-01-04 Thread via GitHub


viktorsomogyi commented on code in PR #15115:
URL: https://github.com/apache/kafka/pull/15115#discussion_r1441800052


##
.github/workflows/pr_reviews.yml:
##
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: Adding Reviewers
+
+on:
+  pull_request_review:
+types: [submitted]
+
+jobs:
+  add_reviewers:
+runs-on: ubuntu-latest
+steps:
+- uses: actions/checkout@v3
+- name: Add Reviewers
+  run: |
+user_json=$(gh api users/${{ github.event.review.user.login }})

Review Comment:
   Thanks, I'll add these, good point.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14588 ConfigEntityName move to server-common [kafka]

2024-01-04 Thread via GitHub


ijuma commented on PR #14868:
URL: https://github.com/apache/kafka/pull/14868#issuecomment-1877150309

   You're right. The weird part is actually the fact that `ConfigCommand` 
relies on this detail instead of using the relevant public APIs. I guess it's 
partly due to the remaining zk code. Seems OK to go with this approach for now 
and clean it up once we branch for 4.0.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16081 Limit number of ssl connections in brokers [kafka]

2024-01-04 Thread via GitHub


JimmyWang6 opened a new pull request, #15126:
URL: https://github.com/apache/kafka/pull/15126

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16081) Limit number of ssl connections in brokers

2024-01-04 Thread Jimmy Wang (Jira)
Jimmy Wang created KAFKA-16081:
--

 Summary: Limit number of ssl connections in brokers
 Key: KAFKA-16081
 URL: https://issues.apache.org/jira/browse/KAFKA-16081
 Project: Kafka
  Issue Type: New Feature
Reporter: Jimmy Wang
Assignee: Jimmy Wang


In Kafka, an SSL connection occupies approximately 100 KB of memory, while a 
plaintext connection occupies around 250 bytes, a memory footprint ratio of 
roughly 400:1. Therefore, there should be a limit on the number of SSL 
connections.
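
As a quick sanity check of that ratio, here is a standalone sketch using the figures quoted above (the per-connection numbers come from this report, not from measurements here):

{code:java}
public class SslFootprintCheck {
    public static void main(String[] args) {
        long sslBytesPerConn = 100L * 1024; // ~100 KB per SSL connection (figure above)
        long plainBytesPerConn = 250L;      // ~250 bytes per plaintext connection
        // 102400 / 250 = 409.6, i.e. roughly the 400:1 ratio quoted above
        System.out.println("footprint ratio ~ " + (sslBytesPerConn / plainBytesPerConn) + ":1");
        // With, say, 1 GiB of heap earmarked for connection state, the budget
        // holds ~10,485 SSL connections versus ~4.3 million plaintext ones.
        long budget = 1L << 30;
        System.out.println("SSL connections per GiB: " + (budget / sslBytesPerConn));
        System.out.println("plaintext connections per GiB: " + (budget / plainBytesPerConn));
    }
}
{code}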



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14588 ConfigEntityName move to server-common [kafka]

2024-01-04 Thread via GitHub


nizhikov commented on PR #14868:
URL: https://github.com/apache/kafka/pull/14868#issuecomment-1877129273

   @ijuma 
   
   >  Is this used by anything outside of tools?
   
   AFAICS yes.
   
   `DynamicConfig`, `ClientQuotaMetadataManager`, `DynamicConfigPublisher`, to 
name a few.
   These classes work inside the broker, don't they?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14588 ConfigEntityName move to server-common [kafka]

2024-01-04 Thread via GitHub


ijuma commented on PR #14868:
URL: https://github.com/apache/kafka/pull/14868#issuecomment-1877112384

   It's a bit weird to move stuff like this to `server-common`. Is this used by 
anything outside of `tools`?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15817: Avoid reconnecting to the same IP address (#14813) [kafka]

2024-01-04 Thread via GitHub


cadonna merged PR #14980:
URL: https://github.com/apache/kafka/pull/14980


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15817: Avoid reconnecting to the same IP address (#14813) [kafka]

2024-01-04 Thread via GitHub


cadonna commented on PR #14980:
URL: https://github.com/apache/kafka/pull/14980#issuecomment-1877111288

   Failed tests are unrelated. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14588 ConfigEntityName move to server-common [kafka]

2024-01-04 Thread via GitHub


nizhikov commented on PR #14868:
URL: https://github.com/apache/kafka/pull/14868#issuecomment-1877093884

   The test failure was fixed by #417338ad77d15cad2534d7cff436c8776e717255.
   I merged the latest trunk into this PR, so it must be OK now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Migrate remaining mocks in StoreChangelogReaderTest to Mockito [kafka]

2024-01-04 Thread via GitHub


clolov commented on code in PR #15125:
URL: https://github.com/apache/kafka/pull/15125#discussion_r1441743303


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##
@@ -1335,9 +1299,6 @@ public void 
shouldTransitStateBackToActiveRestoringAfterRemovingLastTask() {
 when(standbyStateManager.taskType()).thenReturn(STANDBY);
 final StoreChangelogReader changelogReader = new 
StoreChangelogReader(time, config, logContext, adminClient, consumer, callback, 
standbyListener);
 
when(standbyStateManager.storeMetadata(tp1)).thenReturn(storeMetadataOne);
-
EasyMock.expect(storeMetadataOne.changelogPartition()).andReturn(tp1).anyTimes();
-EasyMock.expect(storeMetadataOne.store()).andReturn(store).anyTimes();
-EasyMock.replay(store, storeMetadataOne);

Review Comment:
   According to Mockito, these were unused.
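   
   (For context, a minimal standalone sketch of how Mockito surfaces unused 
stubbings when strict stubs are enabled; this is illustrative, not code from 
the PR:)
   
   ```java
   import static org.mockito.Mockito.*;
   
   import java.util.List;
   import org.mockito.MockitoSession;
   import org.mockito.quality.Strictness;
   
   public class UnusedStubbingDemo {
       @SuppressWarnings("unchecked")
       public static void main(String[] args) {
           MockitoSession session =
               mockitoSession().strictness(Strictness.STRICT_STUBS).startMocking();
           List<String> list = mock(List.class);
           when(list.get(0)).thenReturn("never read"); // stubbed but never invoked
           // finishMocking() validates the session and throws
           // UnnecessaryStubbingException for the unused stubbing above.
           session.finishMocking();
       }
   }
   ```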



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##
@@ -223,9 +202,6 @@ public void shouldNotRegisterStoreWithoutMetadata() {
 @Test
 public void shouldSupportUnregisterChangelogBeforeInitialization() {
 setupStateManagerMock();
-final Map mockTasks = mock(Map.class);
-
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
-EasyMock.replay(mockTasks, store);

Review Comment:
   The `mockTasks` mock was reported as unused by Mockito



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##
@@ -307,15 +284,16 @@ public void 
shouldSupportUnregisterChangelogBeforeCompletion() {
 public void shouldSupportUnregisterChangelogAfterCompletion() {
 setupStateManagerMock();
 setupStoreMetadata();
+setupStore();
+@SuppressWarnings("unchecked")

Review Comment:
   This is required because mocking a generic `Map` otherwise produces an 
unchecked-conversion warning. However, if there are nicer ways to achieve the 
same, I am more than happy to change it
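   
   (To illustrate the warning in question with a generic sketch, not the test 
code itself:)
   
   ```java
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.when;
   
   import java.util.Map;
   
   public class GenericMapMock {
       // mock(Map.class) can only return a raw Map, so assigning it to a
       // parameterized Map<String, Integer> is an unchecked conversion --
       // hence the @SuppressWarnings("unchecked") in the test.
       @SuppressWarnings("unchecked")
       static Map<String, Integer> mockedMap() {
           Map<String, Integer> m = mock(Map.class);
           when(m.get("answer")).thenReturn(42);
           return m;
       }
   
       public static void main(String[] args) {
           System.out.println(mockedMap().get("answer")); // 42
       }
   }
   ```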



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-14133: Migrate remaining mocks in StoreChangelogReaderTest to Mockito [kafka]

2024-01-04 Thread via GitHub


clolov opened a new pull request, #15125:
URL: https://github.com/apache/kafka/pull/15125

   This pull request takes a similar approach to how TaskManagerTest is being 
migrated to Mockito: mock by mock, for easier reviews.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16016: Add docker wrapper in core and remove docker utility script [kafka]

2024-01-04 Thread via GitHub


kagarwal06 commented on PR #15048:
URL: https://github.com/apache/kafka/pull/15048#issuecomment-1877080323

   > I saw that but the action only allows me to pick a branch or tag. Ideally 
I'd like to run it on the pull request.
   
   Hi @mimaison , thanks for the feedback.
   
   The dockerisation of Apache Kafka follows this process:
   - Accepts a signed Kafka tarball link.
   - Expects the presence of both the `.asc` file and `key` for tarball 
verification. This verification process is implemented in the 
[Dockerfile](https://github.com/apache/kafka/blob/trunk/docker/jvm/Dockerfile#L78-L83).
   
   The `docker_build_test.py`, utilized in GitHub Actions as well, takes an 
Apache Kafka tarball URL, supplies it to the Dockerfile, builds the Docker 
image, and runs tests on it. This approach is valuable during Release Candidate 
(RC) testing, allowing us to verify the Docker image by providing the Apache 
Kafka RC link along with the necessary `.asc` file and the `key`.
   
   However, in the context of pull requests, especially when changes impact the 
Apache Kafka codebase and the docker path (as in the current PR), testing 
requires a signed tarball with the associated `keys` and `.asc` file. 
Presently, our pipeline is not designed for this use-case.
   
   **NOTE:** We have locally tested the above changes by building Apache 
Kafka, uploading the tarball to S3, and commenting out the tarball 
verification code in the Dockerfile.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16077: Streams with state updater fails to close task upon fencing [kafka]

2024-01-04 Thread via GitHub


lucasbru commented on code in PR #15117:
URL: https://github.com/apache/kafka/pull/15117#discussion_r1441719248


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -950,6 +963,12 @@ private void handleRestoredTasksFromStateUpdater(final 
long now,
 closeTaskClean(task, tasksToCloseDirty, taskExceptions);
 } else if (tasks.removePendingTaskToAddBack(task.id())) {
 stateUpdater.add(task);
+} else if ((inputPartitions = 
tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(task.id())) != 
null) {
+if (closeTaskClean(task, tasksToCloseDirty, taskExceptions)) {
+task.revive();
+task.updateInputPartitions(inputPartitions, 
topologyMetadata.nodeToSourceTopics(task.id()));

Review Comment:
   Done



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -1164,9 +1243,12 @@ public void 
shouldHandleMultipleRemovedTasksFromStateUpdater() {
 final StreamTask taskToUpdateInputPartitions = statefulTask(taskId03, 
taskId03ChangelogPartitions)
 .inState(State.RESTORING)
 .withInputPartitions(taskId03Partitions).build();
+final StreamTask taskToCloseReviveAndUpdateInputPartitions = 
statefulTask(taskId04, taskId04ChangelogPartitions)
+.inState(State.RESTORING)
+.withInputPartitions(taskId04Partitions).build();

Review Comment:
   Done



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -1127,6 +1175,36 @@ public void 
shouldUpdateInputPartitionsOfTasksRemovedFromStateUpdater() {
 Mockito.verify(stateUpdater).add(task01);
 }
 
+@Test
+public void 
shouldCloseReviveAndUpdateInputPartitionsOfTasksRemovedFromStateUpdater() {
+final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)

Review Comment:
   Done



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -916,6 +923,12 @@ private void handleRemovedTasksFromStateUpdater() {
 stateUpdater.add(task);
 } else if (tasks.removePendingTaskToCloseClean(task.id())) {
 closeTaskClean(task, tasksToCloseDirty, taskExceptions);
+} else if ((inputPartitions = 
tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(task.id())) != 
null) {
+if (closeTaskClean(task, tasksToCloseDirty, taskExceptions)) {
+task.revive();
+task.updateInputPartitions(inputPartitions, 
topologyMetadata.nodeToSourceTopics(task.id()));
+stateUpdater.add(task);

Review Comment:
   Good point! I was supposed to call `addTaskToStateUpdater` here instead.
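   
   (A toy model of the flow being discussed, to make the fix concrete; the 
state names are simplified stand-ins, and only `addTaskToStateUpdater` is a 
name taken from the thread:)
   
   ```java
   public class RevivedTaskFlow {
       enum State { RUNNING, CLOSED, CREATED, RESTORING }
   
       static State state = State.RUNNING;
   
       // Stand-in for TaskManager#addTaskToStateUpdater: initialize the task
       // first, then hand it to the state updater. Calling stateUpdater.add()
       // directly would skip the initialization step.
       static void addTaskToStateUpdater() {
           if (state == State.CREATED) state = State.RESTORING; // initialize
           // stateUpdater.add(task) would happen here
       }
   
       public static void main(String[] args) {
           state = State.CLOSED;    // closeTaskClean(...)
           state = State.CREATED;   // task.revive()
           addTaskToStateUpdater(); // re-initializes the revived task
           System.out.println(state); // RESTORING
       }
   }
   ```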



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -1127,6 +1175,36 @@ public void 
shouldUpdateInputPartitionsOfTasksRemovedFromStateUpdater() {
 Mockito.verify(stateUpdater).add(task01);
 }
 
+@Test
+public void 
shouldCloseReviveAndUpdateInputPartitionsOfTasksRemovedFromStateUpdater() {
+final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+.withInputPartitions(taskId00Partitions)
+.inState(State.RESTORING).build();
+final StandbyTask task01 = standbyTask(taskId01, 
taskId01ChangelogPartitions)
+.withInputPartitions(taskId01Partitions)
+.inState(State.RUNNING).build();
+when(stateUpdater.hasRemovedTasks()).thenReturn(true);
+when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(task00, 
task01));
+final TasksRegistry tasks = mock(TasksRegistry.class);
+when(tasks.removePendingTaskToRecycle(any())).thenReturn(null);
+
when(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(task00.id())).thenReturn(taskId02Partitions);
+
when(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(task01.id())).thenReturn(taskId03Partitions);

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Migrate storeMetadata mock in StoreChangelogReaderTest to Mockito [kafka]

2024-01-04 Thread via GitHub


clolov commented on PR #15116:
URL: https://github.com/apache/kafka/pull/15116#issuecomment-1877054383

   Okay @lucasbru, I will look into it


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Migrate storeMetadata mock in StoreChangelogReaderTest to Mockito [kafka]

2024-01-04 Thread via GitHub


lucasbru commented on PR #15116:
URL: https://github.com/apache/kafka/pull/15116#issuecomment-1877052454

   @clolov I do believe that the flaky test @divijvaidya mentioned started with 
another Mockito migration PR on the 4th of December, #13932. Maybe you could 
give it a look even if it's not related to this PR. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Migrate storeMetadata mock in StoreChangelogReaderTest to Mockito [kafka]

2024-01-04 Thread via GitHub


divijvaidya merged PR #15116:
URL: https://github.com/apache/kafka/pull/15116


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Migrate storeMetadata mock in StoreChangelogReaderTest to Mockito [kafka]

2024-01-04 Thread via GitHub


divijvaidya commented on PR #15116:
URL: https://github.com/apache/kafka/pull/15116#issuecomment-1877045252

   My bad! The test failure I noted above is known to be flaky as per 
https://ge.apache.org/scans/tests?search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.tags=trunk&search.timeZoneId=Europe%2FBerlin&tests.container=org.apache.kafka.streams.processor.internals.StreamThreadTest&tests.test=shouldNotEnforceRebalanceWhenCurrentlyRebalancing%5B0%5D
 and is not related to this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16016: Add docker wrapper in core and remove docker utility script [kafka]

2024-01-04 Thread via GitHub


mimaison commented on PR #15048:
URL: https://github.com/apache/kafka/pull/15048#issuecomment-1877012641

   I saw that but the action only allows me to pick a branch or tag. Ideally 
I'd like to run it on the pull request.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Migrate storeMetadata mock in StoreChangelogReaderTest to Mockito [kafka]

2024-01-04 Thread via GitHub


divijvaidya commented on PR #15116:
URL: https://github.com/apache/kafka/pull/15116#issuecomment-1876978251

   @clolov we have test failures related to this PR. Can you please fix them: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15116/1/testReport/org.apache.kafka.streams.processor.internals/StreamThreadTest/Build___JDK_11_and_Scala_2_13___shouldNotEnforceRebalanceWhenCurrentlyRebalancing_0_/
 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16080) partition not retained after executing ALTER_REPLICA_LOG_DIRS and LEADER_AND_ISR requests at the same time

2024-01-04 Thread wangliucheng (Jira)
wangliucheng created KAFKA-16080:


 Summary: partition not retained after executing 
ALTER_REPLICA_LOG_DIRS and LEADER_AND_ISR requests at the same time
 Key: KAFKA-16080
 URL: https://issues.apache.org/jira/browse/KAFKA-16080
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.3.2
Reporter: wangliucheng


Hi,
I found a reproducible problem.

When the server is running an ALTER_REPLICA_LOG_DIRS task,
e.g. test-1 /data01/kafka/log/test01-1 -> /data02/kafka/log/test01-1.xxx-future,
then:
1) The kafka-log-retention thread stops working on both /data01/kafka/log/test01-1 
and /data02/kafka/log/test01-1.xxx-future.
   The result is that the data is not subject to retention while the task is running.
   Analysis: the kafka-log-retention thread stops working on test01-1 after 
logManager.abortAndPauseCleaning(topicPartition) is invoked.
   
2) If a LEADER_AND_ISR request arrives while the ALTER_REPLICA_LOG_DIRS task is running,
   then after the task ends, the data in /data02/kafka/log/test01-1 is never deleted.
   Analysis: logManager.abortAndPauseCleaning(topicPartition) is invoked twice, 
but cleaning is resumed only once (see the sketch below).

How can this problem be addressed?
Thanks 
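
Below is a minimal standalone sketch of the imbalance in analysis 2. The method names mirror this report; the depth counter is illustrative, not the actual LogManager implementation:

{code:java}
import java.util.concurrent.atomic.AtomicInteger;

public class CleaningPauseDepth {
    private final AtomicInteger pauseDepth = new AtomicInteger();

    void abortAndPauseCleaning() { pauseDepth.incrementAndGet(); }

    void resumeCleaning() { pauseDepth.decrementAndGet(); }

    boolean cleaningPaused() { return pauseDepth.get() > 0; }

    public static void main(String[] args) {
        CleaningPauseDepth p = new CleaningPauseDepth();
        p.abortAndPauseCleaning(); // ALTER_REPLICA_LOG_DIRS pauses cleaning
        p.abortAndPauseCleaning(); // LEADER_AND_ISR pauses it again
        p.resumeCleaning();        // only one resume when the move completes
        // still paused: retention never resumes, so the data is never deleted
        System.out.println("still paused: " + p.cleaningPaused()); // true
    }
}
{code}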



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-04 Thread via GitHub


tinaselenge commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1441652042


##
clients/src/main/java/org/apache/kafka/common/config/internals/ConfigProviderUtils.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.internals;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class ConfigProviderUtils {
+
+/**
+ * Reads the given {@code configValue} and creates a list of paths.
+ * @param configValue allowed.paths config value which is a string 
containing comma separated list of paths
+ * @return List of paths or null if the {@code configValue} is null or 
empty string.
+ */
+public static List configureAllowedPaths(String configValue) {
+if (configValue != null && !configValue.isEmpty()) {
+List allowedPaths = new ArrayList<>();
+Arrays.stream(configValue.split(",")).forEach(b -> 
allowedPaths.add(Paths.get(b).normalize()));

Review Comment:
   So instead of normalising the path, we don't allow any traversal? Should we 
throw an exception in that case? Or, if some paths contain `..` but some don't, 
do we only configure the ones that are absolute, or only configure if all paths 
are absolute? 
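   
   (For reference, a standalone sketch of the `Path` semantics in question, 
not code from the PR:)
   
   ```java
   import java.nio.file.Paths;
   
   public class NormalizeSketch {
       public static void main(String[] args) {
           // normalize() collapses ".." against a preceding name...
           System.out.println(Paths.get("/etc/kafka/../ssl").normalize()); // /etc/ssl
           // ...but a leading ".." has nothing to collapse against and survives,
           // so a relative allowed path could still traverse out of a base dir.
           System.out.println(Paths.get("../outside").normalize());        // ../outside
           // One possible guard is to accept only absolute entries:
           System.out.println(Paths.get("../outside").isAbsolute());       // false
       }
   }
   ```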



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16077: Streams with state updater fails to close task upon fencing [kafka]

2024-01-04 Thread via GitHub


cadonna commented on code in PR #15117:
URL: https://github.com/apache/kafka/pull/15117#discussion_r1441615070


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -1164,9 +1243,12 @@ public void 
shouldHandleMultipleRemovedTasksFromStateUpdater() {
 final StreamTask taskToUpdateInputPartitions = statefulTask(taskId03, 
taskId03ChangelogPartitions)
 .inState(State.RESTORING)
 .withInputPartitions(taskId03Partitions).build();
+final StreamTask taskToCloseReviveAndUpdateInputPartitions = 
statefulTask(taskId04, taskId04ChangelogPartitions)
+.inState(State.RESTORING)
+.withInputPartitions(taskId04Partitions).build();

Review Comment:
   nit:
   ```suggestion
   .inState(State.RESTORING)
   .withInputPartitions(taskId04Partitions).build();
   ```



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -1127,6 +1175,36 @@ public void 
shouldUpdateInputPartitionsOfTasksRemovedFromStateUpdater() {
 Mockito.verify(stateUpdater).add(task01);
 }
 
+@Test
+public void 
shouldCloseReviveAndUpdateInputPartitionsOfTasksRemovedFromStateUpdater() {
+final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)

Review Comment:
   nit:
   Could you please use more meaningful names for those task variables? 
Something like `activeTask` and `standbyTask` would already be better.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -916,6 +923,12 @@ private void handleRemovedTasksFromStateUpdater() {
 stateUpdater.add(task);
 } else if (tasks.removePendingTaskToCloseClean(task.id())) {
 closeTaskClean(task, tasksToCloseDirty, taskExceptions);
+} else if ((inputPartitions = 
tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(task.id())) != 
null) {
+if (closeTaskClean(task, tasksToCloseDirty, taskExceptions)) {
+task.revive();
+task.updateInputPartitions(inputPartitions, 
topologyMetadata.nodeToSourceTopics(task.id()));
+stateUpdater.add(task);

Review Comment:
   Where is the task initialized?
   The task is closed cleanly and the call to `revive()` sets the task to 
`CREATED`. However, the task is never initialized after that.
   



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -950,6 +963,12 @@ private void handleRestoredTasksFromStateUpdater(final 
long now,
 closeTaskClean(task, tasksToCloseDirty, taskExceptions);
 } else if (tasks.removePendingTaskToAddBack(task.id())) {
 stateUpdater.add(task);
+} else if ((inputPartitions = 
tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(task.id())) != 
null) {
+if (closeTaskClean(task, tasksToCloseDirty, taskExceptions)) {
+task.revive();
+task.updateInputPartitions(inputPartitions, 
topologyMetadata.nodeToSourceTopics(task.id()));

Review Comment:
   My comment above also applies here.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -1127,6 +1175,36 @@ public void 
shouldUpdateInputPartitionsOfTasksRemovedFromStateUpdater() {
 Mockito.verify(stateUpdater).add(task01);
 }
 
+@Test
+public void 
shouldCloseReviveAndUpdateInputPartitionsOfTasksRemovedFromStateUpdater() {
+final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+.withInputPartitions(taskId00Partitions)
+.inState(State.RESTORING).build();
+final StandbyTask task01 = standbyTask(taskId01, 
taskId01ChangelogPartitions)
+.withInputPartitions(taskId01Partitions)
+.inState(State.RUNNING).build();
+when(stateUpdater.hasRemovedTasks()).thenReturn(true);
+when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(task00, 
task01));
+final TasksRegistry tasks = mock(TasksRegistry.class);
+when(tasks.removePendingTaskToRecycle(any())).thenReturn(null);
+
when(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(task00.id())).thenReturn(taskId02Partitions);
+
when(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(task01.id())).thenReturn(taskId03Partitions);

Review Comment:
   I thought a standby task can never be added to this pending action.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:

Re: [PR] KAFKA-16016: Add docker wrapper in core and remove docker utility script [kafka]

2024-01-04 Thread via GitHub


VedarthConfluent commented on PR #15048:
URL: https://github.com/apache/kafka/pull/15048#issuecomment-1876954982

   @mimaison There is a GitHub Actions workflow added for just that! You can 
check it out 
[here](https://github.com/apache/kafka/actions/workflows/docker_build_and_test.yml).
 There is also helpful documentation for building, testing, and releasing the 
Docker image [here](https://github.com/apache/kafka/blob/trunk/docker/README.md)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16079: fix threads leak threads in LocalLeaderEndPointTest and other tests [kafka]

2024-01-04 Thread via GitHub


showuon commented on code in PR #15122:
URL: https://github.com/apache/kafka/pull/15122#discussion_r1441641932


##
core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala:
##
@@ -50,12 +51,15 @@ import scala.collection.{immutable, mutable}
 import scala.jdk.CollectionConverters._
 import scala.util.Random
 
-class ReplicaManagerConcurrencyTest {
+class ReplicaManagerConcurrencyTest extends Logging {
 
   private val time = new MockTime()

Review Comment:
   Nice! Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]

2024-01-04 Thread via GitHub


tinaselenge commented on code in PR #14846:
URL: https://github.com/apache/kafka/pull/14846#discussion_r1441637951


##
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala:
##
@@ -187,164 +197,233 @@ class DeleteTopicTest extends QuorumTestHarness {
 }.toSet
   }
 
-  @Test
-  def testIncreasePartitionCountDuringDeleteTopic(): Unit = {
-val topic = "test"
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testIncreasePartitionCountDuringDeleteTopic(quorum: String): Unit = {
 val topicPartition = new TopicPartition(topic, 0)
-val brokerConfigs = TestUtils.createBrokerConfigs(4, zkConnect, false)
-brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
-// create brokers
-val allServers = brokerConfigs.map(b => 
TestUtils.createServer(KafkaConfig.fromProps(b)))
-this.servers = allServers
-val servers = allServers.filter(s => 
expectedReplicaAssignment(0).contains(s.config.brokerId))
-// create the topic
-TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers)
-// wait until replica log is created on every broker
-TestUtils.waitUntilTrue(() => 
servers.forall(_.getLogManager.getLog(topicPartition).isDefined),
-  "Replicas for topic test not created.")
-// shutdown a broker to make sure the following topic deletion will be 
suspended
-val leaderIdOpt = zkClient.getLeaderForPartition(topicPartition)
-assertTrue(leaderIdOpt.isDefined, "Leader should exist for partition 
[test,0]")
-val follower = servers.filter(s => s.config.brokerId != 
leaderIdOpt.get).last
-follower.shutdown()
-// start topic deletion
-adminZkClient.deleteTopic(topic)
 
-// make sure deletion of all of the topic's replicas have been tried
-ensureControllerExists()
-val (controller, controllerId) = getController()
-val allReplicasForTopic = getAllReplicasFromAssignment(topic, 
expectedReplicaAssignment)
-TestUtils.waitUntilTrue(() => {
-  val replicasInDeletionSuccessful = 
controller.kafkaController.controllerContext.replicasInState(topic, 
ReplicaDeletionSuccessful)
-  val offlineReplicas = 
controller.kafkaController.controllerContext.replicasInState(topic, 
OfflineReplica)
-  allReplicasForTopic == (replicasInDeletionSuccessful union 
offlineReplicas)
-}, s"Not all replicas for topic $topic are in states of either 
ReplicaDeletionSuccessful or OfflineReplica")
+if (isKRaftTest()) {
+  val topicPartition = new TopicPartition(topic, 0)
+  val allBrokers = createTestTopicAndCluster(topic, 4, deleteTopicEnabled 
= true)
+  this.brokers = allBrokers
+  val partitionHostingBrokers = allBrokers.filter(b => 
expectedReplicaAssignment(0).contains(b.config.brokerId))
+
+  // wait until replica log is created on every broker
+  TestUtils.waitUntilTrue(() => 
partitionHostingBrokers.forall(_.logManager.getLog(topicPartition).isDefined),
+"Replicas for topic test not created.")
+
+  // shutdown a broker to make sure the following topic deletion will be 
suspended
+  val leaderIdOpt = 
TestUtils.waitUntilLeaderIsKnown(partitionHostingBrokers, topicPartition)
+  val follower = partitionHostingBrokers.filter(s => s.config.brokerId != 
leaderIdOpt).last
+  follower.shutdown()
+  // start topic deletion
+  admin.deleteTopics(Collections.singletonList(topic)).all().get()
+
+  // increase the partition count for topic
+  val props = new Properties()
+  props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
TestUtils.plaintextBootstrapServers(partitionHostingBrokers))
+  TestUtils.resource(Admin.create(props)) { adminClient =>
+try {
+  adminClient.createPartitions(Map(topic -> 
NewPartitions.increaseTo(2)).asJava).all().get()
+} catch {
+  case _: ExecutionException =>
+}
+  }
 
-// increase the partition count for topic
-val props = new Properties()
-props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
TestUtils.plaintextBootstrapServers(servers))
-TestUtils.resource(Admin.create(props)) { adminClient =>
-  try {
-adminClient.createPartitions(Map(topic -> 
NewPartitions.increaseTo(2)).asJava).all().get()
-  } catch {
-case _: ExecutionException =>
+  // bring back the failed broker
+  follower.startup()
+  TestUtils.verifyTopicDeletion(null, topic, 2, partitionHostingBrokers)
+} else {
+  val brokerConfigs = TestUtils.createBrokerConfigs(4, zkConnect, false)
+  brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
+  // create brokers
+  val allServers = brokerConfigs.map(b => 
TestUtils.createServer(KafkaConfig.fromProps(b)))
+  this.servers = allServers
+  val partitionHostingServers = allServers.filter(s => 
expectedReplicaAssignment(0).contains(s.config.brokerId))
+

[jira] [Updated] (KAFKA-16077) Streams fails to close task after restoration when input partitions are updated

2024-01-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16077:

Priority: Blocker  (was: Critical)

> Streams fails to close task after restoration when input partitions are 
> updated
> ---
>
> Key: KAFKA-16077
> URL: https://issues.apache.org/jira/browse/KAFKA-16077
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Blocker
>
> There is a race condition in the state updater that can cause the following:
>  # We have an active task in the state updater
>  # We get fenced. We recreate the producer, transactions now uninitialized. 
> We ask the state updater to give back the task, add a pending action to close 
> the task clean once it’s handed back
>  # We get a new assignment with updated input partitions. The task is still 
> owned by the state updater, so we ask the state updater again to hand it back 
> and add a pending action to update its input partition
>  # The task is handed back by the state updater. We update its input 
> partitions but forget to close it clean (pending action was overwritten)
>  # Now the task is in an initialized state, but the underlying producer does 
> not have transactions initialized
> This can lead to an exception like this:
> {code:java}
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log-org.apache.kafka.streams.errors.StreamsException:
>  Exception caught in process. taskId=1_0, 
> processor=KSTREAM-SOURCE-05, topic=node-name-repartition, 
> partition=0, offset=618798, stacktrace=java.lang.IllegalStateException: 
> TransactionalId stream-soak-test-d647640a-12e5-4e74-a0af-e105d0d0cb67-2: 
> Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:999)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:985)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:311)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:660)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.maybeBeginTransaction(StreamsProducer.java:240)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:258)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:253)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:175)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.doJoin(KStreamKTableJoinProcessor.java:130)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:99)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.ProcessorContext

Re: [PR] KAFKA-12399: Deprecate Log4J Appender [kafka]

2024-01-04 Thread via GitHub


mimaison commented on PR #10244:
URL: https://github.com/apache/kafka/pull/10244#issuecomment-1876911687

   @dongjinleekr Following up on this. If I don't hear back in the next couple 
of weeks, I'll try to complete this task (and keep you as co-author)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16077) Streams fails to close task after restoration when input partitions are updated

2024-01-04 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-16077:
--

Assignee: Lucas Brutschy

> Streams fails to close task after restoration when input partitions are 
> updated
> ---
>
> Key: KAFKA-16077
> URL: https://issues.apache.org/jira/browse/KAFKA-16077
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Critical
>
> There is a race condition in the state updater that can cause the following:
>  # We have an active task in the state updater
>  # We get fenced. We recreate the producer, transactions now uninitialized. 
> We ask the state updater to give back the task, add a pending action to close 
> the task clean once it’s handed back
>  # We get a new assignment with updated input partitions. The task is still 
> owned by the state updater, so we ask the state updater again to hand it back 
> and add a pending action to update its input partition
>  # The task is handed back by the state updater. We update its input 
> partitions but forget to close it clean (pending action was overwritten)
>  # Now the task is in an initialized state, but the underlying producer does 
> not have transactions initialized
> This can lead to an exception like this:
> {code:java}
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log-org.apache.kafka.streams.errors.StreamsException:
>  Exception caught in process. taskId=1_0, 
> processor=KSTREAM-SOURCE-05, topic=node-name-repartition, 
> partition=0, offset=618798, stacktrace=java.lang.IllegalStateException: 
> TransactionalId stream-soak-test-d647640a-12e5-4e74-a0af-e105d0d0cb67-2: 
> Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:999)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:985)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:311)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:660)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.maybeBeginTransaction(StreamsProducer.java:240)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:258)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:253)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:175)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.doJoin(KStreamKTableJoinProcessor.java:130)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:99)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl
