blambov commented on code in PR #4402:
URL: https://github.com/apache/cassandra/pull/4402#discussion_r2428463328
##########
src/java/org/apache/cassandra/utils/vint/VIntCoding.java:
##########
@@ -259,6 +265,40 @@ public static long getUnsignedVInt(ByteBuffer input, int
readerIndex, int reader
return retval;
}
+ public static long getUnsignedVInt(byte[] input, int offset, int length)
Review Comment:
These don't appear to be used.
##########
src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java:
##########
@@ -46,7 +46,7 @@
/**
* Base class for the sstable writers used by CQLSSTableWriter.
*/
-abstract class AbstractSSTableSimpleWriter implements Closeable
+public abstract class AbstractSSTableSimpleWriter implements Closeable
Review Comment:
This doesn't need to be made public.
##########
src/java/org/apache/cassandra/db/LivenessInfo.java:
##########
@@ -53,7 +53,7 @@ public class LivenessInfo implements IMeasurableMemory
public static final LivenessInfo EMPTY = new LivenessInfo(NO_TIMESTAMP);
private static final long UNSHARED_HEAP_SIZE = ObjectSizes.measure(EMPTY);
- protected final long timestamp;
+ protected long timestamp;
Review Comment:
This class can be easily converted to an interface so that we don't have to
make the base mutable.
##########
src/java/org/apache/cassandra/io/sstable/ClusteringDescriptor.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.cassandra.io.util.ResizableByteBuffer;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ClusteringBound;
+import org.apache.cassandra.db.ClusteringPrefix;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.ByteArrayAccessor;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.RandomAccessReader;
+
+import static
org.apache.cassandra.io.sstable.SSTableCursorReader.readUnfilteredClustering;
+
+public class ClusteringDescriptor extends ResizableByteBuffer
+{
+ public static final byte EXCL_END_BOUND_CLUSTERING_TYPE = (byte)
ClusteringPrefix.Kind.EXCL_END_BOUND.ordinal();
+ public static final byte INCL_START_BOUND_CLUSTERING_TYPE = (byte)
ClusteringPrefix.Kind.INCL_START_BOUND.ordinal();
+ public static final byte INCL_END_EXCL_START_BOUNDARY_CLUSTERING_TYPE =
(byte) ClusteringPrefix.Kind.INCL_END_EXCL_START_BOUNDARY.ordinal();
+
+ public static final byte STATIC_CLUSTERING_TYPE =
(byte)ClusteringPrefix.Kind.STATIC_CLUSTERING.ordinal();
+ public static final byte ROW_CLUSTERING_TYPE = (byte)
ClusteringPrefix.Kind.CLUSTERING.ordinal();
+
+ public static final byte EXCL_END_INCL_START_BOUNDARY_CLUSTERING_TYPE =
(byte) ClusteringPrefix.Kind.EXCL_END_INCL_START_BOUNDARY.ordinal();
+ public static final byte INCL_END_BOUND_CLUSTERING_TYPE = (byte)
ClusteringPrefix.Kind.INCL_END_BOUND.ordinal();
+ public static final byte EXCL_START_BOUND_CLUSTERING_TYPE = (byte)
ClusteringPrefix.Kind.EXCL_START_BOUND.ordinal();
+
+ protected AbstractType<?>[] clusteringTypes;
Review Comment:
The clustering types will not change for the duration of a compaction
operation; even if one source has a different set, it can only differ in types
added at the end and should make no difference for processing the clustering.
Perhaps make this fixed (set once) and not pass it on every load/copy?
##########
src/java/org/apache/cassandra/io/sstable/SSTableCursorPipeUtil.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.rows.Cell;
+
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.*;
+
+public class SSTableCursorPipeUtil
Review Comment:
As this is only used by tests, it should move under `test/`.
##########
src/java/org/apache/cassandra/io/sstable/SSTableCursorReader.java:
##########
@@ -0,0 +1,743 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.cassandra.io.util.ResizableByteBuffer;
+import net.nicoulaj.compilecommand.annotations.Inline;
+import org.apache.cassandra.db.ClusteringPrefix;
+import org.apache.cassandra.db.Columns;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.DeserializationHelper;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.rows.UnfilteredSerializer;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.tools.Util;
+import org.apache.cassandra.utils.concurrent.Ref;
+
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.*;
+
+public class SSTableCursorReader implements AutoCloseable
+{
+ public static final ColumnMetadata[] COLUMN_METADATA_TYPE = new
ColumnMetadata[0];
+ private final Ref<SSTableReader> ssTableReaderRef;
+
+ public interface State
+ {
+ /** start of file, after partition end but before EOF */
+ int PARTITION_START = 1;
+ int STATIC_ROW_START = 1 << 1;
+ int ROW_START = 1 << 2;
+ /** common to row/static row cells */
+ int CELL_HEADER_START = 1 << 3;
+ int CELL_VALUE_START = 1 << 4;
+ int CELL_END = 1 << 5;
+ int TOMBSTONE_START = 1 << 6;
+ int AFTER_TOMBSTONE_HEADER = 1 << 7;
+ /** common to rows/tombstones. Call continue(); for next element, or
maybe partition end */
+ int ELEMENT_END = 1 << 8;
+ /** at {@link UnfilteredSerializer#isEndOfPartition(int)} */
+ int PARTITION_END = 1 << 9;
Review Comment:
I'm curious what the benefit of these END states is. Why can't we advance to
the next entry directly?
##########
src/java/org/apache/cassandra/io/sstable/SSTableCursorReader.java:
##########
@@ -0,0 +1,743 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.cassandra.io.util.ResizableByteBuffer;
+import net.nicoulaj.compilecommand.annotations.Inline;
+import org.apache.cassandra.db.ClusteringPrefix;
+import org.apache.cassandra.db.Columns;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.DeserializationHelper;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.rows.UnfilteredSerializer;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.tools.Util;
+import org.apache.cassandra.utils.concurrent.Ref;
+
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.*;
+
+public class SSTableCursorReader implements AutoCloseable
+{
+ public static final ColumnMetadata[] COLUMN_METADATA_TYPE = new
ColumnMetadata[0];
+ private final Ref<SSTableReader> ssTableReaderRef;
+
+ public interface State
+ {
+ /** start of file, after partition end but before EOF */
+ int PARTITION_START = 1;
Review Comment:
The fact that we don't know the key at partition/unfiltered/cell start leads
to some odd-looking code in `CompactionCursor`. Is there a benefit to delaying
the reading of the header until it is explicitly requested?
##########
src/java/org/apache/cassandra/io/sstable/ElementDescriptor.java:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.cassandra.db.Columns;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.rows.DeserializationHelper;
+import org.apache.cassandra.db.rows.UnfilteredSerializer;
+import org.apache.cassandra.io.util.RandomAccessReader;
+
+import static
org.apache.cassandra.io.sstable.SSTableCursorReader.readUnfilteredDeletionTime;
+
+public class ElementDescriptor extends ClusteringDescriptor
+{
+ private final ReusableLivenessInfo rowLivenessInfo = new
ReusableLivenessInfo();
+ private final DeletionTime deletionTime = DeletionTime.build(0, 0);
+ private final DeletionTime deletionTime2 = DeletionTime.build(0, 0);
+
+ private long position;
+ private int flags;
+ private int extendedFlags;
+
+ private long unfilteredSize;
+ private long unfilteredDataStart;
+// private long prevUnfilteredSize;
+ private long unfilteredCellStart;
+ Columns rowColumns;
+
+ void loadTombstone(RandomAccessReader dataReader,
+ SerializationHeader serializationHeader,
+ DeserializationHelper deserializationHelper,
+ AbstractType<?>[] clusteringColumnTypes,
+ int flags) throws IOException
+ {
+ this.flags = flags;
+ this.extendedFlags = 0;
+ rowColumns = null;
+ byte clusteringKind = dataReader.readByte();
+ if (clusteringKind == STATIC_CLUSTERING_TYPE || clusteringKind ==
ROW_CLUSTERING_TYPE) {
+ // STATIC_CLUSTERING or CLUSTERING -> no deletion info, should not
happen
+ throw new IllegalStateException();
+ }
+
+ int columnsBound = dataReader.readUnsignedShort();
+ loadClustering(dataReader, clusteringColumnTypes, clusteringKind,
columnsBound);
+ this.unfilteredSize = dataReader.readUnsignedVInt();
+ dataReader.readUnsignedVInt(); // Unused: prevUnfilteredSize
+ if (clusteringKind == EXCL_END_INCL_START_BOUNDARY_CLUSTERING_TYPE ||
clusteringKind == INCL_END_EXCL_START_BOUNDARY_CLUSTERING_TYPE)
+ {
+ // boundary
+ readUnfilteredDeletionTime(dataReader, serializationHeader,
deletionTime); // CLOSE
+ readUnfilteredDeletionTime(dataReader, serializationHeader,
deletionTime2); // OPEN
+ }
+ else
+ {
+ // bound
+ readUnfilteredDeletionTime(dataReader, serializationHeader,
deletionTime); // CLOSE|OPEN
+ }
+ }
+
+ void loadRow(RandomAccessReader dataReader,
+ SerializationHeader serializationHeader,
+ DeserializationHelper deserializationHelper,
+ AbstractType<?>[] clusteringTypes,
+ int flags) throws IOException {
+ // body = whatever is covered by size, so inclusive of the
prev_row_size inclusive of flags
+ position = dataReader.getPosition() - 1;
+ this.flags = flags;
+ this.extendedFlags = 0;
+
+ loadClustering(dataReader, clusteringTypes, ROW_CLUSTERING_TYPE,
clusteringTypes.length);
+
+ rowColumns = serializationHeader.columns(false);
+
+ loadCommonRowFields(dataReader, serializationHeader,
deserializationHelper, flags);
+ }
+
+ void loadStaticRow(RandomAccessReader dataReader,
+ SerializationHeader serializationHeader,
+ DeserializationHelper deserializationHelper,
+ int flags,
Review Comment:
Nit: indentation is wrong
##########
src/java/org/apache/cassandra/io/sstable/SSTableCursorWriter.java:
##########
@@ -0,0 +1,681 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import com.google.common.primitives.Ints;
+
+import org.agrona.collections.IntArrayList;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ClusteringBoundOrBoundary;
+import org.apache.cassandra.db.ClusteringPrefix;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.Cells;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Rows;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredSerializer;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.format.SortedTableWriter;
+import org.apache.cassandra.io.sstable.format.big.BigFormatPartitionWriter;
+import org.apache.cassandra.io.sstable.format.big.BigTableWriter;
+import org.apache.cassandra.io.sstable.format.big.RowIndexEntry;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.ByteArrayUtil;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.concurrent.Ref;
+
+import static org.apache.cassandra.db.rows.UnfilteredSerializer.*;
+
+public class SSTableCursorWriter implements AutoCloseable
+{
+ private static final UnfilteredSerializer SERIALIZER =
UnfilteredSerializer.serializer;
+ private static final ColumnMetadata[] EMPTY_COL_META = new
ColumnMetadata[0];
+ private final SortedTableWriter<?,?> ssTableWriter;
+ private final SequentialWriter dataWriter;
+ private final SortedTableWriter.AbstractIndexWriter indexWriter;
+ private final DeletionTime.Serializer deletionTimeSerializer;
+ private final MetadataCollector metadataCollector;
+ private final SerializationHeader serializationHeader;
+ /**
+ * See: {@link BloomFilter#reusableIndexes}
+ */
+ private final long[] reusableIndexes = new long[21];
+ private final boolean hasStaticColumns;
+
+ private long partitionStart;
+ // ROW contents, needed because of the order of writing and the var int
fields
+ private int rowFlags; // discovered as we go along
+ private int rowExtendedFlags;
+ private final byte[] copyColumnValueBuffer = new byte[4096]; // used to
copy cell contents (maybe piecemiel if very large, since we don't have a direct
read option)
+ private final DataOutputBuffer rowHeaderBuffer = new DataOutputBuffer();
// holds the contents between FLAGS and SIZE
+ private final DataOutputBuffer rowBuffer = new DataOutputBuffer();
+ private final DeletionTime openMarker = DeletionTime.build(0,0);
+
+ private final ColumnMetadata[] staticColumns;
+ private final ColumnMetadata[] regularColumns;
+ private final IntArrayList missingColumns = new IntArrayList();
+ private ColumnMetadata[] columns; // points to static/regular
+ private int columnsWrittenCount = 0;
+ private int nextCellIndex = 0;
+ // Index info
+ private final DataOutputBuffer rowIndexEntries = new DataOutputBuffer();
+ private final IntArrayList rowIndexEntriesOffsets = new IntArrayList();
+ private final ClusteringDescriptor rowIndexEntryLastClustering = new
ClusteringDescriptor();
+ private int indexBlockStartOffset;
+ private int rowIndexEntryOffset;
+ private final int indexBlockThreshold;
+
+
+ private SSTableCursorWriter(
+ Descriptor desc,
+ SortedTableWriter<?,?> ssTableWriter,
+ SequentialWriter dataWriter,
+ SortedTableWriter.AbstractIndexWriter indexWriter,
+ MetadataCollector metadataCollector,
+ SerializationHeader serializationHeader)
+ {
+ this.ssTableWriter = ssTableWriter;
+ this.dataWriter = dataWriter;
+ this.indexWriter = indexWriter;
+ this.deletionTimeSerializer = DeletionTime.getSerializer(desc.version);
+ this.metadataCollector = metadataCollector;
+ this.serializationHeader = serializationHeader;
+ hasStaticColumns = serializationHeader.hasStatic();
+ staticColumns = hasStaticColumns ?
serializationHeader.columns(true).toArray(EMPTY_COL_META) : EMPTY_COL_META;
+ regularColumns =
serializationHeader.columns(false).toArray(EMPTY_COL_META);
+ this.indexBlockThreshold =
DatabaseDescriptor.getColumnIndexSize(BigFormatPartitionWriter.DEFAULT_GRANULARITY);
+ }
+
+ public SSTableCursorWriter(SortedTableWriter<?,?> ssTableWriter)
+ {
+ this(ssTableWriter.descriptor,
+ ssTableWriter,
+ ssTableWriter.dataWriter,
+ ssTableWriter.indexWriter,
+ ssTableWriter.metadataCollector,
+ ssTableWriter.partitionWriter.getHeader());
+ }
+
+ @Override
+ public void close()
+ {
+ SSTableReader finish = ssTableWriter.finish(false);
+ if (finish != null) {
+ Ref<SSTableReader> ref = finish.ref();
+ if (ref != null) ref.close();
+ }
+ ssTableWriter.close();
+ }
+
+ public long getPartitionStart()
+ {
+ return partitionStart;
+ }
+
+ public long getPosition()
+ {
+ return dataWriter.position();
+ }
+
+// public int writePartitionStart(PartitionHeader pHeader) throws
IOException
+// {
+// return writePartitionStart(pHeader.keyBytes(), pHeader.keyLength(),
pHeader.deletionTime());
+// }
+//
+ public int writePartitionStart(byte[] partitionKey, int
partitionKeyLength, DeletionTime partitionDeletionTime) throws IOException
+ {
+ rowIndexEntries.clear();
+ rowIndexEntriesOffsets.clear();
+ rowIndexEntryOffset = 0;
+ openMarker.resetLive();
+
+ partitionStart = dataWriter.position();
+ writePartitionHeader(partitionKey, partitionKeyLength,
partitionDeletionTime);
+ updateIndexBlockStartOffset(dataWriter.position());
+ return indexBlockStartOffset;
+ }
+
+// public void writePartitionEnd(PartitionHeader pHeader, int headerLength)
throws IOException
+// {
+// writePartitionEnd(pHeader.keyBytes(), pHeader.keyLength(),
pHeader.deletionTime(), headerLength);
+// }
+
+ public void writePartitionEnd(byte[] partitionKey, int partitionKeyLength,
DeletionTime partitionDeletionTime, int headerLength) throws IOException
+ {
+ SERIALIZER.writeEndOfPartition(dataWriter);
+ long partitionEnd = dataWriter.position();
+ long partitionSize = partitionEnd - partitionStart;
+ addPartitionMetadata(partitionKey, partitionKeyLength, partitionSize,
partitionDeletionTime);
+
+ /** {@link SortedTableWriter#endPartition(DecoratedKey, DeletionTime)}
+ lastWrittenKey = key; // tracked for verification, see {@link
SortedTableWriter#verifyPartition(DecoratedKey)}, checking the key size and
sorting
+ // first/last are retained for metadata {@link
SSTableWriter#finalizeMetadata()}. They are also exposed via
+ // getters from the writer, but usage is unclear.
+ last = lastWrittenKey;
+ if (first == null)
+ first = lastWrittenKey;
+ // this is implemented differently for BIG/BTI
+ createRowIndexEntry(key, partitionLevelDeletion, partitionEnd - 1);
+ */
+ appendBIGIndex(partitionKey, partitionKeyLength, partitionStart,
headerLength, partitionDeletionTime, partitionEnd);
+ }
+
+ private void appendBIGIndex(byte[] key, int keyLength, long
partitionStart, int headerLength, DeletionTime partitionDeletionTime, long
partitionEnd) throws IOException
Review Comment:
Is it not easy to modify and reuse the index building code from
`BigFormatPartitionWriter`? The duplication here seems quite unnecessary.
##########
src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java:
##########
@@ -53,7 +53,7 @@
*
* @see SSTableSimpleWriter
*/
-class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
+public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
Review Comment:
This doesn't need to be made public.
##########
src/java/org/apache/cassandra/io/sstable/PartitionDescriptor.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.io.util.ResizableByteBuffer;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.io.util.RandomAccessReader;
+
+public class PartitionDescriptor extends ResizableByteBuffer
Review Comment:
Since this is only created once, we don't gain much by descending from
`ResizableByteBuffer` instead of including a buffer.
And if we do not descend, maybe we can make things simpler by having a
`ReusableDecoratedKey` instance instead of only the buffer?
##########
src/java/org/apache/cassandra/io/sstable/SSTableCursorReader.java:
##########
@@ -0,0 +1,743 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.cassandra.io.util.ResizableByteBuffer;
+import net.nicoulaj.compilecommand.annotations.Inline;
+import org.apache.cassandra.db.ClusteringPrefix;
+import org.apache.cassandra.db.Columns;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.DeserializationHelper;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.rows.UnfilteredSerializer;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.tools.Util;
+import org.apache.cassandra.utils.concurrent.Ref;
+
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.*;
+
+public class SSTableCursorReader implements AutoCloseable
+{
+ public static final ColumnMetadata[] COLUMN_METADATA_TYPE = new
ColumnMetadata[0];
+ private final Ref<SSTableReader> ssTableReaderRef;
+
+ public interface State
+ {
+ /** start of file, after partition end but before EOF */
+ int PARTITION_START = 1;
+ int STATIC_ROW_START = 1 << 1;
+ int ROW_START = 1 << 2;
+ /** common to row/static row cells */
+ int CELL_HEADER_START = 1 << 3;
+ int CELL_VALUE_START = 1 << 4;
+ int CELL_END = 1 << 5;
+ int TOMBSTONE_START = 1 << 6;
+ int AFTER_TOMBSTONE_HEADER = 1 << 7;
+ /** common to rows/tombstones. Call continue(); for next element, or
maybe partition end */
+ int ELEMENT_END = 1 << 8;
+ /** at {@link UnfilteredSerializer#isEndOfPartition(int)} */
+ int PARTITION_END = 1 << 9;
+ /** EOF */
+ int DONE = 1 << 10;
+ int SEEK = 1 << 11;
+ static boolean isState(int state, int mask) {
+ return (state & mask) != 0;
+ }
+ }
+
+ public class CellCursor {
+ public ReusableLivenessInfo rowLiveness;
+ public Columns columns;
+
+ public int columnsSize;
+ public int columnsIndex;
+ public int cellFlags;
+ public final ReusableLivenessInfo cellLiveness = new
ReusableLivenessInfo();
+ public CellPath cellPath;
+ public AbstractType<?> cellType;
+ public ColumnMetadata cellColumn;
+ private ColumnMetadata[] columnsArray;
+ private AbstractType<?>[] cellTypeArray;
+
+ void init (Columns columns, ReusableLivenessInfo rowLiveness)
+ {
+ if (this.columns != columns)
+ {
+ // This will be a problem with changing columns
+ this.columns = columns;
+ columnsArray = columns.toArray(COLUMN_METADATA_TYPE);
+ cellTypeArray = new AbstractType<?>[columnsArray.length];
+ for (int i = 0; i < columnsArray.length; i++)
+ {
+ ColumnMetadata cellColumn = columnsArray[i];
+ cellTypeArray[i] =
serializationHeader.getType(cellColumn);
+ }
+ // HOTSPOT: size is suprisingly expensive
+ columnsSize = columns.size();
+ }
+ this.rowLiveness = rowLiveness;
+ columnsIndex = 0;
+ cellFlags = 0;
+ cellPath = null;
+ cellType = null;
+ }
+
+ public boolean hasNext()
+ {
+ return columnsIndex < columnsSize;
+ }
+
+ /**
+ * For Cell deserialization see {@link Cell.Serializer#deserialize}
+ *
+ * @return true if has value, false otherwise
+ */
+ boolean readCellHeader() throws IOException
+ {
+ if (!(columnsIndex < columnsSize)) throw new
IllegalStateException();
+
+ // HOTSPOT: suprisingly expensive
+ int currIndex = columnsIndex++;
+ cellColumn = columnsArray[currIndex];
+ cellType = cellTypeArray[currIndex];
+ cellFlags = dataReader.readUnsignedByte();
+ // TODO: specialize common case where flags == HAS_VALUE |
USE_ROW_TS?
+ boolean hasValue = Cell.Serializer.hasValue(cellFlags);
+ boolean isDeleted = Cell.Serializer.isDeleted(cellFlags);
+ boolean isExpiring = Cell.Serializer.isExpiring(cellFlags);
+ boolean useRowTimestamp =
Cell.Serializer.useRowTimestamp(cellFlags);
+ boolean useRowTTL = Cell.Serializer.useRowTTL(cellFlags);
+
+ long timestamp = useRowTimestamp ? rowLiveness.timestamp() :
serializationHeader.readTimestamp(dataReader);
+
+ long localDeletionTime = useRowTTL
+ ? rowLiveness.localExpirationTime()
+ : (isDeleted || isExpiring ?
serializationHeader.readLocalDeletionTime(dataReader) : Cell.NO_DELETION_TIME);
+
+ int ttl = useRowTTL ? rowLiveness.ttl() : (isExpiring ?
serializationHeader.readTTL(dataReader) : Cell.NO_TTL);
+ localDeletionTime =
Cell.decodeLocalDeletionTime(localDeletionTime, ttl, deserializationHelper);
+
+ cellLiveness.reset(timestamp, ttl, localDeletionTime);
+ cellPath = cellColumn.isComplex()
+ ?
cellColumn.cellPathSerializer().deserialize(dataReader)
+ : null;
+ return hasValue;
+ }
+ }
+
+ int state = PARTITION_START;
+
+ private final Version version;
+ private final TableMetadata metadata;
+ private final ImmutableList<ColumnMetadata> clusteringColumns;
+ public final AbstractType<?>[] clusteringColumnTypes;
+ private final StatsMetadata statsMetadata;
Review Comment:
Nit: unused
##########
src/java/org/apache/cassandra/io/sstable/SSTableCursorReader.java:
##########
@@ -0,0 +1,743 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.cassandra.io.util.ResizableByteBuffer;
+import net.nicoulaj.compilecommand.annotations.Inline;
+import org.apache.cassandra.db.ClusteringPrefix;
+import org.apache.cassandra.db.Columns;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.DeserializationHelper;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.rows.UnfilteredSerializer;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.tools.Util;
+import org.apache.cassandra.utils.concurrent.Ref;
+
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.*;
+
+public class SSTableCursorReader implements AutoCloseable
+{
+ public static final ColumnMetadata[] COLUMN_METADATA_TYPE = new
ColumnMetadata[0];
+ private final Ref<SSTableReader> ssTableReaderRef;
+
+ public interface State
+ {
+ /** start of file, after partition end but before EOF */
+ int PARTITION_START = 1;
+ int STATIC_ROW_START = 1 << 1;
+ int ROW_START = 1 << 2;
+ /** common to row/static row cells */
+ int CELL_HEADER_START = 1 << 3;
+ int CELL_VALUE_START = 1 << 4;
+ int CELL_END = 1 << 5;
+ int TOMBSTONE_START = 1 << 6;
+ int AFTER_TOMBSTONE_HEADER = 1 << 7;
Review Comment:
Nit: the `AFTER_TOMBSTONE_HEADER` state is unused.
##########
src/java/org/apache/cassandra/io/sstable/ClusteringDescriptor.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.cassandra.io.util.ResizableByteBuffer;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ClusteringBound;
+import org.apache.cassandra.db.ClusteringPrefix;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.ByteArrayAccessor;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.RandomAccessReader;
+
+import static
org.apache.cassandra.io.sstable.SSTableCursorReader.readUnfilteredClustering;
+
+public class ClusteringDescriptor extends ResizableByteBuffer
+{
+ public static final byte EXCL_END_BOUND_CLUSTERING_TYPE = (byte)
ClusteringPrefix.Kind.EXCL_END_BOUND.ordinal();
+ public static final byte INCL_START_BOUND_CLUSTERING_TYPE = (byte)
ClusteringPrefix.Kind.INCL_START_BOUND.ordinal();
+ public static final byte INCL_END_EXCL_START_BOUNDARY_CLUSTERING_TYPE =
(byte) ClusteringPrefix.Kind.INCL_END_EXCL_START_BOUNDARY.ordinal();
+
+ public static final byte STATIC_CLUSTERING_TYPE =
(byte)ClusteringPrefix.Kind.STATIC_CLUSTERING.ordinal();
+ public static final byte ROW_CLUSTERING_TYPE = (byte)
ClusteringPrefix.Kind.CLUSTERING.ordinal();
+
+ public static final byte EXCL_END_INCL_START_BOUNDARY_CLUSTERING_TYPE =
(byte) ClusteringPrefix.Kind.EXCL_END_INCL_START_BOUNDARY.ordinal();
+ public static final byte INCL_END_BOUND_CLUSTERING_TYPE = (byte)
ClusteringPrefix.Kind.INCL_END_BOUND.ordinal();
+ public static final byte EXCL_START_BOUND_CLUSTERING_TYPE = (byte)
ClusteringPrefix.Kind.EXCL_START_BOUND.ordinal();
+
+ protected AbstractType<?>[] clusteringTypes;
+ protected ClusteringPrefix.Kind clusteringKind;
+ protected byte clusteringKindEncoded;
+ protected int clusteringColumnsBound;
+
+ protected void loadClustering(RandomAccessReader dataReader,
AbstractType<?>[] clusteringColumnTypes, byte clusteringKind, int
clusteringColumnsBound) throws IOException
+ {
+ clusteringTypes = clusteringColumnTypes;
+ this.clusteringKindEncoded = clusteringKind;
+ this.clusteringKind = ClusteringPrefix.Kind.values()[clusteringKind];
+ this.clusteringColumnsBound = clusteringColumnsBound;
+ if (clusteringKind != STATIC_CLUSTERING_TYPE)
+ readUnfilteredClustering(dataReader, clusteringTypes,
this.clusteringColumnsBound, this);
+ else
+ resetBuffer();
+ }
+
+ public ClusteringDescriptor resetMin() {
+ set(null, ClusteringBound.MIN_END.kind(), 0);
+ resetBuffer();
+ return this;
+ }
+
+ public ClusteringDescriptor resetMax() {
+ set(null, ClusteringBound.MAX_START.kind(), 0);
+ resetBuffer();
+ return this;
+ }
+
+ public final void resetClustering()
+ {
+ set(null, ClusteringPrefix.Kind.CLUSTERING, 0);
+
+ resetBuffer();
+ }
+
+ public void copy(ClusteringDescriptor newClustering)
+ {
+ set(newClustering.clusteringTypes, newClustering.clusteringKind,
newClustering.clusteringColumnsBound());
+ overwrite(newClustering.clusteringBytes(),
newClustering.clusteringLength());
+ }
+
+ private void set(AbstractType<?>[] clusteringColumnTypes,
ClusteringPrefix.Kind clusteringKind, int clusteringColumnsBound) {
+ clusteringTypes = clusteringColumnTypes;
+ this.clusteringKindEncoded = (byte) clusteringKind.ordinal();
+ this.clusteringKind = clusteringKind;
+ this.clusteringColumnsBound = clusteringColumnsBound;
+ }
+
+ // Expose and rename parent data
+ public ByteBuffer clusteringBuffer() {
+ return buffer();
+ }
+
+ public int clusteringLength() {
+ return length();
+ }
+
+ public byte[] clusteringBytes() {
+ return bytes();
+ }
+
+ public AbstractType<?>[] clusteringTypes()
+ {
+ return clusteringTypes;
+ }
+
+ public byte clusteringKindEncoded() {
+ return clusteringKindEncoded;
+ }
+
+ public ClusteringPrefix.Kind clusteringKind() {
+ return clusteringKind;
+ }
+
+ public void clusteringKind(ClusteringPrefix.Kind kind)
+ {
+ clusteringKind = kind;
+ clusteringKindEncoded = (byte)kind.ordinal();
+ }
+
+ public int clusteringColumnsBound() {
+ return clusteringColumnsBound;
+ }
+
+ public boolean isStartBound()
+ {
+ return (clusteringKindEncoded == INCL_START_BOUND_CLUSTERING_TYPE ||
clusteringKindEncoded == EXCL_START_BOUND_CLUSTERING_TYPE);
+ }
+
+ public boolean isEndBound()
+ {
+ return (clusteringKindEncoded == INCL_END_BOUND_CLUSTERING_TYPE ||
clusteringKindEncoded == EXCL_END_BOUND_CLUSTERING_TYPE);
+ }
+
+ public boolean isBoundary()
+ {
+ return (clusteringKindEncoded ==
EXCL_END_INCL_START_BOUNDARY_CLUSTERING_TYPE || clusteringKindEncoded ==
INCL_END_EXCL_START_BOUNDARY_CLUSTERING_TYPE);
+ }
+
+ public ClusteringPrefix<?> toClusteringPrefix(List<AbstractType<?>>
clusteringTypesList) {
+ if (clusteringKindEncoded == ROW_CLUSTERING_TYPE) {
+ return Clustering.serializer.deserialize(clusteringBuffer(), 0,
clusteringTypesList);
+ }
+ else if (clusteringColumnsBound == 0) {
+ return ByteArrayAccessor.factory.bound(clusteringKind);
+ }
+ else {
+ byte[][] values;
+ try (DataInputBuffer buffer = new
DataInputBuffer(clusteringBuffer(), true))
+ {
+ values =
ClusteringPrefix.serializer.deserializeValuesWithoutSize(buffer,
clusteringColumnsBound, 0, clusteringTypesList);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Reading from an in-memory buffer
shouldn't trigger an IOException", e);
+ }
+ return ByteArrayAccessor.factory.boundOrBoundary(clusteringKind,
values);
+ }
+ }
+
+ public boolean clusteringEquals(ClusteringDescriptor clusteringDescriptor)
Review Comment:
Nit: This method is unused.
##########
src/java/org/apache/cassandra/io/sstable/SSTableCursorReader.java:
##########
@@ -0,0 +1,743 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.cassandra.io.util.ResizableByteBuffer;
+import net.nicoulaj.compilecommand.annotations.Inline;
+import org.apache.cassandra.db.ClusteringPrefix;
+import org.apache.cassandra.db.Columns;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.DeserializationHelper;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.rows.UnfilteredSerializer;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.tools.Util;
+import org.apache.cassandra.utils.concurrent.Ref;
+
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.*;
+
+public class SSTableCursorReader implements AutoCloseable
+{
+ public static final ColumnMetadata[] COLUMN_METADATA_TYPE = new
ColumnMetadata[0];
+ private final Ref<SSTableReader> ssTableReaderRef;
+
+ public interface State
+ {
+ /** start of file, after partition end but before EOF */
+ int PARTITION_START = 1;
+ int STATIC_ROW_START = 1 << 1;
+ int ROW_START = 1 << 2;
+ /** common to row/static row cells */
+ int CELL_HEADER_START = 1 << 3;
+ int CELL_VALUE_START = 1 << 4;
+ int CELL_END = 1 << 5;
+ int TOMBSTONE_START = 1 << 6;
+ int AFTER_TOMBSTONE_HEADER = 1 << 7;
+ /** common to rows/tombstones. Call continue(); for next element, or
maybe partition end */
+ int ELEMENT_END = 1 << 8;
+ /** at {@link UnfilteredSerializer#isEndOfPartition(int)} */
+ int PARTITION_END = 1 << 9;
+ /** EOF */
+ int DONE = 1 << 10;
+ int SEEK = 1 << 11;
+ static boolean isState(int state, int mask) {
+ return (state & mask) != 0;
+ }
+ }
+
+ public class CellCursor {
+ public ReusableLivenessInfo rowLiveness;
+ public Columns columns;
+
+ public int columnsSize;
+ public int columnsIndex;
+ public int cellFlags;
+ public final ReusableLivenessInfo cellLiveness = new
ReusableLivenessInfo();
+ public CellPath cellPath;
+ public AbstractType<?> cellType;
+ public ColumnMetadata cellColumn;
+ private ColumnMetadata[] columnsArray;
+ private AbstractType<?>[] cellTypeArray;
+
+ void init (Columns columns, ReusableLivenessInfo rowLiveness)
+ {
+ if (this.columns != columns)
+ {
+ // This will be a problem with changing columns
+ this.columns = columns;
+ columnsArray = columns.toArray(COLUMN_METADATA_TYPE);
+ cellTypeArray = new AbstractType<?>[columnsArray.length];
+ for (int i = 0; i < columnsArray.length; i++)
+ {
+ ColumnMetadata cellColumn = columnsArray[i];
+ cellTypeArray[i] =
serializationHeader.getType(cellColumn);
+ }
+ // HOTSPOT: size is suprisingly expensive
+ columnsSize = columns.size();
+ }
+ this.rowLiveness = rowLiveness;
+ columnsIndex = 0;
+ cellFlags = 0;
+ cellPath = null;
+ cellType = null;
+ }
+
+ public boolean hasNext()
+ {
+ return columnsIndex < columnsSize;
+ }
+
+ /**
+ * For Cell deserialization see {@link Cell.Serializer#deserialize}
+ *
+ * @return true if has value, false otherwise
+ */
+ boolean readCellHeader() throws IOException
+ {
+ if (!(columnsIndex < columnsSize)) throw new
IllegalStateException();
+
+ // HOTSPOT: suprisingly expensive
+ int currIndex = columnsIndex++;
+ cellColumn = columnsArray[currIndex];
+ cellType = cellTypeArray[currIndex];
+ cellFlags = dataReader.readUnsignedByte();
+ // TODO: specialize common case where flags == HAS_VALUE |
USE_ROW_TS?
+ boolean hasValue = Cell.Serializer.hasValue(cellFlags);
+ boolean isDeleted = Cell.Serializer.isDeleted(cellFlags);
+ boolean isExpiring = Cell.Serializer.isExpiring(cellFlags);
+ boolean useRowTimestamp =
Cell.Serializer.useRowTimestamp(cellFlags);
+ boolean useRowTTL = Cell.Serializer.useRowTTL(cellFlags);
+
+ long timestamp = useRowTimestamp ? rowLiveness.timestamp() :
serializationHeader.readTimestamp(dataReader);
+
+ long localDeletionTime = useRowTTL
+ ? rowLiveness.localExpirationTime()
+ : (isDeleted || isExpiring ?
serializationHeader.readLocalDeletionTime(dataReader) : Cell.NO_DELETION_TIME);
+
+ int ttl = useRowTTL ? rowLiveness.ttl() : (isExpiring ?
serializationHeader.readTTL(dataReader) : Cell.NO_TTL);
+ localDeletionTime =
Cell.decodeLocalDeletionTime(localDeletionTime, ttl, deserializationHelper);
+
+ cellLiveness.reset(timestamp, ttl, localDeletionTime);
+ cellPath = cellColumn.isComplex()
+ ?
cellColumn.cellPathSerializer().deserialize(dataReader)
+ : null;
+ return hasValue;
+ }
+ }
+
+ int state = PARTITION_START;
+
+ private final Version version;
+ private final TableMetadata metadata;
+ private final ImmutableList<ColumnMetadata> clusteringColumns;
+ public final AbstractType<?>[] clusteringColumnTypes;
+ private final StatsMetadata statsMetadata;
+ private final DeserializationHelper deserializationHelper;
+ private final EncodingStats encodingStats;
Review Comment:
Nit: the `encodingStats` field is unused.
##########
src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java:
##########
@@ -44,7 +44,7 @@
* The output will be a series of SSTables that do not exceed a specified size.
* By default, all sorted data are written into a single SSTable.
*/
-class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
+public class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
Review Comment:
This doesn't need to be made public.
##########
src/java/org/apache/cassandra/io/sstable/SSTableCursorReader.java:
##########
@@ -0,0 +1,743 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.cassandra.io.util.ResizableByteBuffer;
+import net.nicoulaj.compilecommand.annotations.Inline;
+import org.apache.cassandra.db.ClusteringPrefix;
+import org.apache.cassandra.db.Columns;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.DeserializationHelper;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.rows.UnfilteredSerializer;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.tools.Util;
+import org.apache.cassandra.utils.concurrent.Ref;
+
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.*;
+
+public class SSTableCursorReader implements AutoCloseable
+{
+ public static final ColumnMetadata[] COLUMN_METADATA_TYPE = new
ColumnMetadata[0];
+ private final Ref<SSTableReader> ssTableReaderRef;
+
+ public interface State
+ {
+ /** start of file, after partition end but before EOF */
+ int PARTITION_START = 1;
+ int STATIC_ROW_START = 1 << 1;
+ int ROW_START = 1 << 2;
+ /** common to row/static row cells */
+ int CELL_HEADER_START = 1 << 3;
+ int CELL_VALUE_START = 1 << 4;
+ int CELL_END = 1 << 5;
+ int TOMBSTONE_START = 1 << 6;
+ int AFTER_TOMBSTONE_HEADER = 1 << 7;
+ /** common to rows/tombstones. Call continue(); for next element, or
maybe partition end */
+ int ELEMENT_END = 1 << 8;
+ /** at {@link UnfilteredSerializer#isEndOfPartition(int)} */
+ int PARTITION_END = 1 << 9;
+ /** EOF */
+ int DONE = 1 << 10;
+ int SEEK = 1 << 11;
+ static boolean isState(int state, int mask) {
+ return (state & mask) != 0;
+ }
+ }
+
+ public class CellCursor {
+ public ReusableLivenessInfo rowLiveness;
+ public Columns columns;
+
+ public int columnsSize;
+ public int columnsIndex;
+ public int cellFlags;
+ public final ReusableLivenessInfo cellLiveness = new
ReusableLivenessInfo();
+ public CellPath cellPath;
+ public AbstractType<?> cellType;
+ public ColumnMetadata cellColumn;
+ private ColumnMetadata[] columnsArray;
+ private AbstractType<?>[] cellTypeArray;
+
+ void init (Columns columns, ReusableLivenessInfo rowLiveness)
+ {
+ if (this.columns != columns)
+ {
+ // This will be a problem with changing columns
+ this.columns = columns;
+ columnsArray = columns.toArray(COLUMN_METADATA_TYPE);
+ cellTypeArray = new AbstractType<?>[columnsArray.length];
+ for (int i = 0; i < columnsArray.length; i++)
+ {
+ ColumnMetadata cellColumn = columnsArray[i];
+ cellTypeArray[i] =
serializationHeader.getType(cellColumn);
+ }
+ // HOTSPOT: size is suprisingly expensive
+ columnsSize = columns.size();
+ }
+ this.rowLiveness = rowLiveness;
+ columnsIndex = 0;
+ cellFlags = 0;
+ cellPath = null;
+ cellType = null;
+ }
+
+ public boolean hasNext()
+ {
+ return columnsIndex < columnsSize;
+ }
+
+ /**
+ * For Cell deserialization see {@link Cell.Serializer#deserialize}
+ *
+ * @return true if has value, false otherwise
+ */
+ boolean readCellHeader() throws IOException
+ {
+ if (!(columnsIndex < columnsSize)) throw new
IllegalStateException();
+
+ // HOTSPOT: suprisingly expensive
+ int currIndex = columnsIndex++;
+ cellColumn = columnsArray[currIndex];
+ cellType = cellTypeArray[currIndex];
+ cellFlags = dataReader.readUnsignedByte();
+ // TODO: specialize common case where flags == HAS_VALUE |
USE_ROW_TS?
+ boolean hasValue = Cell.Serializer.hasValue(cellFlags);
+ boolean isDeleted = Cell.Serializer.isDeleted(cellFlags);
+ boolean isExpiring = Cell.Serializer.isExpiring(cellFlags);
+ boolean useRowTimestamp =
Cell.Serializer.useRowTimestamp(cellFlags);
+ boolean useRowTTL = Cell.Serializer.useRowTTL(cellFlags);
+
+ long timestamp = useRowTimestamp ? rowLiveness.timestamp() :
serializationHeader.readTimestamp(dataReader);
+
+ long localDeletionTime = useRowTTL
+ ? rowLiveness.localExpirationTime()
+ : (isDeleted || isExpiring ?
serializationHeader.readLocalDeletionTime(dataReader) : Cell.NO_DELETION_TIME);
+
+ int ttl = useRowTTL ? rowLiveness.ttl() : (isExpiring ?
serializationHeader.readTTL(dataReader) : Cell.NO_TTL);
+ localDeletionTime =
Cell.decodeLocalDeletionTime(localDeletionTime, ttl, deserializationHelper);
+
+ cellLiveness.reset(timestamp, ttl, localDeletionTime);
+ cellPath = cellColumn.isComplex()
+ ?
cellColumn.cellPathSerializer().deserialize(dataReader)
+ : null;
+ return hasValue;
+ }
+ }
+
+ int state = PARTITION_START;
+
+ private final Version version;
+ private final TableMetadata metadata;
+ private final ImmutableList<ColumnMetadata> clusteringColumns;
+ public final AbstractType<?>[] clusteringColumnTypes;
+ private final StatsMetadata statsMetadata;
+ private final DeserializationHelper deserializationHelper;
+ private final EncodingStats encodingStats;
+ private final SerializationHeader serializationHeader;
+
+ // need to be closed
+ public final SSTableReader ssTableReader;
+ private final RandomAccessReader dataReader;
+ private final DeletionTime.Serializer deletionTimeSerializer;
+ // in serialization order (maybe use inheritance to clamp them together?)
+ public long partitionStart = 0;
+
+ // SHARED STATIC_ROW/ROW/TOMB
+ public int basicElementFlags = 0;
+ public int extendedFlags = 0;
+
+ private final CellCursor staticRowCellCursor = new CellCursor();
+ private final CellCursor rowCellCursor = new CellCursor();
+ public CellCursor cellCursor;
+
+ public SSTableCursorReader(Descriptor desc) throws IOException
+ {
+ metadata = Util.metadataFromSSTable(desc);
+ ssTableReader = SSTableReader.openNoValidation(null, desc,
TableMetadataRef.forOfflineTools(metadata));
+ ssTableReaderRef = ssTableReader.ref();
+ version = desc.version;
+ deletionTimeSerializer = DeletionTime.getSerializer(version);
+ clusteringColumns = metadata.clusteringColumns();
+ int clusteringColumnCount = clusteringColumns.size();
+ clusteringColumnTypes = new AbstractType<?>[clusteringColumnCount];
+ for (int i = 0; i < clusteringColumnTypes.length; i++)
+ {
+ clusteringColumnTypes[i] = clusteringColumns.get(i).type;
+ }
+ statsMetadata = ssTableReader.getSSTableMetadata();
+ encodingStats = ssTableReader.stats();
+ deserializationHelper = new DeserializationHelper(metadata,
desc.version.correspondingMessagingVersion(), DeserializationHelper.Flag.LOCAL,
null);
+ serializationHeader = ssTableReader.header;
+
+ dataReader = ssTableReader.openDataReader();
+ }
+
+ public SSTableCursorReader(SSTableReader reader)
+ {
+ metadata = reader.metadata();
+ ssTableReader = reader;
+ ssTableReaderRef = null;
+ version = reader.descriptor.version;
+ deletionTimeSerializer = DeletionTime.getSerializer(version);
Review Comment:
We could extract the code from here down, which both constructors share, so that it is not duplicated.
##########
src/java/org/apache/cassandra/io/sstable/SSTableCursorWriter.java:
##########
@@ -0,0 +1,681 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import com.google.common.primitives.Ints;
+
+import org.agrona.collections.IntArrayList;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ClusteringBoundOrBoundary;
+import org.apache.cassandra.db.ClusteringPrefix;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.Cells;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Rows;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredSerializer;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.format.SortedTableWriter;
+import org.apache.cassandra.io.sstable.format.big.BigFormatPartitionWriter;
+import org.apache.cassandra.io.sstable.format.big.BigTableWriter;
+import org.apache.cassandra.io.sstable.format.big.RowIndexEntry;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.ByteArrayUtil;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.concurrent.Ref;
+
+import static org.apache.cassandra.db.rows.UnfilteredSerializer.*;
+
+public class SSTableCursorWriter implements AutoCloseable
+{
+ private static final UnfilteredSerializer SERIALIZER =
UnfilteredSerializer.serializer;
+ private static final ColumnMetadata[] EMPTY_COL_META = new
ColumnMetadata[0];
+ private final SortedTableWriter<?,?> ssTableWriter;
+ private final SequentialWriter dataWriter;
+ private final SortedTableWriter.AbstractIndexWriter indexWriter;
+ private final DeletionTime.Serializer deletionTimeSerializer;
+ private final MetadataCollector metadataCollector;
+ private final SerializationHeader serializationHeader;
+ /**
+ * See: {@link BloomFilter#reusableIndexes}
+ */
+ private final long[] reusableIndexes = new long[21];
+ private final boolean hasStaticColumns;
+
+ private long partitionStart;
+ // ROW contents, needed because of the order of writing and the var int
fields
+ private int rowFlags; // discovered as we go along
+ private int rowExtendedFlags;
+ private final byte[] copyColumnValueBuffer = new byte[4096]; // used to
copy cell contents (maybe piecemiel if very large, since we don't have a direct
read option)
+ private final DataOutputBuffer rowHeaderBuffer = new DataOutputBuffer();
// holds the contents between FLAGS and SIZE
+ private final DataOutputBuffer rowBuffer = new DataOutputBuffer();
+ private final DeletionTime openMarker = DeletionTime.build(0,0);
+
+ private final ColumnMetadata[] staticColumns;
+ private final ColumnMetadata[] regularColumns;
+ private final IntArrayList missingColumns = new IntArrayList();
+ private ColumnMetadata[] columns; // points to static/regular
+ private int columnsWrittenCount = 0;
+ private int nextCellIndex = 0;
+ // Index info
+ private final DataOutputBuffer rowIndexEntries = new DataOutputBuffer();
+ private final IntArrayList rowIndexEntriesOffsets = new IntArrayList();
+ private final ClusteringDescriptor rowIndexEntryLastClustering = new
ClusteringDescriptor();
+ private int indexBlockStartOffset;
+ private int rowIndexEntryOffset;
+ private final int indexBlockThreshold;
+
+
+ private SSTableCursorWriter(
Review Comment:
This class should be split into a common `SortedTableCursorWriter`, with
format-specific subclasses that instantiate the index builders it uses, and
placed into the correct per-format packages.
##########
src/java/org/apache/cassandra/io/util/ReusableDecoratedKey.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.utils.ByteArrayUtil;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class ReusableDecoratedKey extends DecoratedKey
Review Comment:
This class is pretty hacky. It shouldn't be hard to move the support for
reusable tokens into the partitioner (throwing an exception for every
partitioner except the Murmur3 and local ones).
##########
src/java/org/apache/cassandra/io/sstable/SSTableCursorReader.java:
##########
@@ -0,0 +1,743 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.cassandra.io.util.ResizableByteBuffer;
+import net.nicoulaj.compilecommand.annotations.Inline;
+import org.apache.cassandra.db.ClusteringPrefix;
+import org.apache.cassandra.db.Columns;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.DeserializationHelper;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.rows.UnfilteredSerializer;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.tools.Util;
+import org.apache.cassandra.utils.concurrent.Ref;
+
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.*;
+
+public class SSTableCursorReader implements AutoCloseable
+{
+ public static final ColumnMetadata[] COLUMN_METADATA_TYPE = new
ColumnMetadata[0];
+ private final Ref<SSTableReader> ssTableReaderRef;
+
+ public interface State
+ {
+ /** start of file, after partition end but before EOF */
+ int PARTITION_START = 1;
+ int STATIC_ROW_START = 1 << 1;
+ int ROW_START = 1 << 2;
+ /** common to row/static row cells */
+ int CELL_HEADER_START = 1 << 3;
+ int CELL_VALUE_START = 1 << 4;
+ int CELL_END = 1 << 5;
+ int TOMBSTONE_START = 1 << 6;
+ int AFTER_TOMBSTONE_HEADER = 1 << 7;
+ /** common to rows/tombstones. Call continue(); for next element, or
maybe partition end */
+ int ELEMENT_END = 1 << 8;
+ /** at {@link UnfilteredSerializer#isEndOfPartition(int)} */
+ int PARTITION_END = 1 << 9;
+ /** EOF */
+ int DONE = 1 << 10;
+ int SEEK = 1 << 11;
+ static boolean isState(int state, int mask) {
+ return (state & mask) != 0;
+ }
+ }
+
+ public class CellCursor {
+ public ReusableLivenessInfo rowLiveness;
+ public Columns columns;
+
+ public int columnsSize;
+ public int columnsIndex;
+ public int cellFlags;
+ public final ReusableLivenessInfo cellLiveness = new
ReusableLivenessInfo();
+ public CellPath cellPath;
+ public AbstractType<?> cellType;
+ public ColumnMetadata cellColumn;
+ private ColumnMetadata[] columnsArray;
+ private AbstractType<?>[] cellTypeArray;
+
+ void init (Columns columns, ReusableLivenessInfo rowLiveness)
+ {
+ if (this.columns != columns)
+ {
+ // This will be a problem with changing columns
+ this.columns = columns;
+ columnsArray = columns.toArray(COLUMN_METADATA_TYPE);
+ cellTypeArray = new AbstractType<?>[columnsArray.length];
+ for (int i = 0; i < columnsArray.length; i++)
+ {
+ ColumnMetadata cellColumn = columnsArray[i];
+ cellTypeArray[i] =
serializationHeader.getType(cellColumn);
+ }
+ // HOTSPOT: size is suprisingly expensive
+ columnsSize = columns.size();
+ }
+ this.rowLiveness = rowLiveness;
+ columnsIndex = 0;
+ cellFlags = 0;
+ cellPath = null;
+ cellType = null;
+ }
+
+ public boolean hasNext()
+ {
+ return columnsIndex < columnsSize;
+ }
+
+ /**
+ * For Cell deserialization see {@link Cell.Serializer#deserialize}
+ *
+ * @return true if has value, false otherwise
+ */
+ boolean readCellHeader() throws IOException
+ {
+ if (!(columnsIndex < columnsSize)) throw new
IllegalStateException();
+
+ // HOTSPOT: suprisingly expensive
+ int currIndex = columnsIndex++;
+ cellColumn = columnsArray[currIndex];
+ cellType = cellTypeArray[currIndex];
+ cellFlags = dataReader.readUnsignedByte();
+ // TODO: specialize common case where flags == HAS_VALUE |
USE_ROW_TS?
+ boolean hasValue = Cell.Serializer.hasValue(cellFlags);
+ boolean isDeleted = Cell.Serializer.isDeleted(cellFlags);
+ boolean isExpiring = Cell.Serializer.isExpiring(cellFlags);
+ boolean useRowTimestamp =
Cell.Serializer.useRowTimestamp(cellFlags);
+ boolean useRowTTL = Cell.Serializer.useRowTTL(cellFlags);
+
+ long timestamp = useRowTimestamp ? rowLiveness.timestamp() :
serializationHeader.readTimestamp(dataReader);
+
+ long localDeletionTime = useRowTTL
+ ? rowLiveness.localExpirationTime()
+ : (isDeleted || isExpiring ?
serializationHeader.readLocalDeletionTime(dataReader) : Cell.NO_DELETION_TIME);
+
+ int ttl = useRowTTL ? rowLiveness.ttl() : (isExpiring ?
serializationHeader.readTTL(dataReader) : Cell.NO_TTL);
+ localDeletionTime =
Cell.decodeLocalDeletionTime(localDeletionTime, ttl, deserializationHelper);
+
+ cellLiveness.reset(timestamp, ttl, localDeletionTime);
+ cellPath = cellColumn.isComplex()
+ ?
cellColumn.cellPathSerializer().deserialize(dataReader)
+ : null;
+ return hasValue;
+ }
+ }
+
+ int state = PARTITION_START;
+
+ private final Version version;
+ private final TableMetadata metadata;
+ private final ImmutableList<ColumnMetadata> clusteringColumns;
+ public final AbstractType<?>[] clusteringColumnTypes;
+ private final StatsMetadata statsMetadata;
+ private final DeserializationHelper deserializationHelper;
+ private final EncodingStats encodingStats;
+ private final SerializationHeader serializationHeader;
+
+ // need to be closed
+ public final SSTableReader ssTableReader;
+ private final RandomAccessReader dataReader;
+ private final DeletionTime.Serializer deletionTimeSerializer;
+ // in serialization order (maybe use inheritance to clamp them together?)
+ public long partitionStart = 0;
+
+ // SHARED STATIC_ROW/ROW/TOMB
+ public int basicElementFlags = 0;
+ public int extendedFlags = 0;
+
+ private final CellCursor staticRowCellCursor = new CellCursor();
+ private final CellCursor rowCellCursor = new CellCursor();
+ public CellCursor cellCursor;
+
+    /**
+     * Opens the sstable identified by {@code desc} for cursor-style reading. The table metadata is
+     * loaded from the sstable, the reader is opened without validation, and a reference to that
+     * reader is stored in {@code ssTableReaderRef}.
+     *
+     * @param desc descriptor of the sstable to open
+     */
+    public SSTableCursorReader(Descriptor desc) throws IOException
+    {
+        this.metadata = Util.metadataFromSSTable(desc);
+        this.ssTableReader = SSTableReader.openNoValidation(null, desc, TableMetadataRef.forOfflineTools(this.metadata));
+        this.ssTableReaderRef = this.ssTableReader.ref();
+        this.version = desc.version;
+        this.deletionTimeSerializer = DeletionTime.getSerializer(this.version);
+
+        // Flatten the clustering column types into an array, in list order.
+        this.clusteringColumns = this.metadata.clusteringColumns();
+        AbstractType<?>[] types = new AbstractType<?>[this.clusteringColumns.size()];
+        int slot = 0;
+        for (ColumnMetadata column : this.clusteringColumns)
+            types[slot++] = column.type;
+        this.clusteringColumnTypes = types;
+
+        this.statsMetadata = this.ssTableReader.getSSTableMetadata();
+        this.encodingStats = this.ssTableReader.stats();
+        this.deserializationHelper = new DeserializationHelper(this.metadata,
+                                                               this.version.correspondingMessagingVersion(),
+                                                               DeserializationHelper.Flag.LOCAL,
+                                                               null);
+        this.serializationHeader = this.ssTableReader.header;
+
+        this.dataReader = this.ssTableReader.openDataReader();
+    }
+
+    /**
+     * Wraps an already opened sstable reader. No reference is taken on it
+     * ({@code ssTableReaderRef} is left null); only a new data reader is opened.
+     *
+     * @param reader the sstable reader to iterate over
+     */
+    public SSTableCursorReader(SSTableReader reader)
+    {
+        this.metadata = reader.metadata();
+        this.ssTableReader = reader;
+        this.ssTableReaderRef = null;
+        this.version = reader.descriptor.version;
+        this.deletionTimeSerializer = DeletionTime.getSerializer(this.version);
+
+        // Flatten the clustering column types into an array, in list order.
+        this.clusteringColumns = this.metadata.clusteringColumns();
+        AbstractType<?>[] types = new AbstractType<?>[this.clusteringColumns.size()];
+        int slot = 0;
+        for (ColumnMetadata column : this.clusteringColumns)
+            types[slot++] = column.type;
+        this.clusteringColumnTypes = types;
+
+        this.statsMetadata = reader.getSSTableMetadata();
+        this.encodingStats = reader.stats();
+        this.deserializationHelper = new DeserializationHelper(this.metadata,
+                                                               this.version.correspondingMessagingVersion(),
+                                                               DeserializationHelper.Flag.LOCAL,
+                                                               null);
+        this.serializationHeader = reader.header;
+
+        this.dataReader = reader.openDataReader();
+    }
+
+    /**
+     * Closes the data reader and, when this cursor holds a reference on the sstable reader
+     * ({@code ssTableReaderRef} is non-null), releases that reference as well.
+     * The reference is released in a {@code finally} block so that a failure while closing the
+     * data reader cannot leak it.
+     */
+    @Override
+    public void close()
+    {
+        try
+        {
+            dataReader.close();
+        }
+        finally
+        {
+            if (ssTableReaderRef != null)
+                ssTableReaderRef.close();
+        }
+    }
+
+    /** Records the data reader's current position as the partition start and clears both flag fields. */
+    private void resetOnPartitionStart()
+    {
+        this.partitionStart = this.dataReader.getPosition();
+        this.basicElementFlags = this.extendedFlags = 0;
+    }
+
+    /**
+     * Moves the cursor to the partition starting at {@code position}.
+     * Position 0 is accepted as-is. For any other position the byte just before it is checked
+     * through {@code checkNextFlags()} and must yield {@code PARTITION_END}.
+     *
+     * @param position offset in the data file at which a partition is expected to start
+     * @return {@code PARTITION_START}
+     * @throws IllegalArgumentException if the preceding byte does not yield {@code PARTITION_END}
+     */
+    public int seekPartition(long position) throws IOException
+    {
+        state = SEEK;
+        if (position != 0)
+        {
+            // verify partition start is after a partition end marker
+            dataReader.seek(position - 1);
+            if (checkNextFlags() != PARTITION_END)
+                throw new IllegalArgumentException("Seeking to a partition at: " + position + " did not result in a valid state");
+        }
+        else
+        {
+            dataReader.seek(position);
+        }
+        state = PARTITION_START;
+        resetOnPartitionStart();
+        return state;
+    }
+
+    /**
+     * Moves the cursor to an element inside a partition and reads its flags.
+     *
+     * @param position offset in the data file at which an element's flags are expected
+     * @return the state reported by {@code checkNextFlags()}
+     * @throws IllegalStateException if that state is none of {@code ROW_START},
+     *         {@code TOMBSTONE_START}, {@code STATIC_ROW_START} or {@code DONE}
+     */
+    public int seekPartitionElement(long position) throws IOException
+    {
+        // partition elements have flags
+        dataReader.seek(position);
+        final int nextState = checkNextFlags();
+        final int acceptable = ROW_START | TOMBSTONE_START | STATIC_ROW_START | DONE;
+        if (isState(nextState, acceptable))
+            return nextState;
+        throw new IllegalStateException();
+    }
+
+    // struct partition {
+    //     struct partition_header header
+    //     optional<struct row> row
+    //     struct unfiltered unfiltereds[];
+    // };
+    /**
+     * Loads the partition header at the current position into {@code header}, then lets
+     * {@code checkNextFlags()} determine what follows.
+     *
+     * @param header descriptor that loads itself from the data reader
+     * @return the state reported by {@code checkNextFlags()}
+     * @throws IllegalStateException if the cursor is not at {@code PARTITION_START}
+     */
+    public int readPartitionHeader(PartitionDescriptor header) throws IOException
+    {
+        if (PARTITION_START != state)
+            throw new IllegalStateException();
+
+        resetOnPartitionStart();
+        header.load(dataReader, deletionTimeSerializer);
+        final int next = checkNextFlags();
+        return next;
+    }
+
+ // struct static_row {
+ // byte flags; // pre-loaded
+ // byte extended_flags; // pre-loaded
+ // varint row_body_size;
+ // varint prev_unfiltered_size; // for backward traversing, ignored
+ // optional<struct liveness_info> liveness_info;
+ // optional<struct delta_deletion_time> deletion_time;
+ // *** We read the columns in a separate method ***
+ // optional<varint[]> missing_columns;
+ // cell[] cells; // potentially only some
+ //};
+ public int readStaticRowHeader(ElementDescriptor elementDescriptor) throws
IOException
+ {
+ if (state != STATIC_ROW_START) throw new IllegalStateException();
+ elementDescriptor.loadStaticRow(dataReader, serializationHeader,
deserializationHelper, basicElementFlags, extendedFlags);
+
+ staticRowCellCursor.init(elementDescriptor.rowColumns(),
elementDescriptor.livenessInfo());
+ cellCursor = staticRowCellCursor;
+ if (!staticRowCellCursor.hasNext())
+ {
+ return checkNextFlags();
+ }
+ else
+ {
+ return state = State.CELL_HEADER_START;
+ }
+ }
+
+    /**
+     * Copies the value of the current cell from the data reader to {@code writer}.
+     *
+     * @param writer destination of the value bytes
+     * @param buffer scratch buffer the bytes are staged through
+     * @return {@code CELL_END} when the row has further columns, otherwise the state reported by
+     *         {@code checkNextFlags()}
+     * @throws IllegalStateException if the cursor is not at {@code CELL_VALUE_START} or no cell
+     *         type has been set
+     */
+    public int copyCellValue(DataOutputPlus writer, byte[] buffer) throws IOException
+    {
+        if (State.CELL_VALUE_START != state || null == cellCursor.cellType)
+            throw new IllegalStateException();
+
+        copyCellContents(writer, buffer, cellCursor.cellType.valueLengthIfFixed());
+
+        // NOTE(review): skipCellValue() moves to CELL_HEADER_START at this point instead of
+        // CELL_END -- confirm the difference is intended.
+        if (cellCursor.hasNext())
+        {
+            state = State.CELL_END;
+            return state;
+        }
+        return checkNextFlags();
+    }
+
+    // TODO: move to cell cursor? maybe avoid copy through buffer?
+    /**
+     * Streams one cell value from the data reader to {@code writer} through {@code transferBuffer}.
+     * <p>
+     * A non-negative {@code length} is taken as the exact number of value bytes. A negative
+     * {@code length} means the value is length-prefixed: the unsigned vint length is read from the
+     * data reader and echoed to the writer before the bytes are copied.
+     * <p>
+     * Both cases copy in chunks of at most {@code transferBuffer.length} bytes, so a value larger
+     * than the scratch buffer no longer overruns it.
+     *
+     * @param writer destination of the value
+     * @param transferBuffer scratch buffer; must not be empty when there are bytes to copy
+     * @param length fixed value length, or a negative number for length-prefixed values
+     * @throws IOException if the length prefix read from the data reader is negative
+     * @throws IllegalArgumentException if bytes must be copied but {@code transferBuffer} is empty
+     */
+    private void copyCellContents(DataOutputPlus writer, byte[] transferBuffer, int length) throws IOException
+    {
+        int remaining = length;
+        if (remaining < 0)
+        {
+            remaining = dataReader.readUnsignedVInt32();
+            if (remaining < 0)
+                throw new IOException("Corrupt (negative) value length encountered");
+            writer.writeUnsignedVInt32(remaining);
+        }
+
+        // A zero-length buffer would never make progress in the loop below.
+        if (remaining > 0 && transferBuffer.length == 0)
+            throw new IllegalArgumentException("Transfer buffer must not be empty");
+
+        while (remaining > 0)
+        {
+            int chunk = Math.min(remaining, transferBuffer.length);
+            dataReader.readFully(transferBuffer, 0, chunk);
+            writer.write(transferBuffer, 0, chunk);
+            remaining -= chunk;
+        }
+    }
+
+ // struct row {
+ // byte flags;
+ // optional<struct clustering_block[]> clustering_blocks;
+ // varint row_body_size;
+ // varint prev_unfiltered_size; // for backward traversing, ignored
+ // optional<struct liveness_info> liveness_info;
+ // optional<struct delta_deletion_time> deletion_time;
+ // *** We read the columns in a separate step ***
+ // optional<varint[]> missing_columns;
+ // cell[] cells; // potentially only some
+ //};
+ public int readRowHeader(ElementDescriptor elementDescriptor) throws
IOException
+ {
+ if (state != State.ROW_START) throw new IllegalStateException();
+ if (!UnfilteredSerializer.isRow(basicElementFlags)) throw new
IllegalStateException();
+ elementDescriptor.loadRow(dataReader, serializationHeader,
deserializationHelper, clusteringColumnTypes, basicElementFlags);
+
+ rowCellCursor.init(elementDescriptor.rowColumns(),
elementDescriptor.livenessInfo());
+ cellCursor = rowCellCursor;
+ if (!rowCellCursor.hasNext())
+ {
+ return checkNextFlags();
+ }
+ else
+ {
+ return state = State.CELL_HEADER_START;
+ }
+ }
+
+    // TODO: introduce cell header class
+    /**
+     * Reads the next cell header through the active {@code cellCursor}.
+     *
+     * @return {@code CELL_VALUE_START} when the cell carries a value; otherwise {@code CELL_END}
+     *         when the row has further columns, or the state reported by {@code checkNextFlags()}
+     * @throws IllegalStateException if the cursor is not at {@code CELL_HEADER_START}
+     */
+    public int readCellHeader() throws IOException
+    {
+        if (State.CELL_HEADER_START != state)
+            throw new IllegalStateException();
+
+        final boolean hasValue = cellCursor.readCellHeader();
+        if (hasValue)
+        {
+            state = State.CELL_VALUE_START;
+            return state;
+        }
+        if (cellCursor.hasNext())
+        {
+            state = State.CELL_END;
+            return state;
+        }
+        return checkNextFlags();
+    }
+
+    /**
+     * Skips over the value of the current cell using the cell's type.
+     *
+     * @return {@code CELL_HEADER_START} when the row has further columns, otherwise the state
+     *         reported by {@code checkNextFlags()}
+     * @throws IllegalStateException if the cursor is not at {@code CELL_VALUE_START}
+     */
+    @Inline
+    public int skipCellValue() throws IOException
+    {
+        if (State.CELL_VALUE_START != state)
+            throw new IllegalStateException();
+
+        cellCursor.cellType.skipValue(dataReader);
+
+        if (cellCursor.hasNext())
+        {
+            state = State.CELL_HEADER_START;
+            return state;
+        }
+        return checkNextFlags();
+    }
+
+    /**
+     * Loads a range tombstone marker into {@code elementDescriptor}.
+     * <p>
+     * See: {@link org.apache.cassandra.db.rows.UnfilteredSerializer#serialize(RangeTombstoneMarker, SerializationHelper, DataOutputPlus, long, int)}
+     * <pre>
+     * struct range_tombstone_marker {
+     *     byte flags = IS_MARKER;
+     *     byte kind_ordinal;
+     *     be16 bound_values_count;
+     *     struct clustering_block[] clustering_blocks;
+     *     varint marker_body_size;
+     *     varint prev_unfiltered_size;
+     * };
+     * struct range_tombstone_bound_marker : range_tombstone_marker {
+     *     struct delta_deletion_time deletion_time;
+     * };
+     * struct range_tombstone_boundary_marker : range_tombstone_marker {
+     *     struct delta_deletion_time end_deletion_time;
+     *     struct delta_deletion_time start_deletion_time;
+     * };
+     * </pre>
+     * TODO: tombstone as resizable buffer
+     *
+     * @param elementDescriptor receives the marker
+     * @return the state reported by {@code checkNextFlags()}
+     * @throws IllegalStateException if the cursor is not at {@code TOMBSTONE_START} or the current
+     *         flags do not describe a tombstone marker
+     */
+    public int readTombstoneMarker(ElementDescriptor elementDescriptor) throws IOException
+    {
+        if (TOMBSTONE_START != state || !UnfilteredSerializer.isTombstoneMarker(basicElementFlags))
+            throw new IllegalStateException();
+
+        elementDescriptor.loadTombstone(dataReader, serializationHeader, deserializationHelper, clusteringColumnTypes, basicElementFlags);
+        // unfilteredStart = dataReader.getPosition() - 1;
+
+        final int next = checkNextFlags();
+        return next;
+    }
+//
+// /**
+// * TODO: deduplicate for tombstones
+// * {@link ClusteringPrefix.Serializer#deserializeValuesWithoutSize
+// */
+// private void readUnfilteredClustering(AbstractType<?>[] types, int
clusteringColumnsBound) throws IOException
+// {
+// if (clusteringColumnsBound == 0) {
+// clusteringLength = 0;
+// return;
+// }
+// long clusteringStartPosition = dataReader.getPosition();
+// skipClustering(dataReader, types, clusteringColumnsBound);
+// long clusteringLengthLong = dataReader.getPosition() -
clusteringStartPosition;
+// // Notionally, max clustering size is 2G, with each column limit at
64k,
+// if (clusteringLengthLong > Integer.MAX_VALUE) {
+// throw new IllegalStateException();
+// }
+// clusteringLength = (int) clusteringLengthLong;
+// if (clusteringLength > clustering.length) {
+// clustering = new byte[Pow2.roundToPowerOfTwo(clusteringLength)];
+// clusteringBuffer = ByteBuffer.wrap(clustering); // would be nice
if it was re-usable
+// }
+// dataReader.seek(clusteringStartPosition);
+// dataReader.readFully(clustering, 0, clusteringLength);
+// clusteringBuffer.limit(clusteringLength);
+// }
+
+ /**
+ * {@link ClusteringPrefix.Serializer#deserializeValuesWithoutSize}
+ */
+ static void readUnfilteredClustering(RandomAccessReader dataReader,
AbstractType<?>[] types, int clusteringColumnsBound, ResizableByteBuffer
clustering) throws IOException
+ {
+ if (clusteringColumnsBound == 0) {
+ clustering.resetBuffer();
+ return;
+ }
+ long clusteringStartPosition = dataReader.getPosition();
+ skipClustering(dataReader, types, clusteringColumnsBound);
+ long clusteringLengthLong = dataReader.getPosition() -
clusteringStartPosition;
+
+ // Notionally, max clustering size is 2G, with each column limit at
64k,
+ if (clusteringLengthLong > Integer.MAX_VALUE) {
+ throw new IllegalStateException();
+ }
+ dataReader.seek(clusteringStartPosition);
+ clustering.load(dataReader, (int) clusteringLengthLong);
+ }
+
+ private static void skipClustering(RandomAccessReader dataReader,
AbstractType<?>[] types, int clusteringColumnsBound) throws IOException
+ {
+ long clusteringBlockHeader = 0;
+ for (int clusteringIndex = 0; clusteringIndex <
clusteringColumnsBound; clusteringIndex++)
+ {
+ // struct clustering_block {
+ // varint clustering_block_header;
+ // simple_cell[] clustering_cells;
+ // };
+ if (clusteringIndex % 32 == 0)
+ {
+ clusteringBlockHeader = dataReader.readUnsignedVInt();
+ }
+ clusteringBlockHeader = clusteringBlockHeader >>> 2;
+ // struct clustering_block {
+ // varint clustering_block_header;
+ // simple_cell[] clustering_cells;
+ // };
+ if ((clusteringBlockHeader & 0x11) == 0)
Review Comment:
The mask should be the binary literal `0b11`, not the hexadecimal `0x11`
(which is `0b10001`). Make sure there are tests that check the behaviour
with null and empty clustering values.
##########
src/java/org/apache/cassandra/io/sstable/SSTableCursorReader.java:
##########
@@ -0,0 +1,743 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.cassandra.io.util.ResizableByteBuffer;
+import net.nicoulaj.compilecommand.annotations.Inline;
+import org.apache.cassandra.db.ClusteringPrefix;
+import org.apache.cassandra.db.Columns;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.DeserializationHelper;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.rows.UnfilteredSerializer;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.tools.Util;
+import org.apache.cassandra.utils.concurrent.Ref;
+
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.*;
+
+public class SSTableCursorReader implements AutoCloseable
+{
+ public static final ColumnMetadata[] COLUMN_METADATA_TYPE = new
ColumnMetadata[0];
+ private final Ref<SSTableReader> ssTableReaderRef;
+
+ public interface State
+ {
+ /** start of file, after partition end but before EOF */
+ int PARTITION_START = 1;
+ int STATIC_ROW_START = 1 << 1;
+ int ROW_START = 1 << 2;
+ /** common to row/static row cells */
+ int CELL_HEADER_START = 1 << 3;
+ int CELL_VALUE_START = 1 << 4;
+ int CELL_END = 1 << 5;
+ int TOMBSTONE_START = 1 << 6;
+ int AFTER_TOMBSTONE_HEADER = 1 << 7;
+ /** common to rows/tombstones. Call continue(); for next element, or
maybe partition end */
+ int ELEMENT_END = 1 << 8;
+ /** at {@link UnfilteredSerializer#isEndOfPartition(int)} */
+ int PARTITION_END = 1 << 9;
+ /** EOF */
+ int DONE = 1 << 10;
+ int SEEK = 1 << 11;
+ static boolean isState(int state, int mask) {
+ return (state & mask) != 0;
+ }
+ }
+
+ public class CellCursor {
+ public ReusableLivenessInfo rowLiveness;
+ public Columns columns;
+
+ public int columnsSize;
+ public int columnsIndex;
+ public int cellFlags;
+ public final ReusableLivenessInfo cellLiveness = new
ReusableLivenessInfo();
+ public CellPath cellPath;
+ public AbstractType<?> cellType;
+ public ColumnMetadata cellColumn;
+ private ColumnMetadata[] columnsArray;
+ private AbstractType<?>[] cellTypeArray;
+
+ void init (Columns columns, ReusableLivenessInfo rowLiveness)
+ {
+ if (this.columns != columns)
+ {
+ // This will be a problem with changing columns
+ this.columns = columns;
+ columnsArray = columns.toArray(COLUMN_METADATA_TYPE);
+ cellTypeArray = new AbstractType<?>[columnsArray.length];
+ for (int i = 0; i < columnsArray.length; i++)
+ {
+ ColumnMetadata cellColumn = columnsArray[i];
+ cellTypeArray[i] =
serializationHeader.getType(cellColumn);
+ }
+ // HOTSPOT: size is suprisingly expensive
+ columnsSize = columns.size();
+ }
+ this.rowLiveness = rowLiveness;
+ columnsIndex = 0;
+ cellFlags = 0;
+ cellPath = null;
+ cellType = null;
+ }
+
+ public boolean hasNext()
+ {
+ return columnsIndex < columnsSize;
+ }
+
+ /**
+ * For Cell deserialization see {@link Cell.Serializer#deserialize}
+ *
+ * @return true if has value, false otherwise
+ */
+ boolean readCellHeader() throws IOException
+ {
+ if (!(columnsIndex < columnsSize)) throw new
IllegalStateException();
+
+ // HOTSPOT: suprisingly expensive
+ int currIndex = columnsIndex++;
+ cellColumn = columnsArray[currIndex];
+ cellType = cellTypeArray[currIndex];
+ cellFlags = dataReader.readUnsignedByte();
+ // TODO: specialize common case where flags == HAS_VALUE |
USE_ROW_TS?
+ boolean hasValue = Cell.Serializer.hasValue(cellFlags);
+ boolean isDeleted = Cell.Serializer.isDeleted(cellFlags);
+ boolean isExpiring = Cell.Serializer.isExpiring(cellFlags);
+ boolean useRowTimestamp =
Cell.Serializer.useRowTimestamp(cellFlags);
+ boolean useRowTTL = Cell.Serializer.useRowTTL(cellFlags);
+
+ long timestamp = useRowTimestamp ? rowLiveness.timestamp() :
serializationHeader.readTimestamp(dataReader);
+
+ long localDeletionTime = useRowTTL
+ ? rowLiveness.localExpirationTime()
+ : (isDeleted || isExpiring ?
serializationHeader.readLocalDeletionTime(dataReader) : Cell.NO_DELETION_TIME);
+
+ int ttl = useRowTTL ? rowLiveness.ttl() : (isExpiring ?
serializationHeader.readTTL(dataReader) : Cell.NO_TTL);
+ localDeletionTime =
Cell.decodeLocalDeletionTime(localDeletionTime, ttl, deserializationHelper);
+
+ cellLiveness.reset(timestamp, ttl, localDeletionTime);
+ cellPath = cellColumn.isComplex()
+ ?
cellColumn.cellPathSerializer().deserialize(dataReader)
+ : null;
+ return hasValue;
+ }
+ }
+
+ int state = PARTITION_START;
+
+ private final Version version;
+ private final TableMetadata metadata;
+ private final ImmutableList<ColumnMetadata> clusteringColumns;
+ public final AbstractType<?>[] clusteringColumnTypes;
+ private final StatsMetadata statsMetadata;
+ private final DeserializationHelper deserializationHelper;
+ private final EncodingStats encodingStats;
+ private final SerializationHeader serializationHeader;
+
+ // need to be closed
+ public final SSTableReader ssTableReader;
+ private final RandomAccessReader dataReader;
+ private final DeletionTime.Serializer deletionTimeSerializer;
+ // in serialization order (maybe use inheritance to clamp them together?)
+ public long partitionStart = 0;
+
+ // SHARED STATIC_ROW/ROW/TOMB
+ public int basicElementFlags = 0;
+ public int extendedFlags = 0;
+
+ private final CellCursor staticRowCellCursor = new CellCursor();
+ private final CellCursor rowCellCursor = new CellCursor();
+ public CellCursor cellCursor;
+
+    /**
+     * Opens the sstable identified by {@code desc} for cursor-style reading. The table metadata is
+     * loaded from the sstable, the reader is opened without validation, and a reference to that
+     * reader is stored in {@code ssTableReaderRef}.
+     *
+     * @param desc descriptor of the sstable to open
+     */
+    public SSTableCursorReader(Descriptor desc) throws IOException
+    {
+        this.metadata = Util.metadataFromSSTable(desc);
+        this.ssTableReader = SSTableReader.openNoValidation(null, desc, TableMetadataRef.forOfflineTools(this.metadata));
+        this.ssTableReaderRef = this.ssTableReader.ref();
+        this.version = desc.version;
+        this.deletionTimeSerializer = DeletionTime.getSerializer(this.version);
+
+        // Flatten the clustering column types into an array, in list order.
+        this.clusteringColumns = this.metadata.clusteringColumns();
+        AbstractType<?>[] types = new AbstractType<?>[this.clusteringColumns.size()];
+        int slot = 0;
+        for (ColumnMetadata column : this.clusteringColumns)
+            types[slot++] = column.type;
+        this.clusteringColumnTypes = types;
+
+        this.statsMetadata = this.ssTableReader.getSSTableMetadata();
+        this.encodingStats = this.ssTableReader.stats();
+        this.deserializationHelper = new DeserializationHelper(this.metadata,
+                                                               this.version.correspondingMessagingVersion(),
+                                                               DeserializationHelper.Flag.LOCAL,
+                                                               null);
+        this.serializationHeader = this.ssTableReader.header;
+
+        this.dataReader = this.ssTableReader.openDataReader();
+    }
+
+    /**
+     * Wraps an already opened sstable reader. No reference is taken on it
+     * ({@code ssTableReaderRef} is left null); only a new data reader is opened.
+     *
+     * @param reader the sstable reader to iterate over
+     */
+    public SSTableCursorReader(SSTableReader reader)
+    {
+        this.metadata = reader.metadata();
+        this.ssTableReader = reader;
+        this.ssTableReaderRef = null;
+        this.version = reader.descriptor.version;
+        this.deletionTimeSerializer = DeletionTime.getSerializer(this.version);
+
+        // Flatten the clustering column types into an array, in list order.
+        this.clusteringColumns = this.metadata.clusteringColumns();
+        AbstractType<?>[] types = new AbstractType<?>[this.clusteringColumns.size()];
+        int slot = 0;
+        for (ColumnMetadata column : this.clusteringColumns)
+            types[slot++] = column.type;
+        this.clusteringColumnTypes = types;
+
+        this.statsMetadata = reader.getSSTableMetadata();
+        this.encodingStats = reader.stats();
+        this.deserializationHelper = new DeserializationHelper(this.metadata,
+                                                               this.version.correspondingMessagingVersion(),
+                                                               DeserializationHelper.Flag.LOCAL,
+                                                               null);
+        this.serializationHeader = reader.header;
+
+        this.dataReader = reader.openDataReader();
+    }
+
+    /**
+     * Closes the data reader and, when this cursor holds a reference on the sstable reader
+     * ({@code ssTableReaderRef} is non-null), releases that reference as well.
+     * The reference is released in a {@code finally} block so that a failure while closing the
+     * data reader cannot leak it.
+     */
+    @Override
+    public void close()
+    {
+        try
+        {
+            dataReader.close();
+        }
+        finally
+        {
+            if (ssTableReaderRef != null)
+                ssTableReaderRef.close();
+        }
+    }
+
+    /** Records the data reader's current position as the partition start and clears both flag fields. */
+    private void resetOnPartitionStart()
+    {
+        this.partitionStart = this.dataReader.getPosition();
+        this.basicElementFlags = this.extendedFlags = 0;
+    }
+
+    /**
+     * Moves the cursor to the partition starting at {@code position}.
+     * Position 0 is accepted as-is. For any other position the byte just before it is checked
+     * through {@code checkNextFlags()} and must yield {@code PARTITION_END}.
+     *
+     * @param position offset in the data file at which a partition is expected to start
+     * @return {@code PARTITION_START}
+     * @throws IllegalArgumentException if the preceding byte does not yield {@code PARTITION_END}
+     */
+    public int seekPartition(long position) throws IOException
+    {
+        state = SEEK;
+        if (position != 0)
+        {
+            // verify partition start is after a partition end marker
+            dataReader.seek(position - 1);
+            if (checkNextFlags() != PARTITION_END)
+                throw new IllegalArgumentException("Seeking to a partition at: " + position + " did not result in a valid state");
+        }
+        else
+        {
+            dataReader.seek(position);
+        }
+        state = PARTITION_START;
+        resetOnPartitionStart();
+        return state;
+    }
+
+    /**
+     * Moves the cursor to an element inside a partition and reads its flags.
+     *
+     * @param position offset in the data file at which an element's flags are expected
+     * @return the state reported by {@code checkNextFlags()}
+     * @throws IllegalStateException if that state is none of {@code ROW_START},
+     *         {@code TOMBSTONE_START}, {@code STATIC_ROW_START} or {@code DONE}
+     */
+    public int seekPartitionElement(long position) throws IOException
+    {
+        // partition elements have flags
+        dataReader.seek(position);
+        final int nextState = checkNextFlags();
+        final int acceptable = ROW_START | TOMBSTONE_START | STATIC_ROW_START | DONE;
+        if (isState(nextState, acceptable))
+            return nextState;
+        throw new IllegalStateException();
+    }
+
+    // struct partition {
+    //     struct partition_header header
+    //     optional<struct row> row
+    //     struct unfiltered unfiltereds[];
+    // };
+    /**
+     * Loads the partition header at the current position into {@code header}, then lets
+     * {@code checkNextFlags()} determine what follows.
+     *
+     * @param header descriptor that loads itself from the data reader
+     * @return the state reported by {@code checkNextFlags()}
+     * @throws IllegalStateException if the cursor is not at {@code PARTITION_START}
+     */
+    public int readPartitionHeader(PartitionDescriptor header) throws IOException
+    {
+        if (PARTITION_START != state)
+            throw new IllegalStateException();
+
+        resetOnPartitionStart();
+        header.load(dataReader, deletionTimeSerializer);
+        final int next = checkNextFlags();
+        return next;
+    }
+
+ // struct static_row {
+ // byte flags; // pre-loaded
+ // byte extended_flags; // pre-loaded
+ // varint row_body_size;
+ // varint prev_unfiltered_size; // for backward traversing, ignored
+ // optional<struct liveness_info> liveness_info;
+ // optional<struct delta_deletion_time> deletion_time;
+ // *** We read the columns in a separate method ***
+ // optional<varint[]> missing_columns;
+ // cell[] cells; // potentially only some
+ //};
+ public int readStaticRowHeader(ElementDescriptor elementDescriptor) throws
IOException
+ {
+ if (state != STATIC_ROW_START) throw new IllegalStateException();
+ elementDescriptor.loadStaticRow(dataReader, serializationHeader,
deserializationHelper, basicElementFlags, extendedFlags);
+
+ staticRowCellCursor.init(elementDescriptor.rowColumns(),
elementDescriptor.livenessInfo());
+ cellCursor = staticRowCellCursor;
+ if (!staticRowCellCursor.hasNext())
+ {
+ return checkNextFlags();
+ }
+ else
+ {
+ return state = State.CELL_HEADER_START;
+ }
+ }
+
+    /**
+     * Copies the value of the current cell from the data reader to {@code writer}.
+     *
+     * @param writer destination of the value bytes
+     * @param buffer scratch buffer the bytes are staged through
+     * @return {@code CELL_END} when the row has further columns, otherwise the state reported by
+     *         {@code checkNextFlags()}
+     * @throws IllegalStateException if the cursor is not at {@code CELL_VALUE_START} or no cell
+     *         type has been set
+     */
+    public int copyCellValue(DataOutputPlus writer, byte[] buffer) throws IOException
+    {
+        if (State.CELL_VALUE_START != state || null == cellCursor.cellType)
+            throw new IllegalStateException();
+
+        copyCellContents(writer, buffer, cellCursor.cellType.valueLengthIfFixed());
+
+        // NOTE(review): skipCellValue() moves to CELL_HEADER_START at this point instead of
+        // CELL_END -- confirm the difference is intended.
+        if (cellCursor.hasNext())
+        {
+            state = State.CELL_END;
+            return state;
+        }
+        return checkNextFlags();
+    }
+
+    // TODO: move to cell cursor? maybe avoid copy through buffer?
+    /**
+     * Streams one cell value from the data reader to {@code writer} through {@code transferBuffer}.
+     * <p>
+     * A non-negative {@code length} is taken as the exact number of value bytes. A negative
+     * {@code length} means the value is length-prefixed: the unsigned vint length is read from the
+     * data reader and echoed to the writer before the bytes are copied.
+     * <p>
+     * Both cases copy in chunks of at most {@code transferBuffer.length} bytes, so a value larger
+     * than the scratch buffer no longer overruns it.
+     *
+     * @param writer destination of the value
+     * @param transferBuffer scratch buffer; must not be empty when there are bytes to copy
+     * @param length fixed value length, or a negative number for length-prefixed values
+     * @throws IOException if the length prefix read from the data reader is negative
+     * @throws IllegalArgumentException if bytes must be copied but {@code transferBuffer} is empty
+     */
+    private void copyCellContents(DataOutputPlus writer, byte[] transferBuffer, int length) throws IOException
+    {
+        int remaining = length;
+        if (remaining < 0)
+        {
+            remaining = dataReader.readUnsignedVInt32();
+            if (remaining < 0)
+                throw new IOException("Corrupt (negative) value length encountered");
+            writer.writeUnsignedVInt32(remaining);
+        }
+
+        // A zero-length buffer would never make progress in the loop below.
+        if (remaining > 0 && transferBuffer.length == 0)
+            throw new IllegalArgumentException("Transfer buffer must not be empty");
+
+        while (remaining > 0)
+        {
+            int chunk = Math.min(remaining, transferBuffer.length);
+            dataReader.readFully(transferBuffer, 0, chunk);
+            writer.write(transferBuffer, 0, chunk);
+            remaining -= chunk;
+        }
+    }
+
+ // struct row {
+ // byte flags;
+ // optional<struct clustering_block[]> clustering_blocks;
+ // varint row_body_size;
+ // varint prev_unfiltered_size; // for backward traversing, ignored
+ // optional<struct liveness_info> liveness_info;
+ // optional<struct delta_deletion_time> deletion_time;
+ // *** We read the columns in a separate step ***
+ // optional<varint[]> missing_columns;
+ // cell[] cells; // potentially only some
+ //};
+ public int readRowHeader(ElementDescriptor elementDescriptor) throws
IOException
+ {
+ if (state != State.ROW_START) throw new IllegalStateException();
+ if (!UnfilteredSerializer.isRow(basicElementFlags)) throw new
IllegalStateException();
+ elementDescriptor.loadRow(dataReader, serializationHeader,
deserializationHelper, clusteringColumnTypes, basicElementFlags);
+
+ rowCellCursor.init(elementDescriptor.rowColumns(),
elementDescriptor.livenessInfo());
+ cellCursor = rowCellCursor;
+ if (!rowCellCursor.hasNext())
+ {
+ return checkNextFlags();
+ }
+ else
+ {
+ return state = State.CELL_HEADER_START;
+ }
+ }
+
+ // TODO: introduce cell header class
+ public int readCellHeader() throws IOException
+ {
+ if (state != State.CELL_HEADER_START) throw new
IllegalStateException();
+ if (cellCursor.readCellHeader())
+ {
+ return state = State.CELL_VALUE_START;
+ }
+ return !cellCursor.hasNext() ? checkNextFlags() : (state =
State.CELL_END);
+ }
+
+ @Inline
+ public int skipCellValue() throws IOException
+ {
+ if (state != State.CELL_VALUE_START) throw new IllegalStateException();
+ cellCursor.cellType.skipValue(dataReader);
+ return !cellCursor.hasNext() ? checkNextFlags() : (state =
State.CELL_HEADER_START);
+ }
+
+ /**
+ * See: {@link
org.apache.cassandra.db.rows.UnfilteredSerializer#serialize(RangeTombstoneMarker,
SerializationHelper, DataOutputPlus, long, int)}
+ * <pre>
+ * struct range_tombstone_marker {
+ * byte flags = IS_MARKER;
+ * byte kind_ordinal;
+ * be16 bound_values_count;
+ * struct clustering_block[] clustering_blocks;
+ * varint marker_body_size;
+ * varint prev_unfiltered_size;
+ * };
+ * struct range_tombstone_bound_marker : range_tombstone_marker {
+ * struct delta_deletion_time deletion_time;
+ * };
+ * struct range_tombstone_boundary_marker : range_tombstone_marker {
+ * struct delta_deletion_time end_deletion_time;
+ * struct delta_deletion_time start_deletion_time;
+ * };
+ * </pre>
+ *
+ /// TODO: tombstone as resizable buffer
+ */
+ public int readTombstoneMarker(ElementDescriptor elementDescriptor) throws
IOException
+ {
+ if (state != TOMBSTONE_START) throw new IllegalStateException();
+ if (!UnfilteredSerializer.isTombstoneMarker(basicElementFlags)) throw
new IllegalStateException();
+ elementDescriptor.loadTombstone(dataReader, serializationHeader,
deserializationHelper, clusteringColumnTypes, basicElementFlags);
+ // unfilteredStart = dataReader.getPosition() - 1;
+
+ return checkNextFlags();
+ }
+//
+// /**
+// * TODO: deduplicate for tombstones
+// * {@link ClusteringPrefix.Serializer#deserializeValuesWithoutSize
+// */
+// private void readUnfilteredClustering(AbstractType<?>[] types, int
clusteringColumnsBound) throws IOException
+// {
+// if (clusteringColumnsBound == 0) {
+// clusteringLength = 0;
+// return;
+// }
+// long clusteringStartPosition = dataReader.getPosition();
+// skipClustering(dataReader, types, clusteringColumnsBound);
+// long clusteringLengthLong = dataReader.getPosition() -
clusteringStartPosition;
+// // Notionally, max clustering size is 2G, with each column limit at
64k,
+// if (clusteringLengthLong > Integer.MAX_VALUE) {
+// throw new IllegalStateException();
+// }
+// clusteringLength = (int) clusteringLengthLong;
+// if (clusteringLength > clustering.length) {
+// clustering = new byte[Pow2.roundToPowerOfTwo(clusteringLength)];
+// clusteringBuffer = ByteBuffer.wrap(clustering); // would be nice
if it was re-usable
+// }
+// dataReader.seek(clusteringStartPosition);
+// dataReader.readFully(clustering, 0, clusteringLength);
+// clusteringBuffer.limit(clusteringLength);
+// }
+
+ /**
+ * {@link ClusteringPrefix.Serializer#deserializeValuesWithoutSize}
+ */
+ static void readUnfilteredClustering(RandomAccessReader dataReader,
AbstractType<?>[] types, int clusteringColumnsBound, ResizableByteBuffer
clustering) throws IOException
+ {
+ if (clusteringColumnsBound == 0) {
+ clustering.resetBuffer();
+ return;
+ }
+ long clusteringStartPosition = dataReader.getPosition();
+ skipClustering(dataReader, types, clusteringColumnsBound);
+ long clusteringLengthLong = dataReader.getPosition() -
clusteringStartPosition;
+
+ // Notionally, max clustering size is 2G, with each column limit at
64k,
+ if (clusteringLengthLong > Integer.MAX_VALUE) {
+ throw new IllegalStateException();
+ }
+ dataReader.seek(clusteringStartPosition);
Review Comment:
Seeking back will involve fetching a buffer from the chunk cache if a chunk
boundary was crossed. The cost is not trivial, and it distorts the cache's
frequency metrics. I'd prefer to do the loading as part of the parsing.
##########
src/java/org/apache/cassandra/io/util/ReusableLongToken.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.jctools.util.UnsafeAccess;
+
+public class ReusableLongToken extends Murmur3Partitioner.LongToken
Review Comment:
Nit: This shouldn't need to be public.
##########
src/java/org/apache/cassandra/io/sstable/SSTableCursorReader.java:
##########
@@ -0,0 +1,743 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.cassandra.io.util.ResizableByteBuffer;
+import net.nicoulaj.compilecommand.annotations.Inline;
+import org.apache.cassandra.db.ClusteringPrefix;
+import org.apache.cassandra.db.Columns;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.DeserializationHelper;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.rows.UnfilteredSerializer;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.tools.Util;
+import org.apache.cassandra.utils.concurrent.Ref;
+
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.*;
+
+public class SSTableCursorReader implements AutoCloseable
+{
+ public static final ColumnMetadata[] COLUMN_METADATA_TYPE = new
ColumnMetadata[0];
+ private final Ref<SSTableReader> ssTableReaderRef;
+
+ public interface State
+ {
+ /** start of file, after partition end but before EOF */
+ int PARTITION_START = 1;
+ int STATIC_ROW_START = 1 << 1;
+ int ROW_START = 1 << 2;
+ /** common to row/static row cells */
+ int CELL_HEADER_START = 1 << 3;
+ int CELL_VALUE_START = 1 << 4;
+ int CELL_END = 1 << 5;
+ int TOMBSTONE_START = 1 << 6;
+ int AFTER_TOMBSTONE_HEADER = 1 << 7;
+ /** common to rows/tombstones. Call continue(); for next element, or
maybe partition end */
+ int ELEMENT_END = 1 << 8;
+ /** at {@link UnfilteredSerializer#isEndOfPartition(int)} */
+ int PARTITION_END = 1 << 9;
+ /** EOF */
+ int DONE = 1 << 10;
+ int SEEK = 1 << 11;
+ static boolean isState(int state, int mask) {
+ return (state & mask) != 0;
+ }
+ }
+
+ public class CellCursor {
+ public ReusableLivenessInfo rowLiveness;
+ public Columns columns;
+
+ public int columnsSize;
+ public int columnsIndex;
+ public int cellFlags;
+ public final ReusableLivenessInfo cellLiveness = new
ReusableLivenessInfo();
+ public CellPath cellPath;
+ public AbstractType<?> cellType;
+ public ColumnMetadata cellColumn;
+ private ColumnMetadata[] columnsArray;
+ private AbstractType<?>[] cellTypeArray;
+
+ void init (Columns columns, ReusableLivenessInfo rowLiveness)
+ {
+ if (this.columns != columns)
+ {
+ // This will be a problem with changing columns
+ this.columns = columns;
+ columnsArray = columns.toArray(COLUMN_METADATA_TYPE);
+ cellTypeArray = new AbstractType<?>[columnsArray.length];
+ for (int i = 0; i < columnsArray.length; i++)
+ {
+ ColumnMetadata cellColumn = columnsArray[i];
+ cellTypeArray[i] =
serializationHeader.getType(cellColumn);
+ }
+ // HOTSPOT: size is suprisingly expensive
+ columnsSize = columns.size();
+ }
+ this.rowLiveness = rowLiveness;
+ columnsIndex = 0;
+ cellFlags = 0;
+ cellPath = null;
+ cellType = null;
+ }
+
+ public boolean hasNext()
+ {
+ return columnsIndex < columnsSize;
+ }
+
+ /**
+ * For Cell deserialization see {@link Cell.Serializer#deserialize}
+ *
+ * @return true if has value, false otherwise
+ */
+ boolean readCellHeader() throws IOException
+ {
+ if (!(columnsIndex < columnsSize)) throw new
IllegalStateException();
+
+ // HOTSPOT: suprisingly expensive
+ int currIndex = columnsIndex++;
+ cellColumn = columnsArray[currIndex];
+ cellType = cellTypeArray[currIndex];
+ cellFlags = dataReader.readUnsignedByte();
+ // TODO: specialize common case where flags == HAS_VALUE |
USE_ROW_TS?
+ boolean hasValue = Cell.Serializer.hasValue(cellFlags);
+ boolean isDeleted = Cell.Serializer.isDeleted(cellFlags);
+ boolean isExpiring = Cell.Serializer.isExpiring(cellFlags);
+ boolean useRowTimestamp =
Cell.Serializer.useRowTimestamp(cellFlags);
+ boolean useRowTTL = Cell.Serializer.useRowTTL(cellFlags);
+
+ long timestamp = useRowTimestamp ? rowLiveness.timestamp() :
serializationHeader.readTimestamp(dataReader);
+
+ long localDeletionTime = useRowTTL
+ ? rowLiveness.localExpirationTime()
+ : (isDeleted || isExpiring ?
serializationHeader.readLocalDeletionTime(dataReader) : Cell.NO_DELETION_TIME);
+
+ int ttl = useRowTTL ? rowLiveness.ttl() : (isExpiring ?
serializationHeader.readTTL(dataReader) : Cell.NO_TTL);
+ localDeletionTime =
Cell.decodeLocalDeletionTime(localDeletionTime, ttl, deserializationHelper);
+
+ cellLiveness.reset(timestamp, ttl, localDeletionTime);
+ cellPath = cellColumn.isComplex()
+ ?
cellColumn.cellPathSerializer().deserialize(dataReader)
+ : null;
+ return hasValue;
+ }
+ }
+
+ int state = PARTITION_START;
+
+ private final Version version;
+ private final TableMetadata metadata;
+ private final ImmutableList<ColumnMetadata> clusteringColumns;
+ public final AbstractType<?>[] clusteringColumnTypes;
+ private final StatsMetadata statsMetadata;
+ private final DeserializationHelper deserializationHelper;
+ private final EncodingStats encodingStats;
+ private final SerializationHeader serializationHeader;
+
+ // need to be closed
+ public final SSTableReader ssTableReader;
+ private final RandomAccessReader dataReader;
+ private final DeletionTime.Serializer deletionTimeSerializer;
+ // in serialization order (maybe use inheritance to clamp them together?)
+ public long partitionStart = 0;
+
+ // SHARED STATIC_ROW/ROW/TOMB
+ public int basicElementFlags = 0;
+ public int extendedFlags = 0;
+
+ private final CellCursor staticRowCellCursor = new CellCursor();
+ private final CellCursor rowCellCursor = new CellCursor();
+ public CellCursor cellCursor;
+
+ public SSTableCursorReader(Descriptor desc) throws IOException
+ {
+ metadata = Util.metadataFromSSTable(desc);
+ ssTableReader = SSTableReader.openNoValidation(null, desc,
TableMetadataRef.forOfflineTools(metadata));
+ ssTableReaderRef = ssTableReader.ref();
+ version = desc.version;
+ deletionTimeSerializer = DeletionTime.getSerializer(version);
+ clusteringColumns = metadata.clusteringColumns();
+ int clusteringColumnCount = clusteringColumns.size();
+ clusteringColumnTypes = new AbstractType<?>[clusteringColumnCount];
+ for (int i = 0; i < clusteringColumnTypes.length; i++)
+ {
+ clusteringColumnTypes[i] = clusteringColumns.get(i).type;
+ }
+ statsMetadata = ssTableReader.getSSTableMetadata();
+ encodingStats = ssTableReader.stats();
+ deserializationHelper = new DeserializationHelper(metadata,
desc.version.correspondingMessagingVersion(), DeserializationHelper.Flag.LOCAL,
null);
+ serializationHeader = ssTableReader.header;
+
+ dataReader = ssTableReader.openDataReader();
+ }
+
+ public SSTableCursorReader(SSTableReader reader)
+ {
+ metadata = reader.metadata();
+ ssTableReader = reader;
+ ssTableReaderRef = null;
+ version = reader.descriptor.version;
+ deletionTimeSerializer = DeletionTime.getSerializer(version);
+ clusteringColumns = metadata.clusteringColumns();
+ int clusteringColumnCount = clusteringColumns.size();
+ clusteringColumnTypes = new AbstractType<?>[clusteringColumnCount];
+ for (int i = 0; i < clusteringColumnTypes.length; i++)
+ {
+ clusteringColumnTypes[i] = clusteringColumns.get(i).type;
+ }
+ statsMetadata = reader.getSSTableMetadata();
+ encodingStats = reader.stats();
+ deserializationHelper = new DeserializationHelper(metadata,
version.correspondingMessagingVersion(), DeserializationHelper.Flag.LOCAL,
null);
+ serializationHeader = reader.header;
+
+ dataReader = reader.openDataReader();
+ }
+
+ @Override
+ public void close()
+ {
+ dataReader.close();
+ if (ssTableReaderRef != null)
+ ssTableReaderRef.close();
+ }
+
+ private void resetOnPartitionStart()
+ {
+ partitionStart = dataReader.getPosition();
+ basicElementFlags = 0;
+ extendedFlags = 0;
+ }
+
+ public int seekPartition(long position) throws IOException
+ {
+ state = SEEK;
+ if (position == 0)
+ {
+ dataReader.seek(position);
+ state = PARTITION_START;
+ }
+ else {
+ // verify partition start is after a partition end marker
+ dataReader.seek(position - 1);
+ if (checkNextFlags() == PARTITION_END)
+ state = PARTITION_START;
+ else
+ throw new IllegalArgumentException("Seeking to a partition at:
" + position + " did not result in a valid state");
+ }
+ resetOnPartitionStart();
+ return state;
+ }
+
+ public int seekPartitionElement(long position) throws IOException
+ {
+ // partition elements have flags
+ dataReader.seek(position);
+ int state = checkNextFlags();
+ if (!isState(state , ROW_START | TOMBSTONE_START | STATIC_ROW_START |
DONE)) throw new IllegalStateException();
+ return state;
+ }
+
+ // struct partition {
+ // struct partition_header header
+ // optional<struct row> row
+ // struct unfiltered unfiltereds[];
+ //};
+ public int readPartitionHeader(PartitionDescriptor header) throws
IOException
+ {
+ if (state != PARTITION_START) throw new IllegalStateException();
+ resetOnPartitionStart();
+ header.load(dataReader, deletionTimeSerializer);
+ return checkNextFlags();
+ }
+
+ // struct static_row {
+ // byte flags; // pre-loaded
+ // byte extended_flags; // pre-loaded
+ // varint row_body_size;
+ // varint prev_unfiltered_size; // for backward traversing, ignored
+ // optional<struct liveness_info> liveness_info;
+ // optional<struct delta_deletion_time> deletion_time;
+ // *** We read the columns in a separate method ***
+ // optional<varint[]> missing_columns;
+ // cell[] cells; // potentially only some
+ //};
+ public int readStaticRowHeader(ElementDescriptor elementDescriptor) throws
IOException
+ {
+ if (state != STATIC_ROW_START) throw new IllegalStateException();
+ elementDescriptor.loadStaticRow(dataReader, serializationHeader,
deserializationHelper, basicElementFlags, extendedFlags);
+
+ staticRowCellCursor.init(elementDescriptor.rowColumns(),
elementDescriptor.livenessInfo());
+ cellCursor = staticRowCellCursor;
+ if (!staticRowCellCursor.hasNext())
+ {
+ return checkNextFlags();
+ }
+ else
+ {
+ return state = State.CELL_HEADER_START;
+ }
+ }
+
+ public int copyCellValue(DataOutputPlus writer, byte[] buffer) throws
IOException
+ {
+ if (state != State.CELL_VALUE_START) throw new IllegalStateException();
+ if (cellCursor.cellType == null) throw new IllegalStateException();
+ int length = cellCursor.cellType.valueLengthIfFixed();
+ copyCellContents(writer, buffer, length);
+ return !cellCursor.hasNext() ? checkNextFlags() : (state =
State.CELL_END);
+ }
+
+ // TODO: move to cell cursor? maybe avoid copy through buffer?
+ private void copyCellContents(DataOutputPlus writer, byte[]
transferBuffer, int length) throws IOException
+ {
+ if (length >= 0)
+ {
+ dataReader.readFully(transferBuffer, 0, length);
+ writer.write(transferBuffer, 0, length);
+ }
+ else
+ {
+ length = dataReader.readUnsignedVInt32();
+ if (length < 0)
+ throw new IOException("Corrupt (negative) value length
encountered");
+ writer.writeUnsignedVInt32(length);
+ int remaining = length;
+ while (remaining > 0)
+ {
+ int readLength = Math.min(remaining, transferBuffer.length);
+ dataReader.readFully(transferBuffer, 0, readLength);
+ writer.write(transferBuffer, 0, readLength);
+ remaining -= readLength;
+ }
+ }
+ }
+
+ // struct row {
+ // byte flags;
+ // optional<struct clustering_block[]> clustering_blocks;
+ // varint row_body_size;
+ // varint prev_unfiltered_size; // for backward traversing, ignored
+ // optional<struct liveness_info> liveness_info;
+ // optional<struct delta_deletion_time> deletion_time;
+ // *** We read the columns in a separate step ***
+ // optional<varint[]> missing_columns;
+ // cell[] cells; // potentially only some
+ //};
+ public int readRowHeader(ElementDescriptor elementDescriptor) throws
IOException
+ {
+ if (state != State.ROW_START) throw new IllegalStateException();
+ if (!UnfilteredSerializer.isRow(basicElementFlags)) throw new
IllegalStateException();
+ elementDescriptor.loadRow(dataReader, serializationHeader,
deserializationHelper, clusteringColumnTypes, basicElementFlags);
+
+ rowCellCursor.init(elementDescriptor.rowColumns(),
elementDescriptor.livenessInfo());
+ cellCursor = rowCellCursor;
+ if (!rowCellCursor.hasNext())
+ {
+ return checkNextFlags();
+ }
+ else
+ {
+ return state = State.CELL_HEADER_START;
+ }
+ }
+
+ // TODO: introduce cell header class
+ public int readCellHeader() throws IOException
+ {
+ if (state != State.CELL_HEADER_START) throw new
IllegalStateException();
+ if (cellCursor.readCellHeader())
+ {
+ return state = State.CELL_VALUE_START;
+ }
+ return !cellCursor.hasNext() ? checkNextFlags() : (state =
State.CELL_END);
+ }
+
+ @Inline
+ public int skipCellValue() throws IOException
+ {
+ if (state != State.CELL_VALUE_START) throw new IllegalStateException();
+ cellCursor.cellType.skipValue(dataReader);
+ return !cellCursor.hasNext() ? checkNextFlags() : (state =
State.CELL_HEADER_START);
+ }
+
+ /**
+ * See: {@link
org.apache.cassandra.db.rows.UnfilteredSerializer#serialize(RangeTombstoneMarker,
SerializationHelper, DataOutputPlus, long, int)}
+ * <pre>
+ * struct range_tombstone_marker {
+ * byte flags = IS_MARKER;
+ * byte kind_ordinal;
+ * be16 bound_values_count;
+ * struct clustering_block[] clustering_blocks;
+ * varint marker_body_size;
+ * varint prev_unfiltered_size;
+ * };
+ * struct range_tombstone_bound_marker : range_tombstone_marker {
+ * struct delta_deletion_time deletion_time;
+ * };
+ * struct range_tombstone_boundary_marker : range_tombstone_marker {
+ * struct delta_deletion_time end_deletion_time;
+ * struct delta_deletion_time start_deletion_time;
+ * };
+ * </pre>
+ *
+ /// TODO: tombstone as resizable buffer
+ */
+ public int readTombstoneMarker(ElementDescriptor elementDescriptor) throws
IOException
+ {
+ if (state != TOMBSTONE_START) throw new IllegalStateException();
+ if (!UnfilteredSerializer.isTombstoneMarker(basicElementFlags)) throw
new IllegalStateException();
+ elementDescriptor.loadTombstone(dataReader, serializationHeader,
deserializationHelper, clusteringColumnTypes, basicElementFlags);
+ // unfilteredStart = dataReader.getPosition() - 1;
+
+ return checkNextFlags();
+ }
+//
+// /**
+// * TODO: deduplicate for tombstones
+// * {@link ClusteringPrefix.Serializer#deserializeValuesWithoutSize
+// */
+// private void readUnfilteredClustering(AbstractType<?>[] types, int
clusteringColumnsBound) throws IOException
+// {
+// if (clusteringColumnsBound == 0) {
+// clusteringLength = 0;
+// return;
+// }
+// long clusteringStartPosition = dataReader.getPosition();
+// skipClustering(dataReader, types, clusteringColumnsBound);
+// long clusteringLengthLong = dataReader.getPosition() -
clusteringStartPosition;
+// // Notionally, max clustering size is 2G, with each column limit at
64k,
+// if (clusteringLengthLong > Integer.MAX_VALUE) {
+// throw new IllegalStateException();
+// }
+// clusteringLength = (int) clusteringLengthLong;
+// if (clusteringLength > clustering.length) {
+// clustering = new byte[Pow2.roundToPowerOfTwo(clusteringLength)];
+// clusteringBuffer = ByteBuffer.wrap(clustering); // would be nice
if it was re-usable
+// }
+// dataReader.seek(clusteringStartPosition);
+// dataReader.readFully(clustering, 0, clusteringLength);
+// clusteringBuffer.limit(clusteringLength);
+// }
+
+ /**
+ * {@link ClusteringPrefix.Serializer#deserializeValuesWithoutSize}
+ */
+ static void readUnfilteredClustering(RandomAccessReader dataReader,
AbstractType<?>[] types, int clusteringColumnsBound, ResizableByteBuffer
clustering) throws IOException
+ {
+ if (clusteringColumnsBound == 0) {
+ clustering.resetBuffer();
+ return;
+ }
+ long clusteringStartPosition = dataReader.getPosition();
+ skipClustering(dataReader, types, clusteringColumnsBound);
+ long clusteringLengthLong = dataReader.getPosition() -
clusteringStartPosition;
+
+ // Notionally, max clustering size is 2G, with each column limit at
64k,
+ if (clusteringLengthLong > Integer.MAX_VALUE) {
+ throw new IllegalStateException();
+ }
+ dataReader.seek(clusteringStartPosition);
+ clustering.load(dataReader, (int) clusteringLengthLong);
+ }
+
+ private static void skipClustering(RandomAccessReader dataReader,
AbstractType<?>[] types, int clusteringColumnsBound) throws IOException
+ {
+ long clusteringBlockHeader = 0;
+ for (int clusteringIndex = 0; clusteringIndex <
clusteringColumnsBound; clusteringIndex++)
+ {
+ // struct clustering_block {
+ // varint clustering_block_header;
+ // simple_cell[] clustering_cells;
+ // };
+ if (clusteringIndex % 32 == 0)
+ {
+ clusteringBlockHeader = dataReader.readUnsignedVInt();
+ }
+ clusteringBlockHeader = clusteringBlockHeader >>> 2;
+ // struct clustering_block {
+ // varint clustering_block_header;
+ // simple_cell[] clustering_cells;
+ // };
+ if ((clusteringBlockHeader & 0x11) == 0)
+ {
+ AbstractType<?> type = types[clusteringIndex];
+ int len = type.isValueLengthFixed() ?
type.valueLengthIfFixed() : dataReader.readUnsignedVInt32();
+ dataReader.skipBytes(len);
+ }
+ }
+ }
+
+ /**
+ * {@link UnfilteredSerializer#deserializeRowBody(DataInputPlus,
SerializationHeader, DeserializationHelper, int, int, Row.Builder)}
+ */
+ static void readLivenessInfo(RandomAccessReader dataReader,
SerializationHeader serializationHeader, DeserializationHelper
deserializationHelper, int flags, ReusableLivenessInfo livenessInfo) throws
IOException
+ {
+ long timestamp = LivenessInfo.NO_TIMESTAMP;
+ int ttl = LivenessInfo.NO_TTL;
+ long localExpirationTime = LivenessInfo.NO_EXPIRATION_TIME;
+ if (UnfilteredSerializer.hasTimestamp(flags))
+ {
+ // struct liveness_info {
+ // varint64 delta_timestamp;
+ // optional<varint32> delta_ttl;
+ // optional<varint64> delta_local_deletion_time;
+ //};
+ timestamp = serializationHeader.readTimestamp(dataReader);
+ if (UnfilteredSerializer.hasTTL(flags))
+ {
+ ttl = serializationHeader.readTTL(dataReader);
+ localExpirationTime =
Cell.decodeLocalDeletionTime(serializationHeader.readLocalDeletionTime(dataReader),
ttl, deserializationHelper);
+ }
+ }
+ livenessInfo.reset(timestamp, ttl, localExpirationTime);
+ }
+
+ // SKIPPING
+ public int skipPartition() throws IOException
+ {
+ if (state == PARTITION_END)
+ return continueReading();
+
+ if (state == PARTITION_START)
+ {
+ int partitionKeyLength = dataReader.readUnsignedShort();
+ dataReader.skipBytes(partitionKeyLength);
+
+ // PARTITION DELETION TIME
+ deletionTimeSerializer.skip(dataReader);
+ checkNextFlags(true, state);
+ }
+ else if (!isState(state, STATIC_ROW_START | ROW_START |
TOMBSTONE_START | PARTITION_END))
+ {
+ throw new IllegalStateException("Unexpected state: " + state);
+ }
+
+ while (!isState(state,PARTITION_START | DONE))
+ {
+ switch (state)
+ {
+ case STATIC_ROW_START:
+ state = skipStaticRow();
+ break;
+ case ROW_START:
+ case TOMBSTONE_START:
+ state = skipUnfiltered();
+ break;
+ }
+ }
+ return state;
+ }
+
+ public int skipStaticRow() throws IOException
+ {
+ if (state != State.STATIC_ROW_START) throw new IllegalStateException();
+
+ long rowSize = dataReader.readUnsignedVInt();
+ dataReader.skipBytes(rowSize);
+ return checkNextFlags(true, state);
+ }
+
+ public int skipStaticRowColumns(ElementDescriptor elementDescriptor)
throws IOException
+ {
+ if (!(UnfilteredSerializer.isStatic(elementDescriptor.extendedFlags())
&&
+ isState(state,CELL_HEADER_START | CELL_VALUE_START | CELL_END)))
throw new IllegalStateException();
+
+ dataReader.seek(elementDescriptor.dataStart() +
elementDescriptor.size());
+ return checkNextFlags(true, state);
+ }
+
+ public int skipUnfiltered() throws IOException
+ {
+ if (!isState(state, ROW_START | TOMBSTONE_START)) throw new
IllegalStateException();
+
+ AbstractType<?>[] types = clusteringColumnTypes;
+ int clusteringColumnsBound = types.length;
+ // tombstone markers have `kind` & `clusteringColumnsBound`
+ if (!UnfilteredSerializer.isRow(basicElementFlags))
+ {
+ byte kind = dataReader.readByte();
+ clusteringColumnsBound = dataReader.readUnsignedShort();
+ }
+ /**
+ * {@link org.apache.cassandra.db.ClusteringPrefix.Deserializer}
+ */
+ skipClustering(dataReader, types, clusteringColumnsBound);
+ // same for row/tombstone
+ long rowSize = dataReader.readUnsignedVInt();
+ dataReader.skipBytes(rowSize);
+
+ return checkNextFlags(true, state);
+ }
+
+ public int skipRowCells(long unfilteredDataStart, long unfilteredSize)
throws IOException
+ {
+ if (!(isState(state,CELL_HEADER_START | CELL_VALUE_START | CELL_END)))
throw new IllegalStateException();
+
+ dataReader.seek(unfilteredDataStart + unfilteredSize);
+ return checkNextFlags(true, state);
+ }
+
+ @Inline
+ public int continueReading() {
+ switch (state)
+ {
+ case PARTITION_END:
+ state = dataReader.isEOF() ? DONE : PARTITION_START;
+ break;
+ case ELEMENT_END:
+ if (UnfilteredSerializer.isEndOfPartition(basicElementFlags))
+ {
+ state = PARTITION_END;
+ }
+ else
+ {
+ state = UnfilteredSerializer.isRow(basicElementFlags) ?
ROW_START : TOMBSTONE_START;
+ }
+ break;
+ case CELL_END:
+ if (cellCursor.hasNext())
+ {
+ state = CELL_HEADER_START;
+ }
+ else
+ {
+ state = ELEMENT_END;
+ }
+ break;
+ default:
+ throw new IllegalStateException("Cannot continue reading in
current state: " + state);
+ }
+ return state;
+ }
+
+ private int checkNextFlags() throws IOException
Review Comment:
The callers of this method appear to be well aware of what kind of
flags/state they expect when they call it. Would it make sense to split it
into `checkNext(Partition|Unfiltered|Cell)Flags`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]