SammyVimes commented on code in PR #1076:
URL: https://github.com/apache/ignite-3/pull/1076#discussion_r975031167
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java:
##########
@@ -58,9 +64,35 @@ static ColumnFamilyType fromCfName(String cfName) {
return PARTITION;
} else if (HASH_INDEX_CF_NAME.equals(cfName)) {
return HASH_INDEX;
+ } else if (cfName.startsWith(SORTED_INDEX_CF_PREFIX)) {
+ return SORTED_INDEX;
Review Comment:
So we have a column family for every index? Why not one cf for all indexes?
##########
modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleParser.java:
##########
@@ -114,7 +115,7 @@ public boolean hasNullMap() {
* Returns the content of this tuple as a byte buffer.
*/
public ByteBuffer byteBuffer() {
- return buffer.slice();
+ return buffer.slice().order(ByteOrder.LITTLE_ENDIAN);
Review Comment:
Probably byte order should be a constant
##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/BinaryTupleComparator.java:
##########
@@ -15,80 +15,66 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.storage.index.impl;
+package org.apache.ignite.internal.storage.index;
+import static
org.apache.ignite.internal.binarytuple.BinaryTupleCommon.isPrefix;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Comparator;
import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTuplePrefix;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
import org.apache.ignite.internal.schema.NativeTypeSpec;
-import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
+import org.apache.ignite.internal.schema.row.InternalTuple;
import
org.apache.ignite.internal.storage.index.SortedIndexDescriptor.ColumnDescriptor;
/**
- * Comparator implementation for comparting {@link BinaryTuple}s on a
per-column basis.
+ * Comparator implementation for comparing {@link BinaryTuple}s on a
per-column basis.
*/
-class BinaryTupleComparator implements Comparator<BinaryTuple> {
+public class BinaryTupleComparator implements Comparator<ByteBuffer> {
private final SortedIndexDescriptor descriptor;
- private final int prefixLength;
-
- private BinaryTupleComparator(SortedIndexDescriptor descriptor, int
prefixLength) {
- if (prefixLength > descriptor.indexColumns().size()) {
- throw new IllegalArgumentException("Invalid prefix length: " +
prefixLength);
- }
-
- this.descriptor = descriptor;
- this.prefixLength = prefixLength;
- }
-
/**
* Creates a comparator for a Sorted Index identified by the given
descriptor.
*/
- static BinaryTupleComparator newComparator(SortedIndexDescriptor
descriptor) {
- return new BinaryTupleComparator(descriptor,
descriptor.indexColumns().size());
- }
-
- /**
- * Similar to {@link #newComparator} but creates a comparator that only
compares first {@code prefixLength} index columns.
- */
- static BinaryTupleComparator newPrefixComparator(SortedIndexDescriptor
descriptor, int prefixLength) {
- return new BinaryTupleComparator(descriptor, prefixLength);
+ public BinaryTupleComparator(SortedIndexDescriptor descriptor) {
+ this.descriptor = descriptor;
}
@Override
- public int compare(BinaryTuple tuple1, BinaryTuple tuple2) {
- return compare(tuple1, tuple2, 1, 0);
- }
+ public int compare(ByteBuffer buffer1, ByteBuffer buffer2) {
+ assert buffer1.order() == ByteOrder.LITTLE_ENDIAN;
+ assert buffer2.order() == ByteOrder.LITTLE_ENDIAN;
+ assert !(isPrefix(buffer1) && isPrefix(buffer2));
- /**
- * Compares a given tuple with the configured prefix.
- *
- * @param tuple1 Tuple to compare.
- * @param tuple2 Tuple to compare.
- * @param direction Sort direction: {@code -1} means sorting in reversed
order, {@code 1} means sorting in the natural order.
- * @param equals Value that should be returned if the provided tuple
exactly matches the prefix.
- * @return the value {@code 0} if the given row starts with the configured
prefix;
- * a value less than {@code 0} if the row's prefix is smaller than
the prefix; and
- * a value greater than {@code 0} if the row's prefix is larger
than the prefix.
- */
- public int compare(BinaryTuple tuple1, BinaryTuple tuple2, int direction,
int equals) {
- for (int i = 0; i < prefixLength; i++) {
+ BinaryTupleSchema schema = descriptor.binaryTupleSchema();
+
+ InternalTuple tuple1 = isPrefix(buffer1) ? new
BinaryTuplePrefix(schema, buffer1) : new BinaryTuple(schema, buffer1);
Review Comment:
I think `isPrefix` can be extracted into a variable (you already called this
method in the assert statement)
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java:
##########
@@ -378,7 +394,28 @@ public CompletableFuture<Void> destroyPartition(int
partitionId) throws StorageE
/** {@inheritDoc} */
@Override
public SortedIndexStorage getOrCreateSortedIndex(int partitionId, UUID
indexId) {
- throw new UnsupportedOperationException("Not implemented yet");
+ SortedIndices storages = sortedIndices.computeIfAbsent(indexId, id -> {
+ var indexDescriptor = new SortedIndexDescriptor(indexId,
tableCfg.value());
Review Comment:
I think this lambda deserves its own method
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/CursorUtils.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.index;
+
+import java.util.NoSuchElementException;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Utility class for working with cursors.
+ */
+class CursorUtils {
+ /**
+ * Cursor wrapper that discards elements while they match a given
predicate. As soon as any element does not match the predicate,
+ * no more elements will be discarded.
+ *
+ * @param <T> Cursor element type.
+ */
+ private static class DropWhileCursor<T> implements Cursor<T> {
+ private final Cursor<T> cursor;
+
+ @Nullable
+ private Predicate<T> predicate;
+
+ @Nullable
+ private T skippedElement;
+
+ DropWhileCursor(Cursor<T> cursor, Predicate<T> predicate) {
+ this.cursor = cursor;
+ this.predicate = predicate;
+ }
+
+ @Override
+ public void close() throws Exception {
+ cursor.close();
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (predicate == null) {
+ return skippedElement != null || cursor.hasNext();
Review Comment:
So it's not skipped. Probably "lastElement"?
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbBinaryTupleComparator.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.index;
+
+import static
org.apache.ignite.internal.binarytuple.BinaryTupleCommon.isPrefix;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import org.apache.ignite.internal.storage.index.BinaryTupleComparator;
+import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ComparatorOptions;
+
+/**
+ * {@link AbstractComparator} implementation that compares Binary Tuples.
+ */
+public class RocksDbBinaryTupleComparator extends AbstractComparator {
+ private final BinaryTupleComparator comparator;
+
+ /** Options needed for resource management. */
+ private final ComparatorOptions options;
+
+ /**
+ * Constructor.
+ *
+ * @param descriptor Sorted Index descriptor.
+ */
+ public RocksDbBinaryTupleComparator(SortedIndexDescriptor descriptor) {
+ this(descriptor, new ComparatorOptions());
+ }
+
+ private RocksDbBinaryTupleComparator(SortedIndexDescriptor descriptor,
ComparatorOptions options) {
+ super(options);
+
+ this.options = options;
+ this.comparator = new BinaryTupleComparator(descriptor);
+ }
+
+ @Override
+ public String name() {
+ return getClass().getCanonicalName();
+ }
+
+ @Override
+ public int compare(ByteBuffer a, ByteBuffer b) {
+ int comparePartitionId = Short.compare(a.getShort(), b.getShort());
+
+ if (comparePartitionId != 0) {
+ return comparePartitionId;
+ }
+
+ ByteBuffer firstBinaryTupleBuffer =
a.slice().order(ByteOrder.LITTLE_ENDIAN);
+ ByteBuffer secondBinaryTupleBuffer =
b.slice().order(ByteOrder.LITTLE_ENDIAN);
+
+ int compareTuples = comparator.compare(firstBinaryTupleBuffer,
secondBinaryTupleBuffer);
+
+ // Binary Tuple Prefixes don't have row IDs, so they can't be compared.
+ if (compareTuples != 0 || isPrefix(firstBinaryTupleBuffer) ||
isPrefix(secondBinaryTupleBuffer)) {
+ return compareTuples;
+ }
+
+ return compareRowIds(a, b);
+ }
+
+ private static int compareRowIds(ByteBuffer a, ByteBuffer b) {
+ long firstMostSignBits = a.getLong(a.remaining() - Long.BYTES * 2);
Review Comment:
probably should be `a.limit()`
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbBinaryTupleComparator.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.index;
+
+import static
org.apache.ignite.internal.binarytuple.BinaryTupleCommon.isPrefix;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import org.apache.ignite.internal.storage.index.BinaryTupleComparator;
+import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ComparatorOptions;
+
+/**
+ * {@link AbstractComparator} implementation that compares Binary Tuples.
+ */
+public class RocksDbBinaryTupleComparator extends AbstractComparator {
+ private final BinaryTupleComparator comparator;
+
+ /** Options needed for resource management. */
+ private final ComparatorOptions options;
+
+ /**
+ * Constructor.
+ *
+ * @param descriptor Sorted Index descriptor.
+ */
+ public RocksDbBinaryTupleComparator(SortedIndexDescriptor descriptor) {
+ this(descriptor, new ComparatorOptions());
+ }
+
+ private RocksDbBinaryTupleComparator(SortedIndexDescriptor descriptor,
ComparatorOptions options) {
+ super(options);
+
+ this.options = options;
+ this.comparator = new BinaryTupleComparator(descriptor);
+ }
+
+ @Override
+ public String name() {
+ return getClass().getCanonicalName();
+ }
+
+ @Override
+ public int compare(ByteBuffer a, ByteBuffer b) {
+ int comparePartitionId = Short.compare(a.getShort(), b.getShort());
+
+ if (comparePartitionId != 0) {
+ return comparePartitionId;
+ }
+
+ ByteBuffer firstBinaryTupleBuffer =
a.slice().order(ByteOrder.LITTLE_ENDIAN);
+ ByteBuffer secondBinaryTupleBuffer =
b.slice().order(ByteOrder.LITTLE_ENDIAN);
+
+ int compareTuples = comparator.compare(firstBinaryTupleBuffer,
secondBinaryTupleBuffer);
+
+ // Binary Tuple Prefixes don't have row IDs, so they can't be compared.
+ if (compareTuples != 0 || isPrefix(firstBinaryTupleBuffer) ||
isPrefix(secondBinaryTupleBuffer)) {
+ return compareTuples;
+ }
+
+ return compareRowIds(a, b);
+ }
+
+ private static int compareRowIds(ByteBuffer a, ByteBuffer b) {
+ long firstMostSignBits = a.getLong(a.remaining() - Long.BYTES * 2);
+ long secondMostSignBits = b.getLong(b.remaining() - Long.BYTES * 2);
+
+ int compare = Long.compare(firstMostSignBits, secondMostSignBits);
+
+ if (compare != 0) {
+ return compare;
+ }
+
+ long firstLeastSignBits = a.getLong(a.remaining() - Long.BYTES);
+ long secondLeastSignBits = b.getLong(b.remaining() - Long.BYTES);
Review Comment:
and here
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndices.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
+import org.apache.ignite.internal.storage.index.SortedIndexStorage;
+import
org.apache.ignite.internal.storage.rocksdb.index.RocksDbSortedIndexStorage;
+import org.rocksdb.RocksDBException;
+
+class SortedIndices {
Review Comment:
I think javadoc would be nice here.
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java:
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.index;
+
+import static
org.apache.ignite.internal.storage.rocksdb.index.CursorUtils.concat;
+import static
org.apache.ignite.internal.storage.rocksdb.index.CursorUtils.dropWhile;
+import static org.apache.ignite.internal.storage.rocksdb.index.CursorUtils.map;
+import static
org.apache.ignite.internal.storage.rocksdb.index.CursorUtils.takeWhile;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTuplePrefix;
+import org.apache.ignite.internal.schema.row.InternalTuple;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.index.BinaryTupleComparator;
+import org.apache.ignite.internal.storage.index.IndexRow;
+import org.apache.ignite.internal.storage.index.IndexRowImpl;
+import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
+import org.apache.ignite.internal.storage.index.SortedIndexStorage;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.WriteBatchWithIndex;
+
+/**
+ * {@link SortedIndexStorage} implementation based on RocksDB.
+ *
+ * <p>This storage uses the following format for keys:
+ * <pre>
+ * Partition ID - 2 bytes
+ * Tuple value - variable length
+ * Row ID (UUID) - 16 bytes
+ * </pre>
+ *
+ * <p>We use an empty array as values, because all required information can be
extracted from the key.
+ */
+public class RocksDbSortedIndexStorage implements SortedIndexStorage {
+ private final SortedIndexDescriptor descriptor;
+
+ private final ColumnFamily indexCf;
+
+ private final RocksDbMvPartitionStorage partitionStorage;
+
+ /**
+ * Creates a storage.
+ *
+ * @param descriptor Sorted Index descriptor.
+ * @param indexCf Column family that stores the index data.
+ * @param partitionStorage Partition storage of the corresponding index.
+ */
+ public RocksDbSortedIndexStorage(
+ SortedIndexDescriptor descriptor,
+ ColumnFamily indexCf,
+ RocksDbMvPartitionStorage partitionStorage
+ ) {
+ this.descriptor = descriptor;
+ this.indexCf = indexCf;
+ this.partitionStorage = partitionStorage;
+ }
+
+ @Override
+ public SortedIndexDescriptor indexDescriptor() {
+ return descriptor;
+ }
+
+ @Override
+ public void put(IndexRow row) {
+ WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
+
+ try {
+ writeBatch.put(indexCf.handle(), rocksKey(row), BYTE_EMPTY_ARRAY);
+ } catch (RocksDBException e) {
+ throw new StorageException("Unable to insert data into sorted
index. Index ID: " + descriptor.id(), e);
+ }
+ }
+
+ @Override
+ public void remove(IndexRow row) {
+ WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
+
+ try {
+ writeBatch.delete(indexCf.handle(), rocksKey(row));
+ } catch (RocksDBException e) {
+ throw new StorageException("Unable to remove data from sorted
index. Index ID: " + descriptor.id(), e);
+ }
+ }
+
+ @Override
+ public Cursor<IndexRow> scan(@Nullable BinaryTuplePrefix lowerBound,
@Nullable BinaryTuplePrefix upperBound, int flags) {
+ boolean includeLower = (flags & GREATER_OR_EQUAL) != 0;
+ boolean includeUpper = (flags & LESS_OR_EQUAL) != 0;
+
+ return scan(lowerBound, upperBound, includeLower, includeUpper);
+ }
+
+ private Cursor<IndexRow> scan(
+ @Nullable BinaryTuplePrefix lowerBound,
+ @Nullable BinaryTuplePrefix upperBound,
+ boolean includeLower,
+ boolean includeUpper
+ ) {
+ byte[] lowerBoundBytes = lowerBound == null ? null :
rocksPrefix(lowerBound);
+ byte[] upperBoundBytes = upperBound == null ? null :
rocksPrefix(upperBound);
+
+ Cursor<ByteBuffer> cursor = createScanCursor(lowerBoundBytes,
upperBoundBytes);
+
+ // Skip the lower bound, if needed (RocksDB includes the lower bound
by default).
+ if (!includeLower && lowerBound != null) {
+ cursor = dropWhile(cursor, startsWith(lowerBound));
+ }
+
+ // Include the upper bound, if needed (RocksDB excludes the upper
bound by default).
+ if (includeUpper && upperBound != null) {
+ Cursor<ByteBuffer> upperBoundCursor =
takeWhile(createScanCursor(upperBoundBytes, null), startsWith(upperBound));
+
+ cursor = concat(cursor, upperBoundCursor);
+ }
+
+ return map(cursor, this::decodeRow);
+ }
+
+ private Cursor<ByteBuffer> createScanCursor(byte @Nullable [] lowerBound,
byte @Nullable [] upperBound) {
+ Slice upperBoundSlice = upperBound == null ? null : new
Slice(upperBound);
+
+ ReadOptions options = new
ReadOptions().setIterateUpperBound(upperBoundSlice);
+
+ RocksIterator it = indexCf.newIterator(options);
+
+ if (lowerBound == null) {
+ it.seekToFirst();
+ } else {
+ it.seek(lowerBound);
+ }
+
+ return new RocksIteratorAdapter<>(it) {
+ @Override
+ protected ByteBuffer decodeEntry(byte[] key, byte[] value) {
+ return ByteBuffer.wrap(key).order(ByteOrder.BIG_ENDIAN);
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+
+ IgniteUtils.closeAll(options, upperBoundSlice);
+ }
+ };
+ }
+
+ private IndexRow decodeRow(ByteBuffer bytes) {
+ assert bytes.getShort(0) == partitionStorage.partitionId();
+
+ var tuple = new BinaryTuple(descriptor.binaryTupleSchema(),
binaryTupleSlice(bytes));
+
+ // RowId UUID is located at the last 16 bytes of the key
+ long mostSignificantBits = bytes.getLong(bytes.limit() - Long.BYTES *
2);
+ long leastSignificantBits = bytes.getLong(bytes.limit() - Long.BYTES);
+
+ var rowId = new RowId(partitionStorage.partitionId(),
mostSignificantBits, leastSignificantBits);
+
+ return new IndexRowImpl(tuple, rowId);
+ }
+
+ private byte[] rocksPrefix(BinaryTuplePrefix prefix) {
+ return rocksPrefix(prefix, 0).array();
+ }
+
+ private ByteBuffer rocksPrefix(InternalTuple prefix, int extraLength) {
+ ByteBuffer keyBytes = prefix.byteBuffer();
+
+ return ByteBuffer.allocate(Short.BYTES + keyBytes.remaining() +
extraLength)
+ .order(ByteOrder.BIG_ENDIAN)
+ .putShort((short) partitionStorage.partitionId())
+ .put(keyBytes);
+ }
+
+ private byte[] rocksKey(IndexRow row) {
+ RowId rowId = row.rowId();
+
+ // We don't store the Partition ID as it is already a part of the key.
+ return rocksPrefix(row.indexColumns(), 2 * Long.BYTES)
+ .putLong(rowId.mostSignificantBits())
+ .putLong(rowId.leastSignificantBits())
+ .array();
+ }
+
+ private Predicate<ByteBuffer> startsWith(BinaryTuplePrefix prefix) {
+ var comparator = new BinaryTupleComparator(descriptor);
+
+ return key -> {
+ // First, compare the partitionIDs.
+ boolean partitionIdCompare = key.getShort(0) ==
partitionStorage.partitionId();
+
+ if (!partitionIdCompare) {
+ return false;
+ }
+
+ // Finally, compare the remaining parts of the tuples.
+ // TODO: This part may be optimized by comparing binary tuple
representations directly. However, currently BinaryTuple prefixes
+ // are not binary compatible with regular tuples. See
https://issues.apache.org/jira/browse/IGNITE-17711.
+ return comparator.compare(prefix.byteBuffer(),
binaryTupleSlice(key)) == 0;
+ };
+ }
+
+ private static ByteBuffer binaryTupleSlice(ByteBuffer key) {
+ return key.duplicate()
+ // Discard partition ID.
+ .position(Short.BYTES)
+ // Discard row ID.
+ .limit(key.limit() - Long.BYTES * 2)
+ .slice()
Review Comment:
And a slice here? Seems off
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java:
##########
@@ -488,8 +533,26 @@ private ColumnFamilyDescriptor cfDescriptorFromName(String
cfName) {
new
ColumnFamilyOptions().useFixedLengthPrefixExtractor(RocksDbHashIndexStorage.FIXED_PREFIX_LENGTH)
);
+ case SORTED_INDEX:
+ var indexDescriptor = new
SortedIndexDescriptor(sortedIndexId(cfName), tableCfg.value());
+
+ return sortedIndexCfDescriptor(indexDescriptor);
+
default:
throw new StorageException("Unidentified column family [name="
+ cfName + ", table=" + tableCfg.name() + ']');
}
}
+
+ /**
+ * Creates a Column Family descriptor for a Sorted Index.
+ */
+ private static ColumnFamilyDescriptor
sortedIndexCfDescriptor(SortedIndexDescriptor descriptor) {
+ String cfName = sortedIndexCfName(descriptor.id());
Review Comment:
You just wrapped the index id into a cfName and now you're unwrapping it
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbBinaryTupleComparator.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.index;
+
+import static
org.apache.ignite.internal.binarytuple.BinaryTupleCommon.isPrefix;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import org.apache.ignite.internal.storage.index.BinaryTupleComparator;
+import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ComparatorOptions;
+
+/**
+ * {@link AbstractComparator} implementation that compares Binary Tuples.
+ */
+public class RocksDbBinaryTupleComparator extends AbstractComparator {
+ private final BinaryTupleComparator comparator;
+
+ /** Options needed for resource management. */
+ private final ComparatorOptions options;
+
+ /**
+ * Constructor.
+ *
+ * @param descriptor Sorted Index descriptor.
+ */
+ public RocksDbBinaryTupleComparator(SortedIndexDescriptor descriptor) {
+ this(descriptor, new ComparatorOptions());
+ }
+
+ private RocksDbBinaryTupleComparator(SortedIndexDescriptor descriptor,
ComparatorOptions options) {
+ super(options);
+
+ this.options = options;
+ this.comparator = new BinaryTupleComparator(descriptor);
+ }
+
+ @Override
+ public String name() {
+ return getClass().getCanonicalName();
+ }
+
+ @Override
+ public int compare(ByteBuffer a, ByteBuffer b) {
+ int comparePartitionId = Short.compare(a.getShort(), b.getShort());
+
+ if (comparePartitionId != 0) {
+ return comparePartitionId;
+ }
+
+ ByteBuffer firstBinaryTupleBuffer =
a.slice().order(ByteOrder.LITTLE_ENDIAN);
+ ByteBuffer secondBinaryTupleBuffer =
b.slice().order(ByteOrder.LITTLE_ENDIAN);
+
+ int compareTuples = comparator.compare(firstBinaryTupleBuffer,
secondBinaryTupleBuffer);
+
+ // Binary Tuple Prefixes don't have row IDs, so they can't be compared.
+ if (compareTuples != 0 || isPrefix(firstBinaryTupleBuffer) ||
isPrefix(secondBinaryTupleBuffer)) {
+ return compareTuples;
+ }
+
+ return compareRowIds(a, b);
+ }
+
+ private static int compareRowIds(ByteBuffer a, ByteBuffer b) {
+ long firstMostSignBits = a.getLong(a.remaining() - Long.BYTES * 2);
+ long secondMostSignBits = b.getLong(b.remaining() - Long.BYTES * 2);
+
+ int compare = Long.compare(firstMostSignBits, secondMostSignBits);
+
+ if (compare != 0) {
+ return compare;
+ }
+
+ long firstLeastSignBits = a.getLong(a.remaining() - Long.BYTES);
Review Comment:
also here
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java:
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.index;
+
+import static
org.apache.ignite.internal.storage.rocksdb.index.CursorUtils.concat;
+import static
org.apache.ignite.internal.storage.rocksdb.index.CursorUtils.dropWhile;
+import static org.apache.ignite.internal.storage.rocksdb.index.CursorUtils.map;
+import static
org.apache.ignite.internal.storage.rocksdb.index.CursorUtils.takeWhile;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTuplePrefix;
+import org.apache.ignite.internal.schema.row.InternalTuple;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.index.BinaryTupleComparator;
+import org.apache.ignite.internal.storage.index.IndexRow;
+import org.apache.ignite.internal.storage.index.IndexRowImpl;
+import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
+import org.apache.ignite.internal.storage.index.SortedIndexStorage;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.WriteBatchWithIndex;
+
+/**
+ * {@link SortedIndexStorage} implementation based on RocksDB.
+ *
+ * <p>This storage uses the following format for keys:
+ * <pre>
+ * Partition ID - 2 bytes
+ * Tuple value - variable length
+ * Row ID (UUID) - 16 bytes
+ * </pre>
+ *
+ * <p>We use an empty array as values, because all required information can be
extracted from the key.
+ */
+public class RocksDbSortedIndexStorage implements SortedIndexStorage {
+ private final SortedIndexDescriptor descriptor;
+
+ private final ColumnFamily indexCf;
+
+ private final RocksDbMvPartitionStorage partitionStorage;
+
+ /**
+ * Creates a storage.
+ *
+ * @param descriptor Sorted Index descriptor.
+ * @param indexCf Column family that stores the index data.
+ * @param partitionStorage Partition storage of the corresponding index.
+ */
+ public RocksDbSortedIndexStorage(
+ SortedIndexDescriptor descriptor,
+ ColumnFamily indexCf,
+ RocksDbMvPartitionStorage partitionStorage
+ ) {
+ this.descriptor = descriptor;
+ this.indexCf = indexCf;
+ this.partitionStorage = partitionStorage;
+ }
+
+ @Override
+ public SortedIndexDescriptor indexDescriptor() {
+ return descriptor;
+ }
+
+ @Override
+ public void put(IndexRow row) {
+ WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
+
+ try {
+ writeBatch.put(indexCf.handle(), rocksKey(row), BYTE_EMPTY_ARRAY);
+ } catch (RocksDBException e) {
+ throw new StorageException("Unable to insert data into sorted
index. Index ID: " + descriptor.id(), e);
+ }
+ }
+
+ @Override
+ public void remove(IndexRow row) {
+ WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
+
+ try {
+ writeBatch.delete(indexCf.handle(), rocksKey(row));
+ } catch (RocksDBException e) {
+ throw new StorageException("Unable to remove data from sorted
index. Index ID: " + descriptor.id(), e);
+ }
+ }
+
+ @Override
+ public Cursor<IndexRow> scan(@Nullable BinaryTuplePrefix lowerBound,
@Nullable BinaryTuplePrefix upperBound, int flags) {
+ boolean includeLower = (flags & GREATER_OR_EQUAL) != 0;
+ boolean includeUpper = (flags & LESS_OR_EQUAL) != 0;
+
+ return scan(lowerBound, upperBound, includeLower, includeUpper);
+ }
+
+ private Cursor<IndexRow> scan(
+ @Nullable BinaryTuplePrefix lowerBound,
+ @Nullable BinaryTuplePrefix upperBound,
+ boolean includeLower,
+ boolean includeUpper
+ ) {
+ byte[] lowerBoundBytes = lowerBound == null ? null :
rocksPrefix(lowerBound);
+ byte[] upperBoundBytes = upperBound == null ? null :
rocksPrefix(upperBound);
+
+ Cursor<ByteBuffer> cursor = createScanCursor(lowerBoundBytes,
upperBoundBytes);
+
+ // Skip the lower bound, if needed (RocksDB includes the lower bound
by default).
+ if (!includeLower && lowerBound != null) {
+ cursor = dropWhile(cursor, startsWith(lowerBound));
+ }
+
+ // Include the upper bound, if needed (RocksDB excludes the upper
bound by default).
+ if (includeUpper && upperBound != null) {
+ Cursor<ByteBuffer> upperBoundCursor =
takeWhile(createScanCursor(upperBoundBytes, null), startsWith(upperBound));
+
+ cursor = concat(cursor, upperBoundCursor);
+ }
+
+ return map(cursor, this::decodeRow);
+ }
+
+ private Cursor<ByteBuffer> createScanCursor(byte @Nullable [] lowerBound,
byte @Nullable [] upperBound) {
+ Slice upperBoundSlice = upperBound == null ? null : new
Slice(upperBound);
+
+ ReadOptions options = new
ReadOptions().setIterateUpperBound(upperBoundSlice);
+
+ RocksIterator it = indexCf.newIterator(options);
+
+ if (lowerBound == null) {
+ it.seekToFirst();
+ } else {
+ it.seek(lowerBound);
+ }
+
+ return new RocksIteratorAdapter<>(it) {
+ @Override
+ protected ByteBuffer decodeEntry(byte[] key, byte[] value) {
+ return ByteBuffer.wrap(key).order(ByteOrder.BIG_ENDIAN);
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+
+ IgniteUtils.closeAll(options, upperBoundSlice);
+ }
+ };
+ }
+
+ private IndexRow decodeRow(ByteBuffer bytes) {
+ assert bytes.getShort(0) == partitionStorage.partitionId();
+
+ var tuple = new BinaryTuple(descriptor.binaryTupleSchema(),
binaryTupleSlice(bytes));
+
+ // RowId UUID is located at the last 16 bytes of the key
+ long mostSignificantBits = bytes.getLong(bytes.limit() - Long.BYTES *
2);
+ long leastSignificantBits = bytes.getLong(bytes.limit() - Long.BYTES);
+
+ var rowId = new RowId(partitionStorage.partitionId(),
mostSignificantBits, leastSignificantBits);
+
+ return new IndexRowImpl(tuple, rowId);
+ }
+
+ private byte[] rocksPrefix(BinaryTuplePrefix prefix) {
+ return rocksPrefix(prefix, 0).array();
+ }
+
+ private ByteBuffer rocksPrefix(InternalTuple prefix, int extraLength) {
+ ByteBuffer keyBytes = prefix.byteBuffer();
+
+ return ByteBuffer.allocate(Short.BYTES + keyBytes.remaining() +
extraLength)
+ .order(ByteOrder.BIG_ENDIAN)
+ .putShort((short) partitionStorage.partitionId())
+ .put(keyBytes);
+ }
+
+ private byte[] rocksKey(IndexRow row) {
+ RowId rowId = row.rowId();
+
+ // We don't store the Partition ID as it is already a part of the key.
+ return rocksPrefix(row.indexColumns(), 2 * Long.BYTES)
+ .putLong(rowId.mostSignificantBits())
+ .putLong(rowId.leastSignificantBits())
+ .array();
+ }
+
+ private Predicate<ByteBuffer> startsWith(BinaryTuplePrefix prefix) {
+ var comparator = new BinaryTupleComparator(descriptor);
+
+ return key -> {
+ // First, compare the partitionIDs.
+ boolean partitionIdCompare = key.getShort(0) ==
partitionStorage.partitionId();
+
+ if (!partitionIdCompare) {
+ return false;
+ }
+
+ // Finally, compare the remaining parts of the tuples.
+ // TODO: This part may be optimized by comparing binary tuple
representations directly. However, currently BinaryTuple prefixes
+ // are not binary compatible with regular tuples. See
https://issues.apache.org/jira/browse/IGNITE-17711.
+ return comparator.compare(prefix.byteBuffer(),
binaryTupleSlice(key)) == 0;
+ };
+ }
+
+ private static ByteBuffer binaryTupleSlice(ByteBuffer key) {
+ return key.duplicate()
Review Comment:
Why duplicate instead of just slice?
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbBinaryTupleComparator.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.index;
+
+import static
org.apache.ignite.internal.binarytuple.BinaryTupleCommon.isPrefix;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import org.apache.ignite.internal.storage.index.BinaryTupleComparator;
+import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ComparatorOptions;
+
+/**
+ * {@link AbstractComparator} implementation that compares Binary Tuples.
+ */
+public class RocksDbBinaryTupleComparator extends AbstractComparator {
+ private final BinaryTupleComparator comparator;
+
+ /** Options needed for resource management. */
+ private final ComparatorOptions options;
+
+ /**
+ * Constructor.
+ *
+ * @param descriptor Sorted Index descriptor.
+ */
+ public RocksDbBinaryTupleComparator(SortedIndexDescriptor descriptor) {
+ this(descriptor, new ComparatorOptions());
+ }
+
+ private RocksDbBinaryTupleComparator(SortedIndexDescriptor descriptor,
ComparatorOptions options) {
+ super(options);
+
+ this.options = options;
+ this.comparator = new BinaryTupleComparator(descriptor);
+ }
+
+ @Override
+ public String name() {
+ return getClass().getCanonicalName();
+ }
+
+ @Override
+ public int compare(ByteBuffer a, ByteBuffer b) {
+ int comparePartitionId = Short.compare(a.getShort(), b.getShort());
+
+ if (comparePartitionId != 0) {
+ return comparePartitionId;
+ }
+
+ ByteBuffer firstBinaryTupleBuffer =
a.slice().order(ByteOrder.LITTLE_ENDIAN);
+ ByteBuffer secondBinaryTupleBuffer =
b.slice().order(ByteOrder.LITTLE_ENDIAN);
+
+ int compareTuples = comparator.compare(firstBinaryTupleBuffer,
secondBinaryTupleBuffer);
+
+ // Binary Tuple Prefixes don't have row IDs, so they can't be compared.
+ if (compareTuples != 0 || isPrefix(firstBinaryTupleBuffer) ||
isPrefix(secondBinaryTupleBuffer)) {
+ return compareTuples;
+ }
+
+ return compareRowIds(a, b);
+ }
+
+ private static int compareRowIds(ByteBuffer a, ByteBuffer b) {
+ long firstMostSignBits = a.getLong(a.remaining() - Long.BYTES * 2);
+ long secondMostSignBits = b.getLong(b.remaining() - Long.BYTES * 2);
Review Comment:
        Same as the earlier comment about `limit`: consider whether these offsets should be computed from `limit()` rather than `remaining()` here as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]