Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-11-30 Thread via GitHub


cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1411139366


##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -1048,20 +1054,15 @@ class KafkaServer(
    *
    * @return The brokerId.
    */
-  private def getOrGenerateBrokerId(brokerMetadata: RawMetaProperties): Int = {
-    val brokerId = config.brokerId
-
-    if (brokerId >= 0 && brokerMetadata.brokerId.exists(_ != brokerId))
-      throw new InconsistentBrokerIdException(

Review Comment:
   It just throws a `RuntimeException` now, I believe. I don't think it's 
necessary to have a custom exception for every `meta.properties` issue.
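
A minimal sketch of that idea, assuming a single unchecked exception with a descriptive message is enough to report a node ID mismatch. The class and method names (`NodeIdCheck`, `verifyNodeId`) are hypothetical and are not taken from the PR; only the shape of the check mirrors the removed Scala code quoted above.

```java
import java.util.OptionalInt;

// Hypothetical illustration only; not the code from the PR.
final class NodeIdCheck {
    // Compares the configured node ID with the one stored in meta.properties
    // (if any) and fails with a plain RuntimeException on a mismatch.
    static void verifyNodeId(int configuredId, OptionalInt storedId) {
        if (configuredId >= 0 && storedId.isPresent() && storedId.getAsInt() != configuredId) {
            throw new RuntimeException("Stored node id " + storedId.getAsInt() +
                " doesn't match configured node id " + configuredId);
        }
    }

    public static void main(String[] args) {
        verifyNodeId(1, OptionalInt.of(1));   // matching IDs: no exception
        verifyNodeId(1, OptionalInt.empty()); // nothing stored yet: no exception
        try {
            verifyNodeId(1, OptionalInt.of(2));
        } catch (RuntimeException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

A caller that needs to tell this failure apart from others would have to inspect the message, which is the trade-off of not having a dedicated exception type.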



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-11-18 Thread via GitHub


ijuma commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1398324375


##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -1048,20 +1054,15 @@ class KafkaServer(
    *
    * @return The brokerId.
    */
-  private def getOrGenerateBrokerId(brokerMetadata: RawMetaProperties): Int = {
-    val brokerId = config.brokerId
-
-    if (brokerId >= 0 && brokerMetadata.brokerId.exists(_ != brokerId))
-      throw new InconsistentBrokerIdException(

Review Comment:
   @cmccabe There is a test that's meant to catch this exception, but it was 
buggy: 
https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala#L137
   
   What's the expected behavior now and should the code or test be changed?






Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-11-09 Thread via GitHub


cmccabe merged PR #14628:
URL: https://github.com/apache/kafka/pull/14628





Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-11-08 Thread via GitHub


cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1386983421


##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -270,10 +271,31 @@ class KafkaServer(
 
         logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
 
+        // Make sure all storage directories have meta.properties files.
+        val metaPropsEnsemble = {
+          val copier = new MetaPropertiesEnsemble.Copier(initialMetaPropsEnsemble)
+          initialMetaPropsEnsemble.nonFailedDirectoryProps().forEachRemaining(e => {
+            val logDir = e.getKey
+            val builder = new MetaProperties.Builder(e.getValue).
+              setClusterId(_clusterId).
+              setNodeId(config.brokerId)
+            if (!builder.directoryId().isPresent()) {
+              builder.setDirectoryId(copier.generateValidDirectoryId())
+            }
+            copier.setLogDirProps(logDir, builder.build())
+          })
+          copier.emptyLogDirs().clear()
+          copier.writeLogDirChanges((logDir, e) => {

Review Comment:
   I added a boolean that indicates the directory should be created if it 
doesn't exist.
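
As a rough sketch of what such a flag could look like, the helper below creates a missing directory only when asked to. The names `LogDirPreparer`, `prepare`, and `createIfMissing` are assumptions made for illustration; the actual parameter added in the PR is not shown in this thread.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical illustration only; not the code from the PR.
final class LogDirPreparer {
    // Returns true if the directory exists after the call. A missing directory
    // is created only when createIfMissing is set; otherwise it is left alone.
    static boolean prepare(Path logDir, boolean createIfMissing) throws IOException {
        if (Files.isDirectory(logDir)) {
            return true;
        }
        if (!createIfMissing) {
            return false;
        }
        // Creates any missing parent directories as well.
        Files.createDirectories(logDir);
        return true;
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("meta-props-demo");
        Path logDir = base.resolve("kafka-logs");
        System.out.println("without flag: " + prepare(logDir, false));
        System.out.println("with flag: " + prepare(logDir, true));
    }
}
```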






Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-11-08 Thread via GitHub


cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1386175030


##
metadata/src/main/java/org/apache/kafka/metadata/properties/MetaPropertiesEnsemble.java:
##
@@ -0,0 +1,575 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.properties;
+
+import org.apache.kafka.common.DirectoryId;
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.NoSuchFileException;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+
+/**
+ * A collection of meta.properties information for Kafka log directories.
+ *
+ * Directories are categorized into empty, error, and normal. Each directory must appear in only
+ * one category, corresponding to emptyLogDirs, errorLogDirs, and logDirProps.
+ *
+ * This class is immutable. Modified copies can be made with the Copier class.
+ */
+public final class MetaPropertiesEnsemble {
+    /**
+     * The log4j object for this class.
+     */
+    public static final Logger LOG = LoggerFactory.getLogger(MetaPropertiesEnsemble.class);
+
+    /**
+     * A completely empty MetaPropertiesEnsemble object.
+     */
+    public static final MetaPropertiesEnsemble EMPTY = new MetaPropertiesEnsemble(Collections.emptySet(),
+        Collections.emptySet(),
+        Collections.emptyMap(),
+        Optional.empty());
+
+    /**
+     * The name of the meta.properties file within each log directory.
+     */
+    public static final String META_PROPERTIES_NAME = "meta.properties";
+
+    /**
+     * The set of log dirs that were empty.
+     */
+    private final Set<String> emptyLogDirs;
+
+    /**
+     * The set of log dirs that had errors.
+     */
+    private final Set<String> errorLogDirs;
+
+    /**
+     * A map from log directories to the meta.properties information inside each one.
+     */
+    private final Map<String, MetaProperties> logDirProps;
+
+    /**
+     * The metadata log directory, or the empty string if there is none.
+     */
+    private final Optional<String> metadataLogDir;
+
+    /**
+     * Utility class for loading a MetaPropertiesEnsemble from the disk.
+     */
+    public static class Loader {
+        private final TreeSet<String> logDirs = new TreeSet<>();
+        private Optional<String> metadataLogDir = Optional.empty();
+
+        public Loader addLogDirs(Collection<String> logDirs) {
+            for (String logDir : logDirs) {
+                this.logDirs.add(logDir);
+            }
+            return this;
+        }
+
+        public Loader addLogDir(String logDir) {
+            this.logDirs.add(logDir);
+            return this;
+        }
+
+        public Loader addMetadataLogDir(String metadataLogDir) {
+            if (this.metadataLogDir.isPresent()) {
+                throw new RuntimeException("Cannot specify more than one metadata log directory. " +
+                    "Already specified " + this.metadataLogDir.get());
+            }
+            this.metadataLogDir = Optional.of(metadataLogDir);
+            logDirs.add(metadataLogDir);
+            return this;
+        }
+
+        public MetaPropertiesEnsemble load() throws IOException  {
+            if (logDirs.isEmpty()) {
+                throw new RuntimeException("You must specify at least one log directory.");
+            }
+            Set<String> emptyLogDirs = new HashSet<>();
+            Set<String> errorLogDirs = new HashSet<>();
+            Map<String, MetaProperties> logDirProps = new HashMap<>();
+            for (String logDir : logDirs) {
+                String metaPropsFile = new File(logDir, META_PROPERTIES_NAME).getAbsolutePath();
+                try {
+  

Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-11-08 Thread via GitHub


cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1386172972


##
metadata/src/main/java/org/apache/kafka/metadata/properties/MetaPropertiesVersion.java:
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.properties;
+
+/**
+ * The version of a meta.properties file.
+ */
+public enum MetaPropertiesVersion {
+    V0(0),
+    V1(1);
+
+    private final int number;
+
+    public static MetaPropertiesVersion fromNumberString(String numberString) {
+        int number;
+        try {
+            number = Integer.parseInt(numberString.trim());
+        } catch (NumberFormatException  e) {
+            throw new RuntimeException("Invalid meta.properties version string '" +
+                numberString + "'");
+        }
+        return fromNumber(number);
+    }
+
+    public static MetaPropertiesVersion fromNumber(int number) {
+        switch (number) {
+            case 0: return V0;
+            case 1: return V1;
+            default: throw new RuntimeException("Unknown meta.properties version number " + number);
+        }
+    }
+
+    MetaPropertiesVersion(int number) {
+        this.number = number;
+    }
+
+    public int number() {
+        return number;
+    }
+
+    public String numberString() {
+        return "" + number;

Review Comment:
   I was just lazy. `"" + something` always works, and doesn't require you to 
know the type of `something`.
   
   `Integer.toString` is probably better (although I'm not 100% sure), so I'll 
change it to that.
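
For comparison, a small sketch showing that both forms yield the same string for an `int`. The class and method names here (`NumberStringDemo`, `viaConcat`, `viaToString`) are made up for illustration and do not appear in the PR.

```java
// Hypothetical illustration only; not the code from the PR.
final class NumberStringDemo {
    // String concatenation: works for any type, but hides the conversion.
    static String viaConcat(int number) {
        return "" + number;
    }

    // Explicit conversion: states the intent and the expected type.
    static String viaToString(int number) {
        return Integer.toString(number);
    }

    public static void main(String[] args) {
        for (int n : new int[] {0, 1, -5}) {
            System.out.println(viaConcat(n).equals(viaToString(n)));
        }
    }
}
```

The explicit call also stops compiling if the field's type later changes to something that is not an `int`, which the concatenation form would silently accept.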






Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-11-08 Thread via GitHub


cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1386171452


##
metadata/src/main/java/org/apache/kafka/metadata/properties/MetaPropertiesVersion.java:
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.properties;
+
+/**
+ * The version of a meta.properties file.
+ */
+public enum MetaPropertiesVersion {
+    V0(0),

Review Comment:
   Good point. Added.






Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-11-07 Thread via GitHub


pprovenzano commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1385811376


##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -270,10 +271,31 @@ class KafkaServer(
 
         logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
 
+        // Make sure all storage directories have meta.properties files.
+        val metaPropsEnsemble = {
+          val copier = new MetaPropertiesEnsemble.Copier(initialMetaPropsEnsemble)
+          initialMetaPropsEnsemble.nonFailedDirectoryProps().forEachRemaining(e => {
+            val logDir = e.getKey
+            val builder = new MetaProperties.Builder(e.getValue).
+              setClusterId(_clusterId).
+              setNodeId(config.brokerId)
+            if (!builder.directoryId().isPresent()) {
+              builder.setDirectoryId(copier.generateValidDirectoryId())
+            }
+            copier.setLogDirProps(logDir, builder.build())
+          })
+          copier.emptyLogDirs().clear()
+          copier.writeLogDirChanges((logDir, e) => {

Review Comment:
   Need to create the directory if it doesn't exist. Here is some code I took 
from LogManager.
   
   ```
   diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
   index 9843204dc1..48eee053ce 100755
   --- a/core/src/main/scala/kafka/server/KafkaServer.scala
   +++ b/core/src/main/scala/kafka/server/KafkaServer.scala
   @@ -62,7 +62,7 @@ import org.apache.kafka.server.util.KafkaScheduler
    import org.apache.kafka.storage.internals.log.LogDirFailureChannel
    import org.apache.zookeeper.client.ZKClientConfig
    
   -import java.io.IOException
   +import java.io._
    import java.net.{InetAddress, SocketTimeoutException}
    import java.util
    import java.util.{Optional, OptionalInt, OptionalLong}
   @@ -276,13 +276,28 @@ class KafkaServer(
              val copier = new MetaPropertiesEnsemble.Copier(initialMetaPropsEnsemble)
              initialMetaPropsEnsemble.nonFailedDirectoryProps().forEachRemaining(e => {
                val logDir = e.getKey
   +
   +            val dir = new File(logDir).getAbsoluteFile
                val builder = new MetaProperties.Builder(e.getValue).
                  setClusterId(_clusterId).
                  setNodeId(config.brokerId)
   -            if (!builder.directoryId().isPresent()) {
   -              builder.setDirectoryId(copier.generateValidDirectoryId())
   +
   +            try {
   +              if (!builder.directoryId().isPresent()) {
   +                if (!dir.exists) {
   +                  info(s"Log directory ${dir.getAbsolutePath} not found, creating it.")
   +                  val created = dir.mkdirs()
   +                  if (!created)
   +                    throw new IOException(s"Failed to create data directory ${dir.getAbsolutePath}")
   +                  Utils.flushDir(dir.toPath.toAbsolutePath.normalize.getParent)
   +                }
   +                builder.setDirectoryId(copier.generateValidDirectoryId())
   +              }
   +              copier.setLogDirProps(logDir, builder.build())
   +            } catch {
   +              case e: IOException =>
   +                info(s"Failed to create or validate data directory ${dir.getAbsolutePath}", e)
                }
   -            copier.setLogDirProps(logDir, builder.build())
              })
              copier.emptyLogDirs().clear()
              copier.writeLogDirChanges((logDir, e) => {
   
   ```
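
The create-then-flush step in the patch above can be sketched in plain Java as follows. This is an illustration under stated assumptions, not the Kafka implementation: `DurableMkdir`, `flushDir`, and `createAndFlush` are hypothetical names, and opening a directory with `FileChannel` for an fsync is assumed to work on the platform (it does on typical POSIX systems, but generally not on Windows).

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical illustration only; not the code from the PR.
final class DurableMkdir {
    // Forces directory metadata to disk so a newly created entry survives a crash.
    // Assumption: the platform allows opening a directory for reading.
    static void flushDir(Path dir) throws IOException {
        try (FileChannel channel = FileChannel.open(dir, StandardOpenOption.READ)) {
            channel.force(true);
        }
    }

    // Creates the directory if it is missing and flushes its parent.
    // Returns true if the directory was created by this call.
    static boolean createAndFlush(Path dir) throws IOException {
        if (Files.isDirectory(dir)) {
            return false;
        }
        Files.createDirectories(dir);
        Path parent = dir.toAbsolutePath().normalize().getParent();
        if (parent != null) {
            flushDir(parent);
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("durable-mkdir-demo");
        Path logDir = base.resolve("kafka-logs");
        System.out.println("created: " + createAndFlush(logDir));
        System.out.println("created again: " + createAndFlush(logDir));
    }
}
```

Only the immediate parent is flushed here, matching the patch; if several levels of directories are created at once, the higher levels are not explicitly synced.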






Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-11-07 Thread via GitHub


pprovenzano commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1385810018


##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -270,10 +271,31 @@ class KafkaServer(
 
         logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
 
+        // Make sure all storage directories have meta.properties files.
+        val metaPropsEnsemble = {
+          val copier = new MetaPropertiesEnsemble.Copier(initialMetaPropsEnsemble)
+          initialMetaPropsEnsemble.nonFailedDirectoryProps().forEachRemaining(e => {
+            val logDir = e.getKey
+            val builder = new MetaProperties.Builder(e.getValue).
+              setClusterId(_clusterId).
+              setNodeId(config.brokerId)
+            if (!builder.directoryId().isPresent()) {
+              builder.setDirectoryId(copier.generateValidDirectoryId())
+            }
+            copier.setLogDirProps(logDir, builder.build())
+          })
+          copier.emptyLogDirs().clear()
+          copier.writeLogDirChanges((logDir, e) => {
+            logDirFailureChannel.maybeAddOfflineLogDir(logDir, s"Error while writing meta.properties to $logDir", e)
+          })
+          copier.copy()
+        }
+        metaPropsEnsemble.verify(Optional.of(_clusterId), OptionalInt.of(config.brokerId), verificationFlags)

Review Comment:
   Need to create the directory if it doesn't exist here. Here is some code 
that I basically took from LogManager.scala.
   
   `diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
   index 9843204dc1..48eee053ce 100755
   --- a/core/src/main/scala/kafka/server/KafkaServer.scala
   +++ b/core/src/main/scala/kafka/server/KafkaServer.scala
   @@ -62,7 +62,7 @@ import org.apache.kafka.server.util.KafkaScheduler
    import org.apache.kafka.storage.internals.log.LogDirFailureChannel
    import org.apache.zookeeper.client.ZKClientConfig
    
   -import java.io.IOException
   +import java.io._
    import java.net.{InetAddress, SocketTimeoutException}
    import java.util
    import java.util.{Optional, OptionalInt, OptionalLong}
   @@ -276,13 +276,28 @@ class KafkaServer(
              val copier = new MetaPropertiesEnsemble.Copier(initialMetaPropsEnsemble)
              initialMetaPropsEnsemble.nonFailedDirectoryProps().forEachRemaining(e => {
                val logDir = e.getKey
   +
   +            val dir = new File(logDir).getAbsoluteFile
                val builder = new MetaProperties.Builder(e.getValue).
                  setClusterId(_clusterId).
                  setNodeId(config.brokerId)
   -            if (!builder.directoryId().isPresent()) {
   -              builder.setDirectoryId(copier.generateValidDirectoryId())
   +
   +            try {
   +              if (!builder.directoryId().isPresent()) {
   +                if (!dir.exists) {
   +                  info(s"Log directory ${dir.getAbsolutePath} not found, creating it.")
   +                  val created = dir.mkdirs()
   +                  if (!created)
   +                    throw new IOException(s"Failed to create data directory ${dir.getAbsolutePath}")
   +                  Utils.flushDir(dir.toPath.toAbsolutePath.normalize.getParent)
   +                }
   +                builder.setDirectoryId(copier.generateValidDirectoryId())
   +              }
   +              copier.setLogDirProps(logDir, builder.build())
   +            } catch {
   +              case e: IOException =>
   +                info(s"Failed to create or validate data directory ${dir.getAbsolutePath}", e)
                }
   -            copier.setLogDirProps(logDir, builder.build())
              })
              copier.emptyLogDirs().clear()
              copier.writeLogDirChanges((logDir, e) => {
   `






Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-11-07 Thread via GitHub


pprovenzano commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1385805130


##
metadata/src/main/java/org/apache/kafka/metadata/properties/MetaPropertiesEnsemble.java:
##
@@ -0,0 +1,575 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.properties;
+
+import org.apache.kafka.common.DirectoryId;
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.NoSuchFileException;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+
+/**
+ * A collection of meta.properties information for Kafka log directories.
+ *
+ * Directories are categorized into empty, error, and normal. Each directory must appear in only
+ * one category, corresponding to emptyLogDirs, errorLogDirs, and logDirProps.
+ *
+ * This class is immutable. Modified copies can be made with the Copier class.
+ */
+public final class MetaPropertiesEnsemble {
+/**
+ * The log4j object for this class.
+ */
+public static final Logger LOG = LoggerFactory.getLogger(MetaPropertiesEnsemble.class);
+
+/**
+ * A completely empty MetaPropertiesEnsemble object.
+ */
+public static final MetaPropertiesEnsemble EMPTY = new MetaPropertiesEnsemble(Collections.emptySet(),
+Collections.emptySet(),
+Collections.emptyMap(),
+Optional.empty());
+
+/**
+ * The name of the meta.properties file within each log directory.
+ */
+public static final String META_PROPERTIES_NAME = "meta.properties";
+
+/**
+ * The set of log dirs that were empty.
+ */
+private final Set<String> emptyLogDirs;
+
+/**
+ * The set of log dirs that had errors.
+ */
+private final Set<String> errorLogDirs;
+
+/**
+ * A map from log directories to the meta.properties information inside each one.
+ */
+private final Map<String, MetaProperties> logDirProps;
+
+/**
+ * The metadata log directory, or the empty string if there is none.
+ */
+private final Optional<String> metadataLogDir;
+
+/**
+ * Utility class for loading a MetaPropertiesEnsemble from the disk.
+ */
+public static class Loader {
+private final TreeSet<String> logDirs = new TreeSet<>();
+private Optional<String> metadataLogDir = Optional.empty();
+
+public Loader addLogDirs(Collection<String> logDirs) {
+for (String logDir : logDirs) {
+this.logDirs.add(logDir);
+}
+return this;
+}
+
+public Loader addLogDir(String logDir) {
+this.logDirs.add(logDir);
+return this;
+}
+
+public Loader addMetadataLogDir(String metadataLogDir) {
+if (this.metadataLogDir.isPresent()) {
+throw new RuntimeException("Cannot specify more than one metadata log directory. " +
+"Already specified " + this.metadataLogDir.get());
+}
+this.metadataLogDir = Optional.of(metadataLogDir);
+logDirs.add(metadataLogDir);
+return this;
+}
+
+public MetaPropertiesEnsemble load() throws IOException  {
+if (logDirs.isEmpty()) {
+throw new RuntimeException("You must specify at least one log directory.");
+}
+Set<String> emptyLogDirs = new HashSet<>();
+Set<String> errorLogDirs = new HashSet<>();
+Map<String, MetaProperties> logDirProps = new HashMap<>();
+for (String logDir : logDirs) {
+String metaPropsFile = new File(logDir, META_PROPERTIES_NAME).getAbsolutePath();
+try 

Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-11-07 Thread via GitHub


mumrah commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1385786809


##
metadata/src/main/java/org/apache/kafka/metadata/properties/MetaPropertiesVersion.java:
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.properties;
+
+/**
+ * The version of a meta.properties file.
+ */
+public enum MetaPropertiesVersion {
+V0(0),
+V1(1);
+
+private final int number;
+
+public static MetaPropertiesVersion fromNumberString(String numberString) {
+int number;
+try {
+number = Integer.parseInt(numberString.trim());
+} catch (NumberFormatException  e) {
+throw new RuntimeException("Invalid meta.properties version string '" +
+numberString + "'");
+}
+return fromNumber(number);
+}
+
+public static MetaPropertiesVersion fromNumber(int number) {
+switch (number) {
+case 0: return V0;
+case 1: return V1;
+default: throw new RuntimeException("Unknown meta.properties version number " + number);
+}
+}
+
+MetaPropertiesVersion(int number) {
+this.number = number;
+}
+
+public int number() {
+return number;
+}
+
+public String numberString() {
+return "" + number;

Review Comment:
   Just curious, why not use `Integer.toString` here? Is this string coercion 
more efficient or something?
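
   For comparison, a minimal sketch of the two spellings being discussed. The class name `VersionStrings` and both method names are hypothetical and not part of the PR; the point is only that both forms yield the same string, so the choice is a readability question.

```java
final class VersionStrings {
    // String concatenation with an empty prefix, as in the diff above.
    static String viaConcat(int number) {
        return "" + number;
    }

    // Explicit conversion, as suggested in the review comment.
    static String viaToString(int number) {
        return Integer.toString(number);
    }

    public static void main(String[] args) {
        // Both calls format the same int, so the results compare equal.
        System.out.println(viaConcat(1).equals(viaToString(1)));
    }
}
```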



##
metadata/src/main/java/org/apache/kafka/metadata/properties/MetaPropertiesVersion.java:
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.properties;
+
+/**
+ * The version of a meta.properties file.
+ */
+public enum MetaPropertiesVersion {
+V0(0),

Review Comment:
   Should we document the differences between v0 and v1 here?
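
   One possible shape for such documentation, as a sketch only. The enum name `MetaPropertiesVersionSketch` is hypothetical, and the Javadoc wording is limited to what is said elsewhere in this thread (ZK-based brokers only support version 0, version 0 is implied when the `version` field is missing, and v0 has essentially no required fields). Nothing is claimed here about the fields v1 requires.

```java
enum MetaPropertiesVersionSketch {
    /**
     * Version 0. Per this thread, ZK-based brokers only support this version,
     * it is implied when the version field is missing, and it has
     * essentially no required fields.
     */
    V0(0),

    /**
     * Version 1. The newer format; its exact field requirements are not
     * spelled out in this thread, so none are listed here.
     */
    V1(1);

    private final int number;

    MetaPropertiesVersionSketch(int number) {
        this.number = number;
    }

    int number() {
        return number;
    }

    // Maps a version number to its constant, rejecting anything unknown.
    static MetaPropertiesVersionSketch fromNumber(int number) {
        switch (number) {
            case 0: return V0;
            case 1: return V1;
            default: throw new RuntimeException("Unknown meta.properties version number " + number);
        }
    }
}
```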






Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-11-07 Thread via GitHub


cmccabe commented on PR #14628:
URL: https://github.com/apache/kafka/pull/14628#issuecomment-1800111262

   >  This PR does not work for ZK. You cannot create the directories in 
LogManager.scala but then write the directoryIds later. The directoryIds must 
be generated either when LogManager.scala is first called or the directories 
must be created with the new meta.properties file before LogManager is called.
   
   I moved the meta.properties creation before the creation of 
`LogManager.scala` to avoid this issue.





Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-11-06 Thread via GitHub


pprovenzano commented on PR #14628:
URL: https://github.com/apache/kafka/pull/14628#issuecomment-1796843636

   This PR does not work for ZK. You cannot create the directories in 
LogManager.scala but then write the directoryIds later. The directoryIds must 
be generated either when LogManager.scala is first called or the directories 
must be created with the new meta.properties file before LogManager is called.





Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-11-03 Thread via GitHub


cmccabe commented on PR #14628:
URL: https://github.com/apache/kafka/pull/14628#issuecomment-1793043952

   I updated this based on part 1 (which is now committed) and the other 
changes in trunk.





Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-31 Thread via GitHub


cmccabe commented on PR #14628:
URL: https://github.com/apache/kafka/pull/14628#issuecomment-1787946848

   This was getting too big, so I split off a part 1 here: 
https://github.com/apache/kafka/pull/14678/files





Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-31 Thread via GitHub


soarez commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1377613028


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -267,35 +267,27 @@ class LogManager(logDirs: Seq[File],
   /**
* Retrieves the Uuid for the directory, given its absolute path.
*/
-  def directoryId(dir: String): Option[Uuid] = dirIds.get(dir)
+  def directoryId(dir: String): Option[Uuid] = directoryIds.get(dir)
 
   /**
* Determine directory ID for each directory with a meta.properties.
-   * If meta.properties does not include a directory ID, one is generated and 
persisted back to meta.properties.
-   * Directories without a meta.properties don't get a directory ID assigned.
*/
-  private def directoryIds(dirs: Seq[File]): Map[String, Uuid] = {
-dirs.flatMap { dir =>
+  private def loadDirectoryIds(dirs: Seq[File]): Map[String, Uuid] = {
+val result = mutable.HashMap[String, Uuid]()
+dirs.foreach(dir => {
   try {
-val metadataCheckpoint = new BrokerMetadataCheckpoint(new File(dir, 
KafkaServer.brokerMetaPropsFile))
-metadataCheckpoint.read().map { props =>
-  val rawMetaProperties = new RawMetaProperties(props)
-  val uuid = rawMetaProperties.directoryId match {
-case Some(uuidStr) => Uuid.fromString(uuidStr)
-case None =>
-  val uuid = Uuid.randomUuid()
-  rawMetaProperties.directoryId = uuid.toString
-  metadataCheckpoint.write(rawMetaProperties.props)
-  uuid
-  }
-  dir.getAbsolutePath -> uuid
-}.toMap
+val props = PropertiesUtils.readPropertiesFile(
+  new File(dir, 
MetaPropertiesEnsemble.META_PROPERTIES_NAME).getAbsolutePath)
+val metaProps = new MetaProperties.Builder(props).build()
+metaProps.directoryId().ifPresent(directoryId => {
+  result += (dir.getAbsolutePath -> directoryId)
+})

Review Comment:
   Sounds ok to me  







Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-27 Thread via GitHub


divijvaidya commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1374255248


##
metadata/src/main/java/org/apache/kafka/metadata/properties/PropertiesUtils.java:
##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.properties;
+
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Properties;
+
+public final class PropertiesUtils {
+/**
+ * Writes a Java Properties object to a file.
+ *
+ * @param props The Properties object.
+ * @param path  The file to write to.
+ * @throws IOException
+ */
+public static void writePropertiesFile(
+Properties props,
+String path
+) throws IOException {
+File tempFile = new File(path + ".tmp");
+try (
+FileOutputStream fos = new FileOutputStream(tempFile, false);
+OutputStreamWriter osw = new OutputStreamWriter(fos, 
StandardCharsets.UTF_8);
+PrintWriter pw = new PrintWriter(osw)
+) {
+props.store(pw, "");
+fos.flush();
+fos.getFD().sync();

Review Comment:
   Understood. Thank you for answering my questions.
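
   For readers following the durability discussion, a self-contained sketch of the write-to-temp-then-sync pattern shown in the hunk above. The class name `AtomicPropertiesWriter` is hypothetical. The final `Files.move` with `ATOMIC_MOVE` is an assumption about what follows the sync, since the quoted diff is cut off before that step; the PR may use a different helper.

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Properties;

final class AtomicPropertiesWriter {
    static void write(Properties props, Path target) throws IOException {
        // Write to a sibling temp file first so readers never see a half-written target.
        Path temp = target.resolveSibling(target.getFileName() + ".tmp");
        try (FileOutputStream fos = new FileOutputStream(temp.toFile(), false);
             OutputStreamWriter osw = new OutputStreamWriter(fos, StandardCharsets.UTF_8);
             PrintWriter pw = new PrintWriter(osw)) {
            props.store(pw, "");
            // Push any buffered characters down to the file stream.
            pw.flush();
            // Ask the OS to persist the file contents before the rename.
            fos.getFD().sync();
        }
        // Assumption: swap the temp file into place with an atomic rename.
        // Whether an existing target is replaced is implementation specific.
        Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
    }
}
```

   Syncing before the rename matters because a rename can otherwise become visible on disk before the data it points to.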






Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-26 Thread via GitHub


cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1373822529


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -407,36 +414,41 @@ object StorageTool extends Logging {
 if (directories.isEmpty) {
   throw new TerseFailure("No log directories found in the configuration.")
 }
-
-val unformattedDirectories = directories.filter(directory => {
-  if (!Files.isDirectory(Paths.get(directory)) || 
!Files.exists(Paths.get(directory, KafkaServer.brokerMetaPropsFile))) {
-  true
-  } else if (!ignoreFormatted) {
-throw new TerseFailure(s"Log directory $directory is already 
formatted. " +
-  "Use --ignore-formatted to ignore this directory and format the 
others.")
-  } else {
-false
-  }
-})
-if (unformattedDirectories.isEmpty) {
+val loader = new MetaPropertiesEnsemble.Loader()
+directories.foreach(loader.addLogDir(_))
+val metaPropertiesEnsemble = loader.load()
+metaPropertiesEnsemble.verify(metaProperties.clusterId(), 
metaProperties.nodeId(),
+  util.EnumSet.noneOf(classOf[VerificationFlag]))
+
+val copier = new MetaPropertiesEnsemble.Copier(metaPropertiesEnsemble)
+if (!(ignoreFormatted || copier.logDirProps().isEmpty)) {
+  val firstLogDir = copier.logDirProps().keySet().iterator().next()
+  throw new TerseFailure(s"Log directory ${firstLogDir} directory is 
already formatted. " +
+"Use --ignore-formatted to ignore this directory and format the 
others.")
+}
+if (!copier.errorLogDirs().isEmpty) {
+  val firstLogDir = copier.errorLogDirs().iterator().next()
+  throw new TerseFailure(s"I/O error trying to read log directory 
${firstLogDir}.")
+}
+if (copier.emptyLogDirs().isEmpty) {
   stream.println("All of the log directories are already formatted.")
+} else {
+  copier.emptyLogDirs().forEach(logDir => {
+val newMetaProperties = new MetaProperties.Builder(metaProperties).
+  setDirectoryId(copier.generateValidDirectoryId()).
+  build()
+copier.logDirProps().put(logDir, newMetaProperties)

Review Comment:
   Fair point. I added a setter function.






Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-26 Thread via GitHub


cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1373824387


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -267,35 +267,27 @@ class LogManager(logDirs: Seq[File],
   /**
* Retrieves the Uuid for the directory, given its absolute path.
*/
-  def directoryId(dir: String): Option[Uuid] = dirIds.get(dir)
+  def directoryId(dir: String): Option[Uuid] = directoryIds.get(dir)
 
   /**
* Determine directory ID for each directory with a meta.properties.
-   * If meta.properties does not include a directory ID, one is generated and 
persisted back to meta.properties.
-   * Directories without a meta.properties don't get a directory ID assigned.
*/
-  private def directoryIds(dirs: Seq[File]): Map[String, Uuid] = {
-dirs.flatMap { dir =>
+  private def loadDirectoryIds(dirs: Seq[File]): Map[String, Uuid] = {
+val result = mutable.HashMap[String, Uuid]()
+dirs.foreach(dir => {
   try {
-val metadataCheckpoint = new BrokerMetadataCheckpoint(new File(dir, 
KafkaServer.brokerMetaPropsFile))
-metadataCheckpoint.read().map { props =>
-  val rawMetaProperties = new RawMetaProperties(props)
-  val uuid = rawMetaProperties.directoryId match {
-case Some(uuidStr) => Uuid.fromString(uuidStr)
-case None =>
-  val uuid = Uuid.randomUuid()
-  rawMetaProperties.directoryId = uuid.toString
-  metadataCheckpoint.write(rawMetaProperties.props)
-  uuid
-  }
-  dir.getAbsolutePath -> uuid
-}.toMap
+val props = PropertiesUtils.readPropertiesFile(
+  new File(dir, 
MetaPropertiesEnsemble.META_PROPERTIES_NAME).getAbsolutePath)
+val metaProps = new MetaProperties.Builder(props).build()
+metaProps.directoryId().ifPresent(directoryId => {
+  result += (dir.getAbsolutePath -> directoryId)
+})

Review Comment:
   Yes, I agree it should have a directory ID by this point, during the course 
of normal operation. By handling the no-ID case, I was trying to avoid unit 
tests failing. (To be clear, I haven't tested if they do fail, but I thought 
they might.) 
   
   Maybe we could remove this as a follow-on?
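
   The tolerant behavior described here (record a directory ID when one is present, silently skip the directory when it is not) can be sketched in plain Java. The class and method names are hypothetical, and `String` stands in for Kafka's `Uuid` so the example has no dependencies.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class DirectoryIdSketch {
    // Keeps only the directories whose ID is present; absent IDs are not an error.
    static Map<String, String> collect(Map<String, Optional<String>> idsByDir) {
        Map<String, String> result = new HashMap<>();
        idsByDir.forEach((dir, maybeId) -> maybeId.ifPresent(id -> result.put(dir, id)));
        return result;
    }
}
```

   Removing the no-ID case as a follow-on would amount to replacing `ifPresent` with a lookup that throws when the ID is missing.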






Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-26 Thread via GitHub


cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1373824387


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -267,35 +267,27 @@ class LogManager(logDirs: Seq[File],
   /**
* Retrieves the Uuid for the directory, given its absolute path.
*/
-  def directoryId(dir: String): Option[Uuid] = dirIds.get(dir)
+  def directoryId(dir: String): Option[Uuid] = directoryIds.get(dir)
 
   /**
* Determine directory ID for each directory with a meta.properties.
-   * If meta.properties does not include a directory ID, one is generated and 
persisted back to meta.properties.
-   * Directories without a meta.properties don't get a directory ID assigned.
*/
-  private def directoryIds(dirs: Seq[File]): Map[String, Uuid] = {
-dirs.flatMap { dir =>
+  private def loadDirectoryIds(dirs: Seq[File]): Map[String, Uuid] = {
+val result = mutable.HashMap[String, Uuid]()
+dirs.foreach(dir => {
   try {
-val metadataCheckpoint = new BrokerMetadataCheckpoint(new File(dir, 
KafkaServer.brokerMetaPropsFile))
-metadataCheckpoint.read().map { props =>
-  val rawMetaProperties = new RawMetaProperties(props)
-  val uuid = rawMetaProperties.directoryId match {
-case Some(uuidStr) => Uuid.fromString(uuidStr)
-case None =>
-  val uuid = Uuid.randomUuid()
-  rawMetaProperties.directoryId = uuid.toString
-  metadataCheckpoint.write(rawMetaProperties.props)
-  uuid
-  }
-  dir.getAbsolutePath -> uuid
-}.toMap
+val props = PropertiesUtils.readPropertiesFile(
+  new File(dir, 
MetaPropertiesEnsemble.META_PROPERTIES_NAME).getAbsolutePath)
+val metaProps = new MetaProperties.Builder(props).build()
+metaProps.directoryId().ifPresent(directoryId => {
+  result += (dir.getAbsolutePath -> directoryId)
+})

Review Comment:
   Yes, I agree it should have a directory ID by this point, during the course 
of normal operation. By handling the no-ID case, I was trying to avoid unit 
tests failing. Maybe we could remove this as a follow-on?






Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-26 Thread via GitHub


cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1373822529


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -407,36 +414,41 @@ object StorageTool extends Logging {
 if (directories.isEmpty) {
   throw new TerseFailure("No log directories found in the configuration.")
 }
-
-val unformattedDirectories = directories.filter(directory => {
-  if (!Files.isDirectory(Paths.get(directory)) || 
!Files.exists(Paths.get(directory, KafkaServer.brokerMetaPropsFile))) {
-  true
-  } else if (!ignoreFormatted) {
-throw new TerseFailure(s"Log directory $directory is already 
formatted. " +
-  "Use --ignore-formatted to ignore this directory and format the 
others.")
-  } else {
-false
-  }
-})
-if (unformattedDirectories.isEmpty) {
+val loader = new MetaPropertiesEnsemble.Loader()
+directories.foreach(loader.addLogDir(_))
+val metaPropertiesEnsemble = loader.load()
+metaPropertiesEnsemble.verify(metaProperties.clusterId(), 
metaProperties.nodeId(),
+  util.EnumSet.noneOf(classOf[VerificationFlag]))
+
+val copier = new MetaPropertiesEnsemble.Copier(metaPropertiesEnsemble)
+if (!(ignoreFormatted || copier.logDirProps().isEmpty)) {
+  val firstLogDir = copier.logDirProps().keySet().iterator().next()
+  throw new TerseFailure(s"Log directory ${firstLogDir} directory is 
already formatted. " +
+"Use --ignore-formatted to ignore this directory and format the 
others.")
+}
+if (!copier.errorLogDirs().isEmpty) {
+  val firstLogDir = copier.errorLogDirs().iterator().next()
+  throw new TerseFailure(s"I/O error trying to read log directory 
${firstLogDir}.")
+}
+if (copier.emptyLogDirs().isEmpty) {
   stream.println("All of the log directories are already formatted.")
+} else {
+  copier.emptyLogDirs().forEach(logDir => {
+val newMetaProperties = new MetaProperties.Builder(metaProperties).
+  setDirectoryId(copier.generateValidDirectoryId()).
+  build()
+copier.logDirProps().put(logDir, newMetaProperties)

Review Comment:
   It seems a bit cumbersome to put accessors on all the maps. But I'm open to 
ideas.
   
   I do wish Java had some way of flagging this map as different from the 
immutable ones. Kind of like `const` in C++, or yes, the whole menagerie of 
immutable/mutable Scala classes. Although that comes with its own set of 
problems.
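
   To make the trade-off concrete, a sketch of one way to separate the read-only view from the mutable working copy in plain Java. All names here (`ReadOnlyViewSketch`, `Ensemble`, `Copier`, `setLogDirProps`) are hypothetical stand-ins loosely modeled on the classes in this PR, and `String` values stand in for `MetaProperties`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class ReadOnlyViewSketch {
    // Immutable holder: callers get a view that rejects modification at runtime.
    static final class Ensemble {
        private final Map<String, String> logDirProps;

        Ensemble(Map<String, String> logDirProps) {
            this.logDirProps = Collections.unmodifiableMap(new HashMap<>(logDirProps));
        }

        Map<String, String> logDirProps() {
            return logDirProps;
        }
    }

    // Mutable builder: changes go through a setter rather than the raw map.
    static final class Copier {
        private final Map<String, String> logDirProps;

        Copier(Ensemble prev) {
            this.logDirProps = new HashMap<>(prev.logDirProps());
        }

        Copier setLogDirProps(String logDir, String props) {
            logDirProps.put(logDir, props);
            return this;
        }

        Ensemble copy() {
            return new Ensemble(logDirProps);
        }
    }
}
```

   The compiler cannot tell the two maps apart, which is the limitation lamented above; the unmodifiable wrapper only fails at runtime.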






Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-26 Thread via GitHub


cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1373820141


##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -228,23 +226,26 @@ class KafkaServer(
 info(s"Cluster ID = $clusterId")
 
 /* load metadata */
-val (preloadedBrokerMetadataCheckpoint, initialOfflineDirs) =
-  
BrokerMetadataCheckpoint.getBrokerMetadataAndOfflineDirs(config.logDirs, 
ignoreMissing = true, kraftMode = false)
-
-if (preloadedBrokerMetadataCheckpoint.version != 0) {
-  throw new RuntimeException(s"Found unexpected version in loaded 
`meta.properties`: " +
-s"$preloadedBrokerMetadataCheckpoint. Zk-based brokers only 
support version 0 " +
-"(which is implicit when the `version` field is missing).")
+val initialMetaPropsEnsemble = {
+  val loader = new MetaPropertiesEnsemble.Loader()
+  config.logDirs.foreach(loader.addLogDir(_))
+  loader.load()
 }

Review Comment:
   Yes, good catch. @pprovenzano also found this bug through testing :)






Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-26 Thread via GitHub


cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1373816778


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogDirFailureChannel.java:
##
@@ -57,7 +56,7 @@ public boolean hasOfflineLogDir(String logDir) {
  * @param msg Error message.
  * @param e Exception instance.
  */
-public void maybeAddOfflineLogDir(String logDir, String msg, IOException 
e) {
+public void maybeAddOfflineLogDir(String logDir, String msg, Exception e) {

Review Comment:
   Fair enough. I changed it above to just catch `IOException`, so I will change it here 
as well.






Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-26 Thread via GitHub


cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1373817350


##
core/src/main/scala/kafka/server/KafkaRaftServer.scala:
##
@@ -135,39 +136,52 @@ object KafkaRaftServer {
* @return A tuple containing the loaded meta properties (which are 
guaranteed to
* be consistent across all log dirs) and the offline directories
*/
-  def initializeLogDirs(config: KafkaConfig): (MetaProperties, 
BootstrapMetadata, Seq[String]) = {
-val logDirs = (config.logDirs.toSet + config.metadataLogDir).toSeq
-val (rawMetaProperties, offlineDirs) = BrokerMetadataCheckpoint.
-  getBrokerMetadataAndOfflineDirs(logDirs, ignoreMissing = false, 
kraftMode = true)
-
-if (offlineDirs.contains(config.metadataLogDir)) {
-  throw new KafkaException("Cannot start server since `meta.properties` 
could not be " +
-s"loaded from ${config.metadataLogDir}")
+  def initializeLogDirs(config: KafkaConfig): (MetaPropertiesEnsemble, 
BootstrapMetadata) = {
+// Load and verify the original ensemble.
+val loader = new MetaPropertiesEnsemble.Loader()
+loader.addMetadataLogDir(config.metadataLogDir)
+config.logDirs.foreach(loader.addLogDir(_))
+val initialMetaPropsEnsemble = loader.load()
+initialMetaPropsEnsemble.emptyLogDirs().forEach(logDir => {
+  throw new RuntimeException(s"No `meta.properties` found in $logDir (have 
you run `kafka-storage.sh` " +
+"to format the directory?)")
+})
+val verificationFlags = if (config.migrationEnabled) {
+  util.EnumSet.of(REQUIRE_AT_LEAST_ONE_VALID, REQUIRE_METADATA_LOG_DIR)
+} else {
+  util.EnumSet.of(REQUIRE_V0, REQUIRE_AT_LEAST_ONE_VALID)
 }
+initialMetaPropsEnsemble.verify(Optional.empty(), 
OptionalInt.of(config.nodeId), verificationFlags);
 
+// Check that the __cluster_metadata-0 topic does not appear outside the 
metadata directory.
 val metadataPartitionDirName = UnifiedLog.logDirName(MetadataPartition)
-val onlineNonMetadataDirs = logDirs.diff(offlineDirs :+ 
config.metadataLogDir)
-onlineNonMetadataDirs.foreach { logDir =>
-  val metadataDir = new File(logDir, metadataPartitionDirName)
-  if (metadataDir.exists) {
-throw new KafkaException(s"Found unexpected metadata location in data 
directory `$metadataDir` " +
-  s"(the configured metadata directory is ${config.metadataLogDir}).")
+initialMetaPropsEnsemble.logDirProps().keySet().forEach(logDir => {
+  if (!logDir.equals(config.metadataLogDir)) {
+val clusterMetadataTopic = new File(logDir, metadataPartitionDirName)
+if (clusterMetadataTopic.exists) {
+  throw new KafkaException(s"Found unexpected metadata location in 
data directory `$clusterMetadataTopic` " +
+s"(the configured metadata directory is 
${config.metadataLogDir}).")
+}
   }
-}
-
-val metaProperties = MetaProperties.parse(rawMetaProperties)
-if (config.nodeId != metaProperties.nodeId) {
-  throw new InconsistentNodeIdException(
-s"Configured node.id `${config.nodeId}` doesn't match stored node.id 
`${metaProperties.nodeId}' in " +
-  "meta.properties. If you moved your data, make sure your configured 
controller.id matches. " +
-  "If you intend to create a new broker, you should remove all data in 
your data directories (log.dirs).")
-}
+})
+
+// Set directory IDs on all directories. Rewrite the files if needed.

Review Comment:
   Yeah. Maybe eventually we'll also auto-upgrade from v0 -> v1 here (once 
we're no longer in migration mode).
   
   v0 is quite annoying since there are basically no required fields at all.
   
   But one step at a time...
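
To make the v0/v1 difference concrete, here is a minimal sketch of a per-version required-field check. The class and method names are hypothetical, and this is not the validation code in the PR. It assumes that v1 requires both `cluster.id` and `node.id`, while v0 requires nothing beyond the `version` key itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of the PR: lists the keys a meta.properties
// file is missing, ASSUMING v1 requires cluster.id and node.id while v0
// requires nothing beyond the version key.
final class RequiredFieldsSketch {
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        String version = props.getProperty("version");
        if (version == null) {
            missing.add("version");
            return missing;
        }
        if (version.trim().equals("1")) {
            for (String key : new String[] {"cluster.id", "node.id"}) {
                if (props.getProperty(key) == null) {
                    missing.add(key);
                }
            }
        }
        // v0: no other key is mandatory, so there is nothing more to check.
        return missing;
    }
}
```

Under these assumptions a v0 file with only `version=0` passes, which is exactly why it is hard to verify anything about it.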






Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-26 Thread via GitHub


cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1373815484


##
metadata/src/main/java/org/apache/kafka/metadata/properties/MetaPropertiesVersion.java:
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.properties;
+
+/**
+ * The version of a meta.properties file.
+ */
+public enum MetaPropertiesVersion {
+V0(0),
+V1(1);
+
+private final int number;
+
+public static MetaPropertiesVersion fromNumberString(String numberString) {
+int number;
+try {
+number = Integer.parseInt(numberString.trim());
+} catch (NumberFormatException  e) {
+throw new RuntimeException("Invalid meta.properties version string 
'" +
+numberString + "'");
+}
+return fromNumber(number);
+}
+
+public static MetaPropertiesVersion fromNumber(int number) {
+switch (number) {
+case 0: return V0;
+case 1: return V1;
+default: throw new RuntimeException("Unknown meta.properties 
version number " + number);
+}
+}
+
+MetaPropertiesVersion(int number) {
+this.number = number;
+}
+
+public int number() {
+return number;
+}
+
+public String numberString() {
+return "" + number;
+}
+
+public boolean hasBrokerId() {
+return this == V0;
+}
+
+public boolean alwaysHasId() {

Review Comment:
   yes, let's rename it to `alwaysHasNodeId`
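
A sketch of how the renamed predicate might read. The quoted diff is cut off before the method body, so the body shown here (`this != V0`) is an assumption, and the enum name is hypothetical to keep it distinct from the real `MetaPropertiesVersion`.

```java
// Hypothetical stand-in for MetaPropertiesVersion, trimmed to two predicates.
enum VersionSketch {
    V0, V1;

    // v0 files identify the node with broker.id, as in the diff above.
    boolean hasBrokerId() {
        return this == V0;
    }

    // ASSUMED body: every version after v0 must carry a node ID.
    boolean alwaysHasNodeId() {
        return this != V0;
    }
}
```

The longer name makes it clear at the call site which ID is guaranteed to be present.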






Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-26 Thread via GitHub


cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1373813203


##
core/src/test/java/kafka/testkit/KafkaClusterTestKit.java:
##
@@ -227,8 +231,12 @@ public KafkaClusterTestKit build() throws Exception {
 setupNodeDirectories(baseDirectory, 
node.metadataDirectory(), Collections.emptyList());
 BootstrapMetadata bootstrapMetadata = BootstrapMetadata.
 fromVersion(nodes.bootstrapMetadataVersion(), 
"testkit");
+MetaPropertiesEnsemble metaPropertiesEnsemble = new 
MetaPropertiesEnsemble.Loader().
+setLoadMissingBehavior(LoadMissingBehavior.EXCEPTION).
+addMetadataLogDir(node.metadataDirectory()).

Review Comment:
   Yes, this was left over from some earlier code, sorry. Removed.






Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-26 Thread via GitHub


cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1373812595


##
metadata/src/main/java/org/apache/kafka/metadata/properties/MetaPropertiesEnsemble.java:
##
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.properties;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.NoSuchFileException;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.function.BiConsumer;
+
+/**
+ * A collection of meta.properties information for Kafka log directories.
+ *
+ * Directories are categorized into empty, error, and normal. Each directory 
must appear in only
+ * one category, corresponding to emptyLogDirs, errorLogDirs, and logDirProps.
+ *
+ * This class is immutable. Modified copies can be made with the Copier class.
+ */
+public class MetaPropertiesEnsemble {
+/**
+ * The log4j object for this class.
+ */
+private static final Logger LOG = 
LoggerFactory.getLogger(MetaPropertiesEnsemble.class);
+
+/**
+ * A completely empty MetaPropertiesEnsemble object.
+ */
+public static final MetaPropertiesEnsemble EMPTY = new 
MetaPropertiesEnsemble(Collections.emptySet(),
+Collections.emptySet(),
+Collections.emptyMap(),
+Optional.empty());
+
+/**
+ * The name of the meta.properties file within each log directory.
+ */
+public static final String META_PROPERTIES_NAME = "meta.properties";
+
+/**
+ * The set of log dirs that were empty.
+ */
+private final Set<String> emptyLogDirs;
+
+/**
+ * The set of log dirs that had errors.
+ */
+private final Set<String> errorLogDirs;
+
+/**
+ * A map from log directories to the meta.properties information inside 
each one.
+ */
+private final Map<String, MetaProperties> logDirProps;
+
+/**
+ * The metadata log directory, or the empty string if there is none.
+ */
+private final Optional<String> metadataLogDir;
+
+/**
+ * Utility class for loading a MetaPropertiesEnsemble from the disk.
+ */
+public static class Loader {
+private final TreeSet<String> logDirs = new TreeSet<>();
+private Optional<String> metadataLogDir = Optional.empty();
+
+public Loader addLogDirs(Collection<String> logDirs) {
+for (String logDir : logDirs) {
+this.logDirs.add(logDir);
+}
+return this;
+}
+
+public Loader addLogDir(String logDir) {
+this.logDirs.add(logDir);
+return this;
+}
+
+public Loader addMetadataLogDir(String metadataLogDir) {
+if (this.metadataLogDir.isPresent()) {
+throw new RuntimeException("Cannot specify more than one 
metadata log directory. " +
+"Already specified " + this.metadataLogDir.get());
+}
+this.metadataLogDir = Optional.of(metadataLogDir);
+logDirs.add(metadataLogDir);
+return this;
+}
+
+public MetaPropertiesEnsemble load() throws IOException  {
+if (logDirs.isEmpty()) {
+throw new RuntimeException("You must specify at least one log 
directory.");
+}
+Set<String> emptyLogDirs = new HashSet<>();
+Set<String> errorLogDirs = new HashSet<>();
+Map<String, MetaProperties> logDirProps = new HashMap<>();
+for (String logDir : logDirs) {
+String metaPropsFile = new File(logDir, 
META_PROPERTIES_NAME).getAbsolutePath();
+try {
+Properties props = 
PropertiesUtils.readPropertiesFile(metaPropsFile);
+   

Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-26 Thread via GitHub


cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1373810872


##
metadata/src/main/java/org/apache/kafka/metadata/properties/PropertiesUtils.java:
##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.properties;
+
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Properties;
+
+public final class PropertiesUtils {
+/**
+ * Writes a Java Properties object to a file.
+ *
+ * @param props The Properties object.
+ * @param path  The file to write to.
+ * @throws IOException
+ */
+public static void writePropertiesFile(
+Properties props,
+String path
+) throws IOException {
+File tempFile = new File(path + ".tmp");
+try (
+FileOutputStream fos = new FileOutputStream(tempFile, false);
+OutputStreamWriter osw = new OutputStreamWriter(fos, 
StandardCharsets.UTF_8);
+PrintWriter pw = new PrintWriter(osw)
+) {
+props.store(pw, "");
+fos.flush();
+fos.getFD().sync();

Review Comment:
   On some filesystems, if you don't fsync the parent directory, the file can 
be lost if the machine loses power. This is again something ext3 was famous 
for: it let you sync files to disk, but then left them unreachable because 
you hadn't synced the directory.
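
For readers unfamiliar with directory fsync on the JVM, a minimal sketch of one common way to do it: open the directory read-only and call `FileChannel.force`. The class and method names are hypothetical. This works on Linux and macOS; some platforms (Windows, for example) do not allow opening a directory this way and throw an `IOException`.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper illustrating a parent-directory fsync.
final class DirSyncSketch {
    // Opens the directory read-only and forces its entries to disk, so that
    // a newly created or renamed file stays reachable after a power loss.
    static void fsyncDirectory(Path dir) throws IOException {
        try (FileChannel channel = FileChannel.open(dir, StandardOpenOption.READ)) {
            channel.force(true);
        }
    }
}
```

A caller would typically invoke this on the target file's parent directory right after creating or renaming the file.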






Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-26 Thread via GitHub


cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1373808533


##
metadata/src/main/java/org/apache/kafka/metadata/properties/MetaProperties.java:
##
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.properties;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Properties;
+
+/**
+ * An immutable class which contains the per-log-directory information stored 
in an individual
+ * meta.properties file.
+ */
+public final class MetaProperties {

Review Comment:
   Hmm, good question. I guess I think it does belong here.
   
   The `:metadata` module in Gradle is about more than just the controller. 
It's about how Kafka handles metadata in general and has things like node 
registrations, metadata publishing and loading, etc. I think this is in keeping 
with that.
   
   Also, independently of all that, we should all be striving to shrink the 
`:core` module, not add more stuff there. :)






Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-26 Thread via GitHub


divijvaidya commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1373545620


##
metadata/src/main/java/org/apache/kafka/metadata/properties/PropertiesUtils.java:
##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.properties;
+
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Properties;
+
+public final class PropertiesUtils {
+/**
+ * Writes a Java Properties object to a file.
+ *
+ * @param props The Properties object.
+ * @param path  The file to write to.
+ * @throws IOException
+ */
+public static void writePropertiesFile(
+Properties props,
+String path
+) throws IOException {
+File tempFile = new File(path + ".tmp");
+try (
+FileOutputStream fos = new FileOutputStream(tempFile, false);
+OutputStreamWriter osw = new OutputStreamWriter(fos, 
StandardCharsets.UTF_8);
+PrintWriter pw = new PrintWriter(osw)
+) {
+props.store(pw, "");
+fos.flush();
+fos.getFD().sync();

Review Comment:
   Hmm... that makes sense, but in that case, do we need to flush the directory 
again after renaming? (In `atomicMoveWithFallback()`, the third parameter, 
`needFlushParentDir`, defaults to `true`.)






Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-26 Thread via GitHub


cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1373542993


##
metadata/src/main/java/org/apache/kafka/metadata/properties/PropertiesUtils.java:
##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.properties;
+
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Properties;
+
+public final class PropertiesUtils {

Review Comment:
   Hmm. Well, the purpose of `MetaProperties` is that we know what types we 
want in the meta.properties file and we can check for those specifically.
   
   If we just want a generic key/value file where we don't know ahead of time 
what is in it, `java.util.Properties` does that just fine. It has a bit of 
old-school JDK 1.0 messiness, but on the other hand it's instantly familiar to 
Java programmers, and that has some value. So I doubt replacing or wrapping 
would repay the time investment needed, to be honest. 
   
   I don't see the connection with fsync since we sometimes want to fsync 
properties files, but other times not. So it's ultimately up to the programmer 
to do the right thing, as always.
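
To illustrate the split described above, a minimal sketch: `java.util.Properties` does the generic key/value parsing, and a thin typed layer checks the one field it knows about. The `TypedViewSketch` class and its methods are illustrative assumptions, not the PR's `MetaProperties` API.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.OptionalInt;
import java.util.Properties;

// Hypothetical example: generic parsing plus one typed accessor.
final class TypedViewSketch {
    // Generic step: parse arbitrary key=value text with java.util.Properties.
    static Properties parse(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props;
    }

    // Typed step: interpret one known key as an int, rejecting bad values.
    static OptionalInt nodeId(Properties props) {
        String value = props.getProperty("node.id");
        if (value == null) {
            return OptionalInt.empty();
        }
        try {
            return OptionalInt.of(Integer.parseInt(value.trim()));
        } catch (NumberFormatException e) {
            throw new RuntimeException("Invalid node.id '" + value + "'", e);
        }
    }
}
```

The generic layer accepts any keys; only the typed accessor knows that `node.id` must be an integer.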






Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-26 Thread via GitHub


cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1373533083


##
metadata/src/main/java/org/apache/kafka/metadata/properties/PropertiesUtils.java:
##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.properties;
+
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Properties;
+
+public final class PropertiesUtils {
+/**
+ * Writes a Java Properties object to a file.
+ *
+ * @param props The Properties object.
+ * @param path  The file to write to.
+ * @throws IOException
+ */
+public static void writePropertiesFile(
+Properties props,
+String path
+) throws IOException {
+File tempFile = new File(path + ".tmp");
+try (
+FileOutputStream fos = new FileOutputStream(tempFile, false);
+OutputStreamWriter osw = new OutputStreamWriter(fos, 
StandardCharsets.UTF_8);
+PrintWriter pw = new PrintWriter(osw)
+) {
+props.store(pw, "");
+fos.flush();
+fos.getFD().sync();

Review Comment:
   This is what the previous code in `BrokerMetadataCheckpoint.scala` did. The 
reason for doing it this way is for extra safety, as @pprovenzano commented. 
Some filesystems (I think ext3 was a notable offender) tended to lose data when 
fsync was omitted, even in the case where other operations like rename were 
performed later.
   
   Since we only very rarely write the `meta.properties` file, it's surely not 
worth optimizing this by avoiding the fsync.






Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-26 Thread via GitHub


cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1373533689


##
metadata/src/main/java/org/apache/kafka/metadata/properties/PropertiesUtils.java:
##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.properties;
+
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Properties;
+
+public final class PropertiesUtils {
+/**
+ * Writes a Java Properties object to a file.
+ *
+ * @param props The Properties object.
+ * @param path  The file to write to.
+ * @throws IOException
+ */
+public static void writePropertiesFile(
+Properties props,
+String path
+) throws IOException {
+File tempFile = new File(path + ".tmp");
+try (
+FileOutputStream fos = new FileOutputStream(tempFile, false);
+OutputStreamWriter osw = new OutputStreamWriter(fos, 
StandardCharsets.UTF_8);
+PrintWriter pw = new PrintWriter(osw)
+) {
+props.store(pw, "");
+fos.flush();
+fos.getFD().sync();
+}
+File targetFile = new File(path);
+try {
+Utils.atomicMoveWithFallback(tempFile.toPath(), 
targetFile.toPath());

Review Comment:
   Same as above: this is needed for safety. I will add a boolean in case 
someone wants to use this function in a context where safety doesn't matter as 
much (like unit tests).






Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-26 Thread via GitHub


cmccabe commented on PR #14628:
URL: https://github.com/apache/kafka/pull/14628#issuecomment-1781560050

   > This fails for the case where there are no log directories to start and we 
are creating the node and directories for the first time.
   
   Thanks for testing. Yes, this was missing some code to write out 
`meta.properties` files to empty directories in the ZK case. Should be fixed in 
the next revision.
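
   A minimal sketch of that missing step, assuming a log directory counts as 
"empty" when it has no `meta.properties` file. The class and method names here 
are hypothetical and are not taken from the PR, and the write is deliberately 
plain (no temp file, no fsync) to keep the example short:

```java
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration only; not part of Kafka.
final class EmptyLogDirSketch {
    static final String META_PROPERTIES_NAME = "meta.properties";

    /**
     * Writes the template into every log directory that has no meta.properties
     * file yet, creating the directory if needed. Directories that already have
     * the file are left untouched.
     *
     * @return the directories that received a new meta.properties file.
     */
    static List<Path> writeMissing(List<Path> logDirs, Properties template) throws IOException {
        List<Path> written = new ArrayList<>();
        for (Path dir : logDirs) {
            Files.createDirectories(dir);
            Path metaPath = dir.resolve(META_PROPERTIES_NAME);
            if (Files.exists(metaPath)) {
                continue; // already initialized, do not overwrite
            }
            try (Writer writer = Files.newBufferedWriter(metaPath, StandardCharsets.UTF_8)) {
                template.store(writer, "");
            }
            written.add(dir);
        }
        return written;
    }
}
```

   In the ZK case the template would presumably carry the V0 keys (`version`, 
`broker.id`, and optionally `cluster.id`).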





Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-25 Thread via GitHub


pprovenzano commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1372498997


##
metadata/src/main/java/org/apache/kafka/metadata/properties/PropertiesUtils.java:
##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.properties;
+
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Properties;
+
+public final class PropertiesUtils {
+/**
+ * Writes a Java Properties object to a file.
+ *
+ * @param props The Properties object.
+ * @param path  The file to write to.
+ * @throws IOException
+ */
+public static void writePropertiesFile(
+Properties props,
+String path
+) throws IOException {
+File tempFile = new File(path + ".tmp");
+try (
+FileOutputStream fos = new FileOutputStream(tempFile, false);
+OutputStreamWriter osw = new OutputStreamWriter(fos, 
StandardCharsets.UTF_8);
+PrintWriter pw = new PrintWriter(osw)
+) {
+props.store(pw, "");
+fos.flush();
+fos.getFD().sync();

Review Comment:
   If the process crashes and we lose the data in the page cache (say, from a 
power failure) after the rename but before the file's data is on disk, 
then on some filesystems this would result in an empty target file.
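
   The ordering being discussed can be sketched with JDK calls only. This is an 
illustration under assumptions, not the PR's code: `DurableWriteSketch` and 
`writeDurably` are made-up names, and `Files.move` with `ATOMIC_MOVE` stands in 
for Kafka's `Utils.atomicMoveWithFallback`. The sketch also does not fsync the 
parent directory.

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Properties;

// Hypothetical helper for illustration only; not part of Kafka.
final class DurableWriteSketch {
    static void writeDurably(Properties props, Path target) throws IOException {
        Path temp = target.resolveSibling(target.getFileName() + ".tmp");
        try (FileOutputStream fos = new FileOutputStream(temp.toFile(), false);
             Writer writer = new OutputStreamWriter(fos, StandardCharsets.UTF_8)) {
            props.store(writer, "");
            // Push any buffered characters down to the file descriptor.
            writer.flush();
            // Ask the OS to persist the temp file's contents before the rename,
            // so the new name never points at data that exists only in the page cache.
            fos.getFD().sync();
        }
        // Swap the fully written temp file into place in one step.
        Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
    }
}
```

   Without the `sync()` call, the rename could become durable before the file 
contents do, which is the empty-file case described above.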






Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-25 Thread via GitHub


soarez commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1371749943


##
metadata/src/main/java/org/apache/kafka/metadata/properties/MetaProperties.java:
##
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.properties;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Properties;
+
+/**
+ * An immutable class which contains the per-log-directory information stored 
in an individual
+ * meta.properties file.
+ */
+public final class MetaProperties {
+/**
+ * The property that sets the version number.
+ */
+static final String VERSION_PROP = "version";
+
+/**
+ * The property that specifies the cluster id.
+ */
+static final String CLUSTER_ID_PROP = "cluster.id";
+
+/**
+ * The property that specifies the broker id. Only in V0.
+ */
+static final String BROKER_ID_PROP = "broker.id";
+
+/**
+ * The property that specifies the node id. Replaces broker.id in V1.
+ */
+static final String NODE_ID_PROP = "node.id";
+
+/**
+ * The property that specifies the directory id.
+ */
+static final String DIRECTORY_ID_PROP = "directory.id";
+
+/**
+ * The version of the MetaProperties file.
+ */
+private final MetaPropertiesVersion version;
+
+/**
+ * The cluster ID, which may be Optional.empty in V0.
+ */
+private final Optional<String> clusterId;
+
+/**
+ * The node ID, which may be OptionalInt.empty in V0.
+ */
+private final OptionalInt nodeId;
+
+/**
+ * The JBOD directory ID, or Optional.empty if none is specified.

Review Comment:
   ```suggestion
* The directory ID, or Optional.empty if none is specified.
   ```



##
metadata/src/main/java/org/apache/kafka/metadata/properties/MetaPropertiesEnsemble.java:
##
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.properties;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.NoSuchFileException;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.function.BiConsumer;
+
+/**
+ * A collection of meta.properties information for Kafka log directories.
+ *
+ * Directories are categorized into empty, error, and normal. Each directory 
must appear in only
+ * one category, corresponding to emptyLogDirs, errorLogDirs, and logDirProps.
+ *
+ * This class is immutable. Modified copies can be made with the Copier class.
+ */
+public class MetaPropertiesEnsemble {
+/**
+ * The log4j object for this class.
+ */
+private static final Logger LOG = 
LoggerFactory.getLogger(MetaPropertiesEnsemble.class);
+
+/**
+ * A completely empty 

Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-25 Thread via GitHub


divijvaidya commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1371459493


##
metadata/src/main/java/org/apache/kafka/metadata/properties/PropertiesUtils.java:
##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.properties;
+
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Properties;
+
+public final class PropertiesUtils {
+/**
+ * Writes a Java Properties object to a file.
+ *
+ * @param props The Properties object.
+ * @param path  The file to write to.
+ * @throws IOException
+ */
+public static void writePropertiesFile(
+Properties props,
+String path
+) throws IOException {
+File tempFile = new File(path + ".tmp");
+try (
+FileOutputStream fos = new FileOutputStream(tempFile, false);
+OutputStreamWriter osw = new OutputStreamWriter(fos, 
StandardCharsets.UTF_8);
+PrintWriter pw = new PrintWriter(osw)
+) {
+props.store(pw, "");
+fos.flush();
+fos.getFD().sync();

Review Comment:
   For my better understanding, could you please explain why fsync() is 
necessary for the temp file? I am curious because it is going to be renamed in 
the next step, and if a process crash loses the data in the page cache, we 
won't want to reuse this temp file anyway.



##
metadata/src/main/java/org/apache/kafka/metadata/properties/PropertiesUtils.java:
##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.properties;
+
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Properties;
+
+public final class PropertiesUtils {

Review Comment:
   Can we extract this as a "type" of file (perhaps called PropertiesFile) that 
Kafka creates, similar to how we have a type `CheckpointFile` or `Snapshots`? 
This new type of file could be reused in the future for other use cases. It would 
also help unify the places where we write to the file system and prevent the 
accidental fsync() calls that we are trying to eliminate in 
https://github.com/apache/kafka/pull/14242
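
   To make the suggestion concrete, here is a rough sketch of what such a type 
might look like. Everything in it is hypothetical: no `PropertiesFile` class 
exists in the PR, and the `syncOnWrite` flag is just one way to keep the fsync 
decision in a single place.

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Reader;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Properties;

// Hypothetical type for illustration only; not an existing Kafka class.
final class PropertiesFile {
    private final Path path;
    private final boolean syncOnWrite;

    PropertiesFile(Path path, boolean syncOnWrite) {
        this.path = path;
        this.syncOnWrite = syncOnWrite;
    }

    /** Reads the file as UTF-8, matching the encoding used by write(). */
    Properties read() throws IOException {
        Properties props = new Properties();
        try (Reader reader = Files.newBufferedReader(path, StandardCharsets.UTF_8)) {
            props.load(reader);
        }
        return props;
    }

    /** Writes to a sibling temp file, optionally fsyncs it, then renames it into place. */
    void write(Properties props) throws IOException {
        Path temp = path.resolveSibling(path.getFileName() + ".tmp");
        try (FileOutputStream fos = new FileOutputStream(temp.toFile(), false);
             Writer writer = new OutputStreamWriter(fos, StandardCharsets.UTF_8)) {
            props.store(writer, "");
            if (syncOnWrite) {
                // The only fsync in the type; callers opt in through the constructor.
                fos.getFD().sync();
            }
        }
        Files.move(temp, path, StandardCopyOption.ATOMIC_MOVE);
    }
}
```

   A caller that needs durability would construct it with `syncOnWrite = true`, 
while a unit test could pass `false` to skip the fsync.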



##
metadata/src/main/java/org/apache/kafka/metadata/properties/PropertiesUtils.java:
##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, 

Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-24 Thread via GitHub


pprovenzano commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1371083468


##
core/src/test/java/kafka/testkit/KafkaClusterTestKit.java:
##
@@ -227,8 +231,12 @@ public KafkaClusterTestKit build() throws Exception {
 setupNodeDirectories(baseDirectory, 
node.metadataDirectory(), Collections.emptyList());
 BootstrapMetadata bootstrapMetadata = BootstrapMetadata.
 fromVersion(nodes.bootstrapMetadataVersion(), 
"testkit");
+MetaPropertiesEnsemble metaPropertiesEnsemble = new 
MetaPropertiesEnsemble.Loader().
+setLoadMissingBehavior(LoadMissingBehavior.EXCEPTION).
+addMetadataLogDir(node.metadataDirectory()).

Review Comment:
   LoadMissingBehavior.EXCEPTION is not defined anywhere.


