This is an automated email from the ASF dual-hosted git repository. maedhroz pushed a commit to branch cassandra-5.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-5.0 by this push: new 07df26778b Change the checksum algorithm SAI-related files use from CRC32 to CRC32C 07df26778b is described below commit 07df26778b01a00c1f5770c8cf133ce4c2829533 Author: Maxim Muzafarov <maxmu...@gmail.com> AuthorDate: Fri Oct 20 11:01:54 2023 +0200 Change the checksum algorithm SAI-related files use from CRC32 to CRC32C patch by Maxim Muzafarov; reviewed by Caleb Rackliffe and Zhao Yang for CASSANDRA-18836 --- CHANGES.txt | 1 + .../sai/disk/io/BufferedChecksumIndexInput.java | 86 ++++++++++++++++++++++ .../index/sai/disk/io/IndexFileUtils.java | 19 ++++- .../index/sai/disk/v1/MetadataSource.java | 5 +- .../cassandra/index/sai/disk/v1/SAICodecUtils.java | 42 ++++++++--- .../index/sai/disk/v1/SAICodecUtilsTest.java | 22 +++--- 6 files changed, 148 insertions(+), 27 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 89d1163f83..547a1bb11f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.0-alpha2 + * Change the checksum algorithm SAI-related files use from CRC32 to CRC32C (CASSANDRA-18836) * Correctly remove Index.Group from IndexRegistry (CASSANDRA-18905) * Fix vector type to support DDM's mask_default function (CASSANDRA-18889) * Remove unnecessary reporter-config3 dependency (CASSANDRA-18907) diff --git a/src/java/org/apache/cassandra/index/sai/disk/io/BufferedChecksumIndexInput.java b/src/java/org/apache/cassandra/index/sai/disk/io/BufferedChecksumIndexInput.java new file mode 100644 index 0000000000..333868466a --- /dev/null +++ b/src/java/org/apache/cassandra/index/sai/disk/io/BufferedChecksumIndexInput.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.index.sai.disk.io; + +import org.apache.lucene.store.ChecksumIndexInput; +import org.apache.lucene.store.IndexInput; + +import java.io.IOException; +import java.util.zip.Checksum; + +/** + * This implementation of {@link ChecksumIndexInput} is based on {@link org.apache.lucene.store.BufferedChecksumIndexInput} + * but uses custom checksum algorithm instead of the hardcoded {@code CRC32} in {@code BufferedChecksumIndexInput}. + * + * @see org.apache.cassandra.index.sai.disk.io.IndexFileUtils.ChecksummingWriter + */ +class BufferedChecksumIndexInput extends ChecksumIndexInput +{ + final IndexInput delegate; + final Checksum digest; + + public BufferedChecksumIndexInput(IndexInput delegate, Checksum digest) + { + super("BufferedChecksumIndexInput(" + delegate + ')'); + this.delegate = delegate; + this.digest = digest; + } + + public byte readByte() throws IOException + { + byte b = this.delegate.readByte(); + this.digest.update(b); + return b; + } + + public void readBytes(byte[] b, int offset, int len) throws IOException + { + this.delegate.readBytes(b, offset, len); + this.digest.update(b, offset, len); + } + + public long getChecksum() + { + return this.digest.getValue(); + } + + public void close() throws IOException + { + this.delegate.close(); + } + public long getFilePointer() + { + return this.delegate.getFilePointer(); + } + + public long length() + { + return this.delegate.length(); + } + + public IndexInput clone() + { + throw new UnsupportedOperationException(); + } + + public IndexInput slice(String sliceDescription, long offset, long length) throws IOException + { + throw new UnsupportedOperationException(); + } +} diff --git a/src/java/org/apache/cassandra/index/sai/disk/io/IndexFileUtils.java b/src/java/org/apache/cassandra/index/sai/disk/io/IndexFileUtils.java index b0145d24fd..4d280ea7a1 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/io/IndexFileUtils.java +++ b/src/java/org/apache/cassandra/index/sai/disk/io/IndexFileUtils.java @@ -20,7 +20,9 @@ package org.apache.cassandra.index.sai.disk.io; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.zip.CRC32; +import java.util.function.Supplier; +import java.util.zip.CRC32C; +import java.util.zip.Checksum; import com.google.common.annotations.VisibleForTesting; @@ -31,6 +33,7 @@ import org.apache.cassandra.io.util.FileHandle; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.io.util.SequentialWriter; import org.apache.cassandra.io.util.SequentialWriterOption; +import org.apache.lucene.store.ChecksumIndexInput; import org.apache.lucene.store.IndexInput; public class IndexFileUtils @@ -44,6 +47,7 @@ public class IndexFileUtils .build(); public static final IndexFileUtils instance = new IndexFileUtils(DEFAULT_WRITER_OPTION); + private static final Supplier<Checksum> CHECKSUM_FACTORY = CRC32C::new; private final SequentialWriterOption writerOption; @@ -75,9 +79,20 @@ public class IndexFileUtils return IndexInputReader.create(randomReader, fileHandle::close); } + public static ChecksumIndexInput getBufferedChecksumIndexInput(IndexInput indexInput) + { + return new BufferedChecksumIndexInput(indexInput, CHECKSUM_FACTORY.get()); + } + + /** + * The SequentialWriter that calculates checksum of the data written to the file. This writer extends + * {@link SequentialWriter} to add the checksumming functionality and typically is used together + * with {@link IndexOutputWriter}. This, in turn, is used in conjunction with {@link BufferedChecksumIndexInput} + * to verify the checksum of the data read from the file, so they must share the same checksum algorithm. + */ static class ChecksummingWriter extends SequentialWriter { - private final CRC32 checksum = new CRC32(); + private final Checksum checksum = CHECKSUM_FACTORY.get(); ChecksummingWriter(File file, SequentialWriterOption writerOption) { diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/MetadataSource.java b/src/java/org/apache/cassandra/index/sai/disk/v1/MetadataSource.java index b552b09574..766e52d575 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/MetadataSource.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/MetadataSource.java @@ -25,8 +25,9 @@ import javax.annotation.concurrent.NotThreadSafe; import org.apache.cassandra.index.sai.IndexContext; import org.apache.cassandra.index.sai.disk.format.IndexComponent; import org.apache.cassandra.index.sai.disk.format.IndexDescriptor; -import org.apache.lucene.store.BufferedChecksumIndexInput; +import org.apache.cassandra.index.sai.disk.io.IndexFileUtils; import org.apache.lucene.store.ByteArrayDataInput; +import org.apache.lucene.store.ChecksumIndexInput; import org.apache.lucene.store.DataInput; import org.apache.lucene.store.IndexInput; import org.apache.lucene.util.BytesRef; @@ -55,7 +56,7 @@ public class MetadataSource { Map<String, BytesRef> components = new HashMap<>(); - try (BufferedChecksumIndexInput input = new BufferedChecksumIndexInput(indexInput)) + try (ChecksumIndexInput input = IndexFileUtils.getBufferedChecksumIndexInput(indexInput)) { SAICodecUtils.checkHeader(input); final int num = input.readInt(); diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SAICodecUtils.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SAICodecUtils.java index f664e6d704..58be96a531 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/SAICodecUtils.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SAICodecUtils.java @@ -21,7 +21,7 @@ package org.apache.cassandra.index.sai.disk.v1; import java.io.IOException; import org.apache.cassandra.index.sai.disk.format.Version; -import org.apache.lucene.codecs.CodecUtil; +import org.apache.cassandra.index.sai.disk.io.IndexFileUtils; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.store.ChecksumIndexInput; import org.apache.lucene.store.DataInput; @@ -30,6 +30,7 @@ import org.apache.lucene.store.IndexOutput; import static org.apache.lucene.codecs.CodecUtil.CODEC_MAGIC; import static org.apache.lucene.codecs.CodecUtil.FOOTER_MAGIC; +import static org.apache.lucene.codecs.CodecUtil.footerLength; import static org.apache.lucene.codecs.CodecUtil.readBEInt; import static org.apache.lucene.codecs.CodecUtil.readBELong; import static org.apache.lucene.codecs.CodecUtil.writeBEInt; @@ -53,7 +54,7 @@ public class SAICodecUtils { writeBEInt(out, FOOTER_MAGIC); writeBEInt(out, 0); - writeCRC(out); + writeChecksum(out); } public static void checkHeader(DataInput in) throws IOException @@ -74,7 +75,7 @@ public class SAICodecUtils { validateFooter(in, false); long actualChecksum = in.getChecksum(); - long expectedChecksum = readCRC(in); + long expectedChecksum = readChecksum(in); if (expectedChecksum != actualChecksum) { throw new CorruptIndexException("checksum failed (hardware problem?) : expected=" + Long.toHexString(expectedChecksum) + @@ -99,9 +100,26 @@ public class SAICodecUtils input.seek(current); } + /** + * See {@link org.apache.lucene.codecs.CodecUtil#checksumEntireFile(org.apache.lucene.store.IndexInput)}. + * @param input IndexInput to validate. + * @throws IOException if a corruption is detected. + */ public static void validateChecksum(IndexInput input) throws IOException { - CodecUtil.checksumEntireFile(input); + IndexInput clone = input.clone(); + clone.seek(0L); + ChecksumIndexInput in = IndexFileUtils.getBufferedChecksumIndexInput(clone); + + assert in.getFilePointer() == 0L : in.getFilePointer() + " bytes already read from this input!"; + + if (in.length() < (long) footerLength()) + throw new CorruptIndexException("misplaced codec footer (file truncated?): length=" + in.length() + " but footerLength==" + footerLength(), input); + else + { + in.seek(in.length() - (long) footerLength()); + checkFooter(in); + } } // Copied from Lucene PackedInts as they are not public @@ -181,7 +199,7 @@ public class SAICodecUtils { long position = in.getFilePointer(); long fileLength = in.length(); - long footerLength = CodecUtil.footerLength(); + long footerLength = footerLength(); long footerPosition = fileLength - footerLength; if (footerPosition < 0) @@ -204,7 +222,7 @@ public class SAICodecUtils private static void validateFooter(IndexInput in, boolean segmented) throws IOException { long remaining = in.length() - in.getFilePointer(); - long expected = CodecUtil.footerLength(); + long expected = footerLength(); if (!segmented) { @@ -236,31 +254,31 @@ public class SAICodecUtils // Copied from Lucene CodecUtil as they are not public /** - * Writes CRC32 value as a 64-bit long to the output. + * Writes checksum value as a 64-bit long to the output. * @throws IllegalStateException if CRC is formatted incorrectly (wrong bits set) * @throws IOException if an i/o error occurs */ - private static void writeCRC(IndexOutput output) throws IOException + private static void writeChecksum(IndexOutput output) throws IOException { long value = output.getChecksum(); if ((value & 0xFFFFFFFF00000000L) != 0) { - throw new IllegalStateException("Illegal CRC-32 checksum: " + value + " (resource=" + output + ')'); + throw new IllegalStateException("Illegal checksum: " + value + " (resource=" + output + ')'); } writeBELong(output, value); } /** - * Reads CRC32 value as a 64-bit long from the input. + * Reads checksum value as a 64-bit long from the input. * @throws CorruptIndexException if CRC is formatted incorrectly (wrong bits set) * @throws IOException if an i/o error occurs */ - private static long readCRC(IndexInput input) throws IOException + private static long readChecksum(IndexInput input) throws IOException { long value = readBELong(input); if ((value & 0xFFFFFFFF00000000L) != 0) { - throw new CorruptIndexException("Illegal CRC-32 checksum: " + value, input); + throw new CorruptIndexException("Illegal checksum: " + value, input); } return value; } diff --git a/test/unit/org/apache/cassandra/index/sai/disk/v1/SAICodecUtilsTest.java b/test/unit/org/apache/cassandra/index/sai/disk/v1/SAICodecUtilsTest.java index 02c945a7f3..127bfa07ea 100644 --- a/test/unit/org/apache/cassandra/index/sai/disk/v1/SAICodecUtilsTest.java +++ b/test/unit/org/apache/cassandra/index/sai/disk/v1/SAICodecUtilsTest.java @@ -18,6 +18,7 @@ package org.apache.cassandra.index.sai.disk.v1; +import org.apache.lucene.store.ChecksumIndexInput; import org.junit.Before; import org.junit.Test; @@ -26,7 +27,6 @@ import org.apache.cassandra.index.sai.disk.io.IndexOutputWriter; import org.apache.cassandra.index.sai.utils.SAIRandomizedTester; import org.apache.cassandra.io.util.File; import org.apache.lucene.index.CorruptIndexException; -import org.apache.lucene.store.BufferedChecksumIndexInput; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.mockito.Mockito; @@ -109,7 +109,7 @@ public class SAICodecUtilsTest extends SAIRandomizedTester } try (IndexInput input = IndexFileUtils.instance.openBlockingInput(file); - BufferedChecksumIndexInput checksumIndexInput = new BufferedChecksumIndexInput(input)) + ChecksumIndexInput checksumIndexInput = IndexFileUtils.getBufferedChecksumIndexInput(input)) { SAICodecUtils.checkHeader(checksumIndexInput); for (int value = 0; value < numBytes; value++) @@ -130,7 +130,7 @@ public class SAICodecUtilsTest extends SAIRandomizedTester } try (IndexInput input = IndexFileUtils.instance.openBlockingInput(file); - BufferedChecksumIndexInput checksumIndexInput = new BufferedChecksumIndexInput(input)) + ChecksumIndexInput checksumIndexInput = IndexFileUtils.getBufferedChecksumIndexInput(input)) { SAICodecUtils.checkHeader(checksumIndexInput); for (int value = 0; value < numBytes; value++) @@ -155,7 +155,7 @@ public class SAICodecUtilsTest extends SAIRandomizedTester } try (IndexInput input = IndexFileUtils.instance.openBlockingInput(file); - BufferedChecksumIndexInput checksumIndexInput = new BufferedChecksumIndexInput(input)) + ChecksumIndexInput checksumIndexInput = IndexFileUtils.getBufferedChecksumIndexInput(input)) { SAICodecUtils.checkHeader(checksumIndexInput); for (int value = 0; value < numBytes; value++) @@ -182,7 +182,7 @@ public class SAICodecUtilsTest extends SAIRandomizedTester } try (IndexInput input = IndexFileUtils.instance.openBlockingInput(file); - BufferedChecksumIndexInput checksumIndexInput = new BufferedChecksumIndexInput(input)) + ChecksumIndexInput checksumIndexInput = IndexFileUtils.getBufferedChecksumIndexInput(input)) { SAICodecUtils.checkHeader(checksumIndexInput); for (int value = 0; value < numBytes; value++) @@ -209,7 +209,7 @@ public class SAICodecUtilsTest extends SAIRandomizedTester } try (IndexInput input = IndexFileUtils.instance.openBlockingInput(file); - BufferedChecksumIndexInput checksumIndexInput = new BufferedChecksumIndexInput(input)) + ChecksumIndexInput checksumIndexInput = IndexFileUtils.getBufferedChecksumIndexInput(input)) { SAICodecUtils.checkHeader(checksumIndexInput); for (int value = 0; value < numBytes; value++) @@ -236,7 +236,7 @@ public class SAICodecUtilsTest extends SAIRandomizedTester } try (IndexInput input = IndexFileUtils.instance.openBlockingInput(file); - BufferedChecksumIndexInput checksumIndexInput = new BufferedChecksumIndexInput(input)) + ChecksumIndexInput checksumIndexInput = IndexFileUtils.getBufferedChecksumIndexInput(input)) { SAICodecUtils.checkHeader(checksumIndexInput); for (int value = 0; value < numBytes; value++) @@ -263,14 +263,14 @@ public class SAICodecUtilsTest extends SAIRandomizedTester } try (IndexInput input = IndexFileUtils.instance.openBlockingInput(file); - BufferedChecksumIndexInput checksumIndexInput = new BufferedChecksumIndexInput(input)) + ChecksumIndexInput checksumIndexInput = IndexFileUtils.getBufferedChecksumIndexInput(input)) { SAICodecUtils.checkHeader(checksumIndexInput); for (int value = 0; value < numBytes; value++) checksumIndexInput.readByte(); assertThatThrownBy(() -> SAICodecUtils.checkFooter(checksumIndexInput)) .isInstanceOf(CorruptIndexException.class) - .hasMessageContaining("Illegal CRC-32 checksum: -4294967296 "); + .hasMessageContaining("Illegal checksum: -4294967296 "); } } @@ -338,7 +338,7 @@ public class SAICodecUtilsTest extends SAIRandomizedTester input.readByte(); assertThatThrownBy(() -> SAICodecUtils.validateChecksum(input)) .isInstanceOf(CorruptIndexException.class) - .hasMessageContaining("Illegal CRC-32 checksum: -4294967296 "); + .hasMessageContaining("Illegal checksum: -4294967296 "); } } @@ -350,7 +350,7 @@ public class SAICodecUtilsTest extends SAIRandomizedTester when(indexOutput.getChecksum()).thenReturn(0xFFFFFFFF00000000L); assertThatThrownBy(() -> SAICodecUtils.writeFooter(indexOutput)) .isInstanceOf(IllegalStateException.class) - .hasMessageContaining("Illegal CRC-32 checksum: -4294967296 "); + .hasMessageContaining("Illegal checksum: -4294967296 "); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org