Re: [PR] KAFKA-15649: Handle directory failure timeout [kafka]

2024-05-23 Thread via GitHub


soarez merged PR #15697:
URL: https://github.com/apache/kafka/pull/15697


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15649: Handle directory failure timeout [kafka]

2024-05-23 Thread via GitHub


viktorsomogyi commented on PR #15697:
URL: https://github.com/apache/kafka/pull/15697#issuecomment-2127130632

   @soarez thanks for the info. I addressed your comment. Do you have anything more to add, or are we good to go?





Re: [PR] KAFKA-15649: Handle directory failure timeout [kafka]

2024-05-09 Thread via GitHub


soarez commented on code in PR #15697:
URL: https://github.com/apache/kafka/pull/15697#discussion_r1593094798


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2466,7 +2467,6 @@ class ReplicaManager(val config: KafkaConfig,
          s"for partitions ${partitionsWithOfflineFutureReplica.mkString(",")} because they are in the failed log directory $dir.")
      }
      // retrieve the UUID here because logManager.handleLogDirFailure handler removes it

Review Comment:
   We should move or remove this comment now that the `uuid` declaration has 
been moved up.






Re: [PR] KAFKA-15649: Handle directory failure timeout [kafka]

2024-05-07 Thread via GitHub


viktorsomogyi commented on PR #15697:
URL: https://github.com/apache/kafka/pull/15697#issuecomment-2098821932

   @soarez in the end I chose the shortcut regarding detecting leaders before shutdown, because the solution that would otherwise be required is complex.
   For one, the sequence of events is problematic. First we update the `LogManager` and then try to propagate the event to the controller. At this point the metadata is stale, so I can't rely on it to detect whether partitions have leadership. A workaround would be to subtract the `LogManager`'s data from the metadata cache (i.e. if there is only a single ISR replica and it is the current broker, then we can treat the partition as offline in reality). I don't feel that is a robust solution: it could be prone to race conditions on the network, depending on how requests arrive from the controller while it's alive. I think it's more robust to just fail if we can't contact the controller.
   The second reason is a bit technical and can be worked around, although it requires a lot of effort. When trying to extract the replica->logdir information from `LogManager`, the only logdir information the event provides is the `Uuid`. Unfortunately `LogManager` doesn't store the `Uuid` of an offline dir (and besides, I don't think `Uuid`s and logdir names are used consistently across the whole module). This could be solved by propagating both the logdir and the `Uuid` in the events, or by storing offline dirs' `Uuid`s in `LogManager`. I think the latter is problematic because we can't know how long we should keep information about offline dirs, as they might never come back. The former can be done, although it could be a sizeable refactor, and generally I felt that choosing the simpler route now would be more robust.
   Let me know if you think we should try it.
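
The subtraction heuristic described above (and ultimately not implemented in the PR) could look roughly like this. This is a hedged, self-contained Java sketch: `PartitionState`, `BROKER_ID`, and `effectivelyOffline` are illustrative stand-ins, not actual Kafka classes or APIs.

```java
import java.util.List;
import java.util.Set;

public class OfflineLeaderHeuristic {

    // Illustrative stand-in for partition metadata: ISR member ids plus the
    // path of the log dir hosting the local replica.
    record PartitionState(String topicPartition, List<Integer> isr, String logDir) {}

    static final int BROKER_ID = 1; // hypothetical id of the current broker

    // A partition is "offline in reality" if this broker is the only ISR
    // member and its log lives in a failed directory: the stale metadata
    // cannot elect another leader for it.
    static boolean effectivelyOffline(PartitionState p, Set<String> failedDirs) {
        return p.isr().size() == 1
            && p.isr().get(0) == BROKER_ID
            && failedDirs.contains(p.logDir());
    }

    public static void main(String[] args) {
        Set<String> failedDirs = Set.of("/data/d1");
        PartitionState soleLeader = new PartitionState("t-0", List.of(1), "/data/d1");
        PartitionState replicated = new PartitionState("t-1", List.of(1, 2, 3), "/data/d1");
        System.out.println(effectivelyOffline(soleLeader, failedDirs));  // true
        System.out.println(effectivelyOffline(replicated, failedDirs)); // false
    }
}
```

As the comment notes, this remains racy against in-flight controller requests, which is why failing outright was preferred.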





Re: [PR] KAFKA-15649: Handle directory failure timeout [kafka]

2024-04-18 Thread via GitHub


viktorsomogyi commented on PR #15697:
URL: https://github.com/apache/kafka/pull/15697#issuecomment-2064216189

   Rebased on latest trunk as there were some conflicts. Addressed some of the comments, but there are 2 things I need to investigate:
   * `LogDirFailureTest` fails in `@AfterAll`, likely because of an incorrect shutdown; perhaps there's a timing issue
   * Check whether we can detect if there are any leaders before shutdown
   I'll update on both shortly, hopefully tomorrow.





Re: [PR] KAFKA-15649: Handle directory failure timeout [kafka]

2024-04-18 Thread via GitHub


viktorsomogyi commented on code in PR #15697:
URL: https://github.com/apache/kafka/pull/15697#discussion_r1570907841


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -528,6 +529,10 @@ object KafkaConfig {
     "If log.message.timestamp.type=CreateTime, the message will be rejected if the difference in timestamps exceeds " +
     "this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime."
 
+  val LogDirFailureTimeoutMsDoc = "If the broker is unable to successfully communicate to the controller that some log " +
+    "directory has failed for longer than this time, and there's at least one partition with leadership on that directory, " +

Review Comment:
   I'll do another round on this; there might be a way in `BrokerServer` to extract this information using a combination of `MetadataCache`, `ReplicaManager` and `LogManager`. I'll update you tomorrow about my findings.



##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -327,16 +333,25 @@ class BrokerLifecycleManager(
   private class OfflineDirEvent(val dir: Uuid) extends EventQueue.Event {
     override def run(): Unit = {
       if (offlineDirs.isEmpty) {
-        offlineDirs = Set(dir)
+        offlineDirs = Map(dir -> false)
       } else {
-        offlineDirs = offlineDirs + dir
+        offlineDirs += (dir -> false)
       }
       if (registered) {
         scheduleNextCommunicationImmediately()
       }
     }
   }
 
+  private class OfflineDirBrokerFailureEvent(offlineDir: Uuid) extends EventQueue.Event {
+    override def run(): Unit = {
+      if (!offlineDirs.getOrElse(offlineDir, false)) {
+        error(s"Shutting down because couldn't communicate offline log dirs with controllers")

Review Comment:
   I'll print only the UUID here, and I'll modify other log statements to contain the UUID so one can pair these log statements when analyzing the logs. Printing the dir path here would be a somewhat bigger stretch, as we currently don't propagate it down to this level. Let me know if you think it'd be better to print the path here.






Re: [PR] KAFKA-15649: Handle directory failure timeout [kafka]

2024-04-15 Thread via GitHub


mimaison commented on code in PR #15697:
URL: https://github.com/apache/kafka/pull/15697#discussion_r1565973156


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -870,6 +875,7 @@ object KafkaConfig {
       .define(CreateTopicPolicyClassNameProp, CLASS, null, LOW, CreateTopicPolicyClassNameDoc)
       .define(AlterConfigPolicyClassNameProp, CLASS, null, LOW, AlterConfigPolicyClassNameDoc)
       .define(LogMessageDownConversionEnableProp, BOOLEAN, LogConfig.DEFAULT_MESSAGE_DOWNCONVERSION_ENABLE, LOW, LogMessageDownConversionEnableDoc)
+      .define(LogDirFailureTimeoutMsProp, LONG, Defaults.LOG_DIR_FAILURE_TIMEOUT_MS, atLeast(0), MEDIUM, LogDirFailureTimeoutMsDoc)

Review Comment:
   In the KIP the accepted value range is defined as >= 1. I wonder if values 
below 1s actually make much sense. 
   Also the importance was defined as low.






Re: [PR] KAFKA-15649: Handle directory failure timeout [kafka]

2024-04-12 Thread via GitHub


soarez commented on code in PR #15697:
URL: https://github.com/apache/kafka/pull/15697#discussion_r1562612304


##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -507,6 +522,7 @@ class BrokerLifecycleManager(
     if (errorCode == Errors.NONE) {
       val responseData = message.data()
       failedAttempts = 0
+      offlineDirs = offlineDirs.map(kv => kv._1 -> true)

Review Comment:
   I think this is incorrect. If a new failed directory is added to `offlineDirs` in between a heartbeat request-response, then we'll clear it here before knowing whether it was propagated to the controller.
   
   One idea is to hand down the offline dirs set in the request in `sendBrokerHeartBeat()` to `BrokerHeartbeatResponseEvent` through `BrokerHeartbeatResponseHandler` as a new constructor argument.
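
The snapshot-and-acknowledge idea could be sketched as follows. This is a minimal, self-contained Java sketch: `OfflineDirTracker` and its method names are illustrative, not the actual `BrokerLifecycleManager` API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class OfflineDirTracker {
    // dir id -> has the controller acknowledged it?
    private final Map<String, Boolean> offlineDirs = new HashMap<>();

    void dirFailed(String dirId) {
        offlineDirs.putIfAbsent(dirId, false);
    }

    // Called when building the heartbeat: remember exactly what was sent.
    Set<String> snapshotForHeartbeat() {
        return Set.copyOf(offlineDirs.keySet());
    }

    // Called from the response handler with that snapshot, so a dir that
    // failed mid-flight is not marked as propagated by an older response.
    void onHeartbeatSuccess(Set<String> sentDirs) {
        sentDirs.forEach(d -> offlineDirs.replace(d, true));
    }

    boolean propagated(String dirId) {
        return offlineDirs.getOrDefault(dirId, false);
    }
}
```

A directory that fails after the snapshot is taken stays `false` until a later heartbeat that actually carried it succeeds, which avoids the race described above.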



##
server/src/main/java/org/apache/kafka/server/config/Defaults.java:
##
@@ -94,6 +94,7 @@ public class Defaults {
     public static final int LOG_FLUSH_START_OFFSET_CHECKPOINT_INTERVAL_MS = 60000;
     public static final int NUM_RECOVERY_THREADS_PER_DATA_DIR = 1;
     public static final boolean AUTO_CREATE_TOPICS_ENABLE = true;
+    public static final long LOG_DIR_FAILURE_TIMEOUT_MS = 30000L;

Review Comment:
   This default seems reasonable to me.



##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -211,7 +211,8 @@ class BrokerServer(
       time,
       s"broker-${config.nodeId}-",
       isZkBroker = false,
-      logDirs = logManager.directoryIdsSet)
+      logDirs = logManager.directoryIdsSet,
+      () => kafkaScheduler.schedule("shutdown", () => shutdown(), 0, -1))

Review Comment:
   There's a `scheduleOnce` alternative which sets `periodMs` to `-1`.



##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -528,6 +529,10 @@ object KafkaConfig {
     "If log.message.timestamp.type=CreateTime, the message will be rejected if the difference in timestamps exceeds " +
     "this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime."
 
+  val LogDirFailureTimeoutMsDoc = "If the broker is unable to successfully communicate to the controller that some log " +
+    "directory has failed for longer than this time, and there's at least one partition with leadership on that directory, " +

Review Comment:
   > and there's at least one partition with leadership
   
   We aren't checking for this condition. We can either a) implement it, or b) keep it simple and drop this from the configuration description.



##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -327,16 +333,25 @@ class BrokerLifecycleManager(
   private class OfflineDirEvent(val dir: Uuid) extends EventQueue.Event {
     override def run(): Unit = {
       if (offlineDirs.isEmpty) {
-        offlineDirs = Set(dir)
+        offlineDirs = Map(dir -> false)
       } else {
-        offlineDirs = offlineDirs + dir
+        offlineDirs += (dir -> false)
       }
       if (registered) {
         scheduleNextCommunicationImmediately()
       }
     }
   }
 
+  private class OfflineDirBrokerFailureEvent(offlineDir: Uuid) extends EventQueue.Event {
+    override def run(): Unit = {
+      if (!offlineDirs.getOrElse(offlineDir, false)) {
+        error(s"Shutting down because couldn't communicate offline log dirs with controllers")

Review Comment:
   We should include the directory in the error. It might also be helpful to 
resolve the directory ID to its path. Perhaps something like `dirIdToPath` in 
`AssignmentsManager` should be made available here as well.
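
A `dirIdToPath`-style lookup could be sketched as below. This is a hypothetical `DirIdResolver`, assuming the id-to-path map can be populated from the configured log dirs at startup; it is not the actual `AssignmentsManager` code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class DirIdResolver {
    private final Map<UUID, String> idToPath = new HashMap<>();

    // Record an id-to-path mapping, e.g. while scanning log dirs at startup.
    void register(UUID dirId, String path) {
        idToPath.put(dirId, path);
    }

    // Resolve for logging: prefer the human-readable path but always keep the
    // id, so log lines from different components can be paired by UUID.
    String describe(UUID dirId) {
        String path = idToPath.get(dirId);
        return path == null ? dirId.toString() : dirId + " (" + path + ")";
    }
}
```

Falling back to the raw id keeps the error message useful even when the path was never recorded, e.g. if the dir failed before its id could be resolved.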






Re: [PR] KAFKA-15649: Handle directory failure timeout [kafka]

2024-04-11 Thread via GitHub


viktorsomogyi commented on PR #15697:
URL: https://github.com/apache/kafka/pull/15697#issuecomment-2049303060

   Rebased it due to conflicts.

