nitsanw commented on code in PR #4402: URL: https://github.com/apache/cassandra/pull/4402#discussion_r2435823706
########## src/java/org/apache/cassandra/io/sstable/SSTableCursorReader.java: ########## @@ -0,0 +1,743 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.sstable; + +import java.io.IOException; + +import com.google.common.collect.ImmutableList; + +import org.apache.cassandra.io.util.ResizableByteBuffer; +import net.nicoulaj.compilecommand.annotations.Inline; +import org.apache.cassandra.db.ClusteringPrefix; +import org.apache.cassandra.db.Columns; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.LivenessInfo; +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.db.rows.CellPath; +import org.apache.cassandra.db.rows.DeserializationHelper; +import org.apache.cassandra.db.rows.EncodingStats; +import org.apache.cassandra.db.rows.RangeTombstoneMarker; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.SerializationHelper; +import org.apache.cassandra.db.rows.UnfilteredSerializer; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.io.util.RandomAccessReader; +import org.apache.cassandra.schema.ColumnMetadata; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.TableMetadataRef; +import org.apache.cassandra.tools.Util; +import org.apache.cassandra.utils.concurrent.Ref; + +import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.*; + +public class SSTableCursorReader implements AutoCloseable +{ + public static final ColumnMetadata[] COLUMN_METADATA_TYPE = new ColumnMetadata[0]; + private final Ref<SSTableReader> ssTableReaderRef; + + public interface State + { + /** start of file, after partition end but before EOF */ + int PARTITION_START = 1; + int STATIC_ROW_START = 1 << 1; + int ROW_START = 1 << 2; + /** common to row/static row cells */ + int CELL_HEADER_START = 1 << 3; + int CELL_VALUE_START = 1 << 4; + int CELL_END = 1 << 5; + int TOMBSTONE_START = 1 << 6; + int AFTER_TOMBSTONE_HEADER = 1 << 7; + /** common to rows/tombstones. Call continue(); for next element, or maybe partition end */ + int ELEMENT_END = 1 << 8; + /** at {@link UnfilteredSerializer#isEndOfPartition(int)} */ + int PARTITION_END = 1 << 9; + /** EOF */ + int DONE = 1 << 10; + int SEEK = 1 << 11; + static boolean isState(int state, int mask) { + return (state & mask) != 0; + } + } + + public class CellCursor { + public ReusableLivenessInfo rowLiveness; + public Columns columns; + + public int columnsSize; + public int columnsIndex; + public int cellFlags; + public final ReusableLivenessInfo cellLiveness = new ReusableLivenessInfo(); + public CellPath cellPath; + public AbstractType<?> cellType; + public ColumnMetadata cellColumn; + private ColumnMetadata[] columnsArray; + private AbstractType<?>[] cellTypeArray; + + void init (Columns columns, ReusableLivenessInfo rowLiveness) + { + if (this.columns != columns) + { + // This will be a problem with changing columns + this.columns = columns; + columnsArray = columns.toArray(COLUMN_METADATA_TYPE); + cellTypeArray = new AbstractType<?>[columnsArray.length]; + for (int i = 0; i < columnsArray.length; i++) + { + ColumnMetadata cellColumn = columnsArray[i]; + cellTypeArray[i] = serializationHeader.getType(cellColumn); + } + // HOTSPOT: size is suprisingly expensive + columnsSize = columns.size(); + } + this.rowLiveness = rowLiveness; + columnsIndex = 0; + cellFlags = 0; + cellPath = null; + cellType = null; + } + + public boolean hasNext() + { + return columnsIndex < columnsSize; + } + + /** + * For Cell deserialization see {@link Cell.Serializer#deserialize} + * + * @return true if has value, false otherwise + */ + boolean readCellHeader() throws IOException + { + if (!(columnsIndex < columnsSize)) throw new IllegalStateException(); + + // HOTSPOT: suprisingly expensive + int currIndex = columnsIndex++; + cellColumn = columnsArray[currIndex]; + cellType = cellTypeArray[currIndex]; + cellFlags = dataReader.readUnsignedByte(); + // TODO: specialize common case where flags == HAS_VALUE | USE_ROW_TS? + boolean hasValue = Cell.Serializer.hasValue(cellFlags); + boolean isDeleted = Cell.Serializer.isDeleted(cellFlags); + boolean isExpiring = Cell.Serializer.isExpiring(cellFlags); + boolean useRowTimestamp = Cell.Serializer.useRowTimestamp(cellFlags); + boolean useRowTTL = Cell.Serializer.useRowTTL(cellFlags); + + long timestamp = useRowTimestamp ? rowLiveness.timestamp() : serializationHeader.readTimestamp(dataReader); + + long localDeletionTime = useRowTTL + ? rowLiveness.localExpirationTime() + : (isDeleted || isExpiring ? serializationHeader.readLocalDeletionTime(dataReader) : Cell.NO_DELETION_TIME); + + int ttl = useRowTTL ? rowLiveness.ttl() : (isExpiring ? serializationHeader.readTTL(dataReader) : Cell.NO_TTL); + localDeletionTime = Cell.decodeLocalDeletionTime(localDeletionTime, ttl, deserializationHelper); + + cellLiveness.reset(timestamp, ttl, localDeletionTime); + cellPath = cellColumn.isComplex() + ? cellColumn.cellPathSerializer().deserialize(dataReader) + : null; + return hasValue; + } + } + + int state = PARTITION_START; + + private final Version version; + private final TableMetadata metadata; + private final ImmutableList<ColumnMetadata> clusteringColumns; + public final AbstractType<?>[] clusteringColumnTypes; + private final StatsMetadata statsMetadata; + private final DeserializationHelper deserializationHelper; + private final EncodingStats encodingStats; + private final SerializationHeader serializationHeader; + + // need to be closed + public final SSTableReader ssTableReader; + private final RandomAccessReader dataReader; + private final DeletionTime.Serializer deletionTimeSerializer; + // in serialization order (maybe use inheritance to clamp them together?) + public long partitionStart = 0; + + // SHARED STATIC_ROW/ROW/TOMB + public int basicElementFlags = 0; + public int extendedFlags = 0; + + private final CellCursor staticRowCellCursor = new CellCursor(); + private final CellCursor rowCellCursor = new CellCursor(); + public CellCursor cellCursor; + + public SSTableCursorReader(Descriptor desc) throws IOException + { + metadata = Util.metadataFromSSTable(desc); + ssTableReader = SSTableReader.openNoValidation(null, desc, TableMetadataRef.forOfflineTools(metadata)); + ssTableReaderRef = ssTableReader.ref(); + version = desc.version; + deletionTimeSerializer = DeletionTime.getSerializer(version); + clusteringColumns = metadata.clusteringColumns(); + int clusteringColumnCount = clusteringColumns.size(); + clusteringColumnTypes = new AbstractType<?>[clusteringColumnCount]; + for (int i = 0; i < clusteringColumnTypes.length; i++) + { + clusteringColumnTypes[i] = clusteringColumns.get(i).type; + } + statsMetadata = ssTableReader.getSSTableMetadata(); + encodingStats = ssTableReader.stats(); + deserializationHelper = new DeserializationHelper(metadata, desc.version.correspondingMessagingVersion(), DeserializationHelper.Flag.LOCAL, null); + serializationHeader = ssTableReader.header; + + dataReader = ssTableReader.openDataReader(); + } + + public SSTableCursorReader(SSTableReader reader) + { + metadata = reader.metadata(); + ssTableReader = reader; + ssTableReaderRef = null; + version = reader.descriptor.version; + deletionTimeSerializer = DeletionTime.getSerializer(version); + clusteringColumns = metadata.clusteringColumns(); + int clusteringColumnCount = clusteringColumns.size(); + clusteringColumnTypes = new AbstractType<?>[clusteringColumnCount]; + for (int i = 0; i < clusteringColumnTypes.length; i++) + { + clusteringColumnTypes[i] = clusteringColumns.get(i).type; + } + statsMetadata = reader.getSSTableMetadata(); + encodingStats = reader.stats(); + deserializationHelper = new DeserializationHelper(metadata, version.correspondingMessagingVersion(), DeserializationHelper.Flag.LOCAL, null); + serializationHeader = reader.header; + + dataReader = reader.openDataReader(); + } + + @Override + public void close() + { + dataReader.close(); + if (ssTableReaderRef != null) + ssTableReaderRef.close(); + } + + private void resetOnPartitionStart() + { + partitionStart = dataReader.getPosition(); + basicElementFlags = 0; + extendedFlags = 0; + } + + public int seekPartition(long position) throws IOException + { + state = SEEK; + if (position == 0) + { + dataReader.seek(position); + state = PARTITION_START; + } + else { + // verify partition start is after a partition end marker + dataReader.seek(position - 1); + if (checkNextFlags() == PARTITION_END) + state = PARTITION_START; + else + throw new IllegalArgumentException("Seeking to a partition at: " + position + " did not result in a valid state"); + } + resetOnPartitionStart(); + return state; + } + + public int seekPartitionElement(long position) throws IOException + { + // partition elements have flags + dataReader.seek(position); + int state = checkNextFlags(); + if (!isState(state , ROW_START | TOMBSTONE_START | STATIC_ROW_START | DONE)) throw new IllegalStateException(); + return state; + } + + // struct partition { + // struct partition_header header + // optional<struct row> row + // struct unfiltered unfiltereds[]; + //}; + public int readPartitionHeader(PartitionDescriptor header) throws IOException + { + if (state != PARTITION_START) throw new IllegalStateException(); + resetOnPartitionStart(); + header.load(dataReader, deletionTimeSerializer); + return checkNextFlags(); + } + + // struct static_row { + // byte flags; // pre-loaded + // byte extended_flags; // pre-loaded + // varint row_body_size; + // varint prev_unfiltered_size; // for backward traversing, ignored + // optional<struct liveness_info> liveness_info; + // optional<struct delta_deletion_time> deletion_time; + // *** We read the columns in a separate method *** + // optional<varint[]> missing_columns; + // cell[] cells; // potentially only some + //}; + public int readStaticRowHeader(ElementDescriptor elementDescriptor) throws IOException + { + if (state != STATIC_ROW_START) throw new IllegalStateException(); + elementDescriptor.loadStaticRow(dataReader, serializationHeader, deserializationHelper, basicElementFlags, extendedFlags); + + staticRowCellCursor.init(elementDescriptor.rowColumns(), elementDescriptor.livenessInfo()); + cellCursor = staticRowCellCursor; + if (!staticRowCellCursor.hasNext()) + { + return checkNextFlags(); + } + else + { + return state = State.CELL_HEADER_START; + } + } + + public int copyCellValue(DataOutputPlus writer, byte[] buffer) throws IOException + { + if (state != State.CELL_VALUE_START) throw new IllegalStateException(); + if (cellCursor.cellType == null) throw new IllegalStateException(); + int length = cellCursor.cellType.valueLengthIfFixed(); + copyCellContents(writer, buffer, length); + return !cellCursor.hasNext() ? checkNextFlags() : (state = State.CELL_END); + } + + // TODO: move to cell cursor? maybe avoid copy through buffer? + private void copyCellContents(DataOutputPlus writer, byte[] transferBuffer, int length) throws IOException + { + if (length >= 0) + { + dataReader.readFully(transferBuffer, 0, length); + writer.write(transferBuffer, 0, length); + } + else + { + length = dataReader.readUnsignedVInt32(); + if (length < 0) + throw new IOException("Corrupt (negative) value length encountered"); + writer.writeUnsignedVInt32(length); + int remaining = length; + while (remaining > 0) + { + int readLength = Math.min(remaining, transferBuffer.length); + dataReader.readFully(transferBuffer, 0, readLength); + writer.write(transferBuffer, 0, readLength); + remaining -= readLength; + } + } + } + + // struct row { + // byte flags; + // optional<struct clustering_block[]> clustering_blocks; + // varint row_body_size; + // varint prev_unfiltered_size; // for backward traversing, ignored + // optional<struct liveness_info> liveness_info; + // optional<struct delta_deletion_time> deletion_time; + // *** We read the columns in a separate step *** + // optional<varint[]> missing_columns; + // cell[] cells; // potentially only some + //}; + public int readRowHeader(ElementDescriptor elementDescriptor) throws IOException + { + if (state != State.ROW_START) throw new IllegalStateException(); + if (!UnfilteredSerializer.isRow(basicElementFlags)) throw new IllegalStateException(); + elementDescriptor.loadRow(dataReader, serializationHeader, deserializationHelper, clusteringColumnTypes, basicElementFlags); + + rowCellCursor.init(elementDescriptor.rowColumns(), elementDescriptor.livenessInfo()); + cellCursor = rowCellCursor; + if (!rowCellCursor.hasNext()) + { + return checkNextFlags(); + } + else + { + return state = State.CELL_HEADER_START; + } + } + + // TODO: introduce cell header class + public int readCellHeader() throws IOException + { + if (state != State.CELL_HEADER_START) throw new IllegalStateException(); + if (cellCursor.readCellHeader()) + { + return state = State.CELL_VALUE_START; + } + return !cellCursor.hasNext() ? checkNextFlags() : (state = State.CELL_END); + } + + @Inline + public int skipCellValue() throws IOException + { + if (state != State.CELL_VALUE_START) throw new IllegalStateException(); + cellCursor.cellType.skipValue(dataReader); + return !cellCursor.hasNext() ? checkNextFlags() : (state = State.CELL_HEADER_START); + } + + /** + * See: {@link org.apache.cassandra.db.rows.UnfilteredSerializer#serialize(RangeTombstoneMarker, SerializationHelper, DataOutputPlus, long, int)} + * <pre> + * struct range_tombstone_marker { + * byte flags = IS_MARKER; + * byte kind_ordinal; + * be16 bound_values_count; + * struct clustering_block[] clustering_blocks; + * varint marker_body_size; + * varint prev_unfiltered_size; + * }; + * struct range_tombstone_bound_marker : range_tombstone_marker { + * struct delta_deletion_time deletion_time; + * }; + * struct range_tombstone_boundary_marker : range_tombstone_marker { + * struct delta_deletion_time end_deletion_time; + * struct delta_deletion_time start_deletion_time; + * }; + * </pre> + * + /// TODO: tombstone as resizable buffer + */ + public int readTombstoneMarker(ElementDescriptor elementDescriptor) throws IOException + { + if (state != TOMBSTONE_START) throw new IllegalStateException(); + if (!UnfilteredSerializer.isTombstoneMarker(basicElementFlags)) throw new IllegalStateException(); + elementDescriptor.loadTombstone(dataReader, serializationHeader, deserializationHelper, clusteringColumnTypes, basicElementFlags); + // unfilteredStart = dataReader.getPosition() - 1; + + return checkNextFlags(); + } +// +// /** +// * TODO: deduplicate for tombstones +// * {@link ClusteringPrefix.Serializer#deserializeValuesWithoutSize +// */ +// private void readUnfilteredClustering(AbstractType<?>[] types, int clusteringColumnsBound) throws IOException +// { +// if (clusteringColumnsBound == 0) { +// clusteringLength = 0; +// return; +// } +// long clusteringStartPosition = dataReader.getPosition(); +// skipClustering(dataReader, types, clusteringColumnsBound); +// long clusteringLengthLong = dataReader.getPosition() - clusteringStartPosition; +// // Notionally, max clustering size is 2G, with each column limit at 64k, +// if (clusteringLengthLong > Integer.MAX_VALUE) { +// throw new IllegalStateException(); +// } +// clusteringLength = (int) clusteringLengthLong; +// if (clusteringLength > clustering.length) { +// clustering = new byte[Pow2.roundToPowerOfTwo(clusteringLength)]; +// clusteringBuffer = ByteBuffer.wrap(clustering); // would be nice if it was re-usable +// } +// dataReader.seek(clusteringStartPosition); +// dataReader.readFully(clustering, 0, clusteringLength); +// clusteringBuffer.limit(clusteringLength); +// } + + /** + * {@link ClusteringPrefix.Serializer#deserializeValuesWithoutSize} + */ + static void readUnfilteredClustering(RandomAccessReader dataReader, AbstractType<?>[] types, int clusteringColumnsBound, ResizableByteBuffer clustering) throws IOException + { + if (clusteringColumnsBound == 0) { + clustering.resetBuffer(); + return; + } + long clusteringStartPosition = dataReader.getPosition(); + skipClustering(dataReader, types, clusteringColumnsBound); + long clusteringLengthLong = dataReader.getPosition() - clusteringStartPosition; + + // Notionally, max clustering size is 2G, with each column limit at 64k, + if (clusteringLengthLong > Integer.MAX_VALUE) { + throw new IllegalStateException(); + } + dataReader.seek(clusteringStartPosition); Review Comment: I agree it is not ideal, the tradeoff is a single array copy call vs many. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

