Re: [PR] KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module [kafka]

2024-04-10 Thread via GitHub


jolshan commented on code in PR #14607:
URL: https://github.com/apache/kafka/pull/14607#discussion_r1560454922


##
storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFile.java:
##
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InconsistentTopicIdException;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Optional;
+
+public class PartitionMetadataFile {
+private static final String PARTITION_METADATA_FILE_NAME = 
"partition.metadata";
+static final int CURRENT_VERSION = 0;
+
+public static File newFile(File dir) {
+return new File(dir, PARTITION_METADATA_FILE_NAME);
+}
+
+private final File file;
+private final LogDirFailureChannel logDirFailureChannel;
+
+private final Object lock = new Object();
+private volatile Optional dirtyTopicIdOpt = Optional.empty();
+
+public PartitionMetadataFile(
+final File file,
+final LogDirFailureChannel logDirFailureChannel
+) {
+this.file = file;
+this.logDirFailureChannel = logDirFailureChannel;
+}
+
+/**
+ * Records the topic ID that will be flushed to disk.
+ */
+public void record(Uuid topicId) {
+// Topic IDs should not differ, but we defensively check here to fail 
earlier in the case that the IDs somehow differ.
+dirtyTopicIdOpt.ifPresent(dirtyTopicId -> {
+if (dirtyTopicId != topicId) {

Review Comment:
   I don't think this should be a reference comparison. Perhaps a miss in the 
scala -> java conversion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module [kafka]

2023-11-01 Thread via GitHub


junrao merged PR #14607:
URL: https://github.com/apache/kafka/pull/14607


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module [kafka]

2023-11-01 Thread via GitHub


junrao commented on PR #14607:
URL: https://github.com/apache/kafka/pull/14607#issuecomment-1789288736

   Thanks for triaging the transient test failures, @alok123t. Will merge the 
PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module [kafka]

2023-11-01 Thread via GitHub


alok123t commented on PR #14607:
URL: https://github.com/apache/kafka/pull/14607#issuecomment-1788463950

   @ijuma @junrao 
   the latest build succeeds for JDK 8/Scala 2.12. The previous failures should 
be unrelated to this PR
   
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14607/9/pipeline/


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module [kafka]

2023-11-01 Thread via GitHub


alok123t commented on PR #14607:
URL: https://github.com/apache/kafka/pull/14607#issuecomment-1788460578

   @junrao created following JIRA
   https://issues.apache.org/jira/browse/KAFKA-15770
   For rest of the failed tests, JIRA already exists


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module [kafka]

2023-10-28 Thread via GitHub


ijuma commented on PR #14607:
URL: https://github.com/apache/kafka/pull/14607#issuecomment-1783833262

   I think this doesn't compile with Scala 2.12 - that needs to be fixed before 
we can proceed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module [kafka]

2023-10-26 Thread via GitHub


alok123t commented on PR #14607:
URL: https://github.com/apache/kafka/pull/14607#issuecomment-1781801313

   @junrao I think the test failures should be unrelated to the PR from a quick 
look - not sure if these are known flaky tests, I will wait for another run 
from the latest commit 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module [kafka]

2023-10-26 Thread via GitHub


alok123t commented on code in PR #14607:
URL: https://github.com/apache/kafka/pull/14607#discussion_r1373738648


##
storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadata.java:
##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.common.Uuid;
+
+public class PartitionMetadata {
+
+private final int version;
+private final Uuid topicId;
+
+public PartitionMetadata(int version, Uuid topicId) {
+this.version = version;
+this.topicId = topicId;
+}
+
+public int version() {
+return version;
+}
+
+public Uuid topicId() {
+return topicId;
+}
+
+public String toText() {

Review Comment:
   updated to encode in 
https://github.com/apache/kafka/pull/14607/commits/f2d05e64314401142d1386f031b45b42425d5a93
   
   It's better to use the `Formatter` interface in `LeaderEpochCheckpointFile`, 
we can do in a follow up PR



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module [kafka]

2023-10-26 Thread via GitHub


junrao commented on code in PR #14607:
URL: https://github.com/apache/kafka/pull/14607#discussion_r1373572075


##
storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadata.java:
##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.common.Uuid;
+
+public class PartitionMetadata {
+
+private final int version;
+private final Uuid topicId;
+
+public PartitionMetadata(int version, Uuid topicId) {
+this.version = version;
+this.topicId = topicId;
+}
+
+public int version() {
+return version;
+}
+
+public Uuid topicId() {
+return topicId;
+}
+
+public String toText() {

Review Comment:
   Perhaps `encode` will be a better name?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module [kafka]

2023-10-26 Thread via GitHub


alok123t commented on code in PR #14607:
URL: https://github.com/apache/kafka/pull/14607#discussion_r1372811735


##
storage/src/main/java/org/apache/kafka/storage/internals/log/PartitionMetadataReadBuffer.java:
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.Uuid;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+public class PartitionMetadataReadBuffer {
+private static final Pattern WHITE_SPACES_PATTERN = 
Pattern.compile(":\\s+");
+
+private final String location;
+private final BufferedReader reader;
+
+public PartitionMetadataReadBuffer(
+String location,
+BufferedReader reader
+) {
+this.location = location;
+this.reader = reader;
+}
+
+PartitionMetadata read() throws IOException {
+String line = null;
+Uuid metadataTopicId;
+
+try {
+line = reader.readLine();
+String[] versionArr = WHITE_SPACES_PATTERN.split(line);
+
+if (versionArr.length == 2) {
+int version = Integer.parseInt(versionArr[1]);
+if (version == PartitionMetadataFile.CURRENT_VERSION) {

Review Comment:
   Makes sense, changed to `>=` instead and added a comment in 
https://github.com/apache/kafka/pull/14607/commits/dddade29b7a73640553aafbb6eea4a962f68ba77



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module [kafka]

2023-10-25 Thread via GitHub


junrao commented on code in PR #14607:
URL: https://github.com/apache/kafka/pull/14607#discussion_r1372393891


##
storage/src/main/java/org/apache/kafka/storage/internals/log/PartitionMetadataReadBuffer.java:
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.Uuid;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+public class PartitionMetadataReadBuffer {
+private static final Pattern WHITE_SPACES_PATTERN = 
Pattern.compile(":\\s+");
+
+private final String location;
+private final BufferedReader reader;
+
+public PartitionMetadataReadBuffer(
+String location,
+BufferedReader reader
+) {
+this.location = location;
+this.reader = reader;
+}
+
+PartitionMetadata read() throws IOException {
+String line = null;
+Uuid metadataTopicId;
+
+try {
+line = reader.readLine();
+String[] versionArr = WHITE_SPACES_PATTERN.split(line);
+
+if (versionArr.length == 2) {
+int version = Integer.parseInt(versionArr[1]);
+if (version == PartitionMetadataFile.CURRENT_VERSION) {

Review Comment:
   This is an existing issue. In the future, we may add a new field and bump up 
the version, to make it possible to downgrade, it's probably better to relax 
this check a bit.



##
storage/src/main/java/org/apache/kafka/storage/internals/log/PartitionMetadata.java:
##
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;

Review Comment:
   It seems that this should be in the 
`org.apache.kafka.storage.internals.checkpoint` package?



##
storage/src/main/java/org/apache/kafka/storage/internals/log/PartitionMetadataReadBuffer.java:
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.Uuid;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+public class PartitionMetadataReadBuffer {
+private static final Pattern WHITE_SPACES_PATTERN = 
Pattern.compile(":\\s+");
+
+private final String location;
+private final BufferedReader reader;
+
+public PartitionMetadataReadBuffer(
+String location,
+BufferedReader reader
+) {
+this.location = location;
+this.reader = reader;
+}
+
+PartitionMetadata read() throws IOException {
+String line = null;
+Uuid