sanpwc commented on code in PR #1191:
URL: https://github.com/apache/ignite-3/pull/1191#discussion_r993793111
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -364,15 +295,79 @@ public void onShutdown() {
}
}
- /**
- * Extracts a key and a value from the {@link BinaryRow} and wraps it in a {@link DataRow}.
- *
- * @param row Binary row.
- * @return Data row.
- */
- @NotNull
- private static DataRow extractAndWrapKeyValue(@NotNull BinaryRow row) {
- return new DelegatingDataRow(new BinarySearchRow(row), row.bytes());
+ private void addToIndexes(BinaryRow tableRow, RowId rowId) {
+ List<IndexStorage> indexes = activeIndexes.get();
+
+ if (nullOrEmpty(indexes)) {
+ return;
+ }
+
+ for (IndexStorage index : indexes) {
+ IndexRow indexRow = toIndexRow(index, tableRow, rowId);
+
+ assert indexRow != null;
+
+ index.put(indexRow);
+ }
+ }
+
+ private @Nullable IndexRow toIndexRow(IndexStorage storage, BinaryRow tableRow, RowId rowId) {
+ SchemaDescriptor descriptor = schemaProvider.apply(tableRow.schemaVersion());
+
+ int[] indexedColumns = resolveIndexColumns(descriptor, storage);
+
+ if (indexedColumns == null) {
+ return null;
+ }
+
+ BinaryTupleSchema tupleSchema = BinaryTupleSchema.createSchema(descriptor, indexedColumns);
+
+ var converter = new BinaryConverter(descriptor, tupleSchema);
+
+ ByteBuffer buffer = converter.toTuple(tableRow);
+
+ return new IndexRowImpl(new BinaryTuple(tupleSchema, buffer), rowId);
+ }
+
+ private int @Nullable [] resolveIndexColumns(SchemaDescriptor descriptor, IndexStorage indexStorage) {
Review Comment:
This duplicates the one in PartitionReplicaListener. Is that a POC issue?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -814,25 +844,106 @@ private CompletableFuture<RowId> takeLocksForUpsert(ByteBuffer searchKey, UUID i
/**
* Takes all required locks on a key, before inserting the value.
*
- * @param searchKey Key to search.
- * @param indexId Index id.
- * @param txId Transaction id.
+ * @param tableRow Table row.
+ * @param txId Transaction id.
* @return Future completes with {@link RowId} or {@code null} if there is no value.
*/
- private CompletableFuture<RowId> takeLocksForInsert(ByteBuffer searchKey, UUID indexId, UUID txId) {
- return lockManager.acquire(txId, new LockKey(indexId, searchKey), LockMode.S) // Index S lock
- .thenCompose(sharedIdxLock -> {
- RowId rowId = rowIdByKey(indexId, searchKey);
-
- if (rowId == null) {
- return lockManager.acquire(txId, new LockKey(indexId, searchKey), LockMode.X) // Index X lock
- .thenCompose(exclusiveIdxLock ->
- lockManager.acquire(txId, new LockKey(tableId), LockMode.IX) // IX lock on table
- .thenApply(tblLock -> null));
- }
+ private CompletableFuture<?> takeLocksForInsert(BinaryRow tableRow, UUID txId) {
+ return takeLockOnIndexes(tableRow, txId)
Review Comment:
It seems that taking locks on the keys of all indexes is mandatory **only** if there is at least one sorted index, because otherwise we'll lock the whole table. Will clarify.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -307,32 +264,6 @@ private void handleTxCleanupCommand(TxCleanupCommand cmd, long commandIndex) {
storage.runConsistently(() -> {
UUID txId = cmd.txId();
- Set<ByteBuffer> removedKeys = txsRemovedKeys.getOrDefault(txId, Collections.emptySet());
-
- Set<ByteBuffer> insertedKeys = txsInsertedKeys.getOrDefault(txId, Collections.emptySet());
-
- Set<RowId> pendingRowIds = txsPendingRowIds.getOrDefault(txId, Collections.emptySet());
-
- if (cmd.commit()) {
Review Comment:
As discussed, `storage.commitWrite()` and `storage.abortWrite()` should be restored.
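Here is a minimal sketch of what restoring those calls could look like. It assumes a hypothetical, simplified `WriteIntentStorage` interface; the real storage methods may take additional arguments, such as a commit timestamp. The sketch is modelled on the removed `txsPendingRowIds` bookkeeping: on cleanup, every pending row id of the transaction is either committed or aborted, and then the transaction's entry is dropped.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

/** Hypothetical stand-in for a row id. */
final class RowId {
    final long id;

    RowId(long id) {
        this.id = id;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof RowId && ((RowId) o).id == id;
    }

    @Override
    public int hashCode() {
        return Long.hashCode(id);
    }
}

/** Hypothetical, simplified storage view exposing only the two calls to restore. */
interface WriteIntentStorage {
    void commitWrite(RowId rowId);

    void abortWrite(RowId rowId);
}

/** Tracks write intents per transaction and resolves them on tx cleanup. */
final class TxCleanupHandler {
    private final Map<UUID, Set<RowId>> txsPendingRowIds = new HashMap<>();
    private final WriteIntentStorage storage;

    TxCleanupHandler(WriteIntentStorage storage) {
        this.storage = storage;
    }

    /** Called when a transaction writes an uncommitted version of a row. */
    void onWrite(UUID txId, RowId rowId) {
        txsPendingRowIds.computeIfAbsent(txId, k -> new HashSet<>()).add(rowId);
    }

    /** Commits or aborts every pending write intent of the transaction, then forgets it. */
    void onTxCleanup(UUID txId, boolean commit) {
        Set<RowId> pendingRowIds = txsPendingRowIds.getOrDefault(txId, Collections.emptySet());

        for (RowId rowId : pendingRowIds) {
            if (commit) {
                storage.commitWrite(rowId);
            } else {
                storage.abortWrite(rowId);
            }
        }

        txsPendingRowIds.remove(txId);
    }
}
```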
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -814,25 +844,106 @@ private CompletableFuture<RowId> takeLocksForUpsert(ByteBuffer searchKey, UUID i
/**
* Takes all required locks on a key, before inserting the value.
*
- * @param searchKey Key to search.
- * @param indexId Index id.
- * @param txId Transaction id.
+ * @param tableRow Table row.
+ * @param txId Transaction id.
* @return Future completes with {@link RowId} or {@code null} if there is no value.
*/
- private CompletableFuture<RowId> takeLocksForInsert(ByteBuffer searchKey, UUID indexId, UUID txId) {
- return lockManager.acquire(txId, new LockKey(indexId, searchKey), LockMode.S) // Index S lock
- .thenCompose(sharedIdxLock -> {
- RowId rowId = rowIdByKey(indexId, searchKey);
-
- if (rowId == null) {
- return lockManager.acquire(txId, new LockKey(indexId, searchKey), LockMode.X) // Index X lock
- .thenCompose(exclusiveIdxLock ->
- lockManager.acquire(txId, new LockKey(tableId), LockMode.IX) // IX lock on table
- .thenApply(tblLock -> null));
- }
+ private CompletableFuture<?> takeLocksForInsert(BinaryRow tableRow, UUID txId) {
+ return takeLockOnIndexes(tableRow, txId)
+ .thenCompose(ignored -> lockManager.acquire(txId, new LockKey(tableId), LockMode.IX));
+ }
- return CompletableFuture.completedFuture(rowId);
- });
+ private CompletableFuture<?> takeLockOnIndexes(BinaryRow tableRow, UUID txId) {
+ List<IndexStorage> indexes = activeIndexes.get();
+
+ if (nullOrEmpty(indexes)) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ CompletableFuture<?>[] locks = new CompletableFuture[indexes.size()];
+ int idx = 0;
+
+ for (IndexStorage index : indexes) {
+ ByteBuffer indexRow = toIndexKey(index, tableRow);
+
+ UUID indexId = indexId(index);
+
+ assert indexRow != null;
+
+ locks[idx++] = lockManager.acquire(txId, new LockKey(indexId, indexRow), LockMode.X);
+ }
+
+ return CompletableFuture.allOf(locks);
+ }
+
+ private UUID indexId(IndexStorage index) {
+ if (index instanceof HashIndexStorage) {
Review Comment:
I would expect an indexDescriptor to be part of IndexStorage, with a common descriptor part such as the id.
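Here is a rough sketch of that shape. It uses hypothetical, simplified types rather than the actual Ignite interfaces. Each storage exposes its descriptor, and concrete storages narrow the descriptor type through covariant return types. Because every descriptor carries a shared `id()`, `indexId()` no longer needs an `instanceof` chain.

```java
import java.util.UUID;

/** Hypothetical common descriptor part shared by all index kinds. */
interface IndexDescriptor {
    UUID id();
}

final class HashIndexDescriptor implements IndexDescriptor {
    private final UUID id;

    HashIndexDescriptor(UUID id) {
        this.id = id;
    }

    @Override
    public UUID id() {
        return id;
    }
}

final class SortedIndexDescriptor implements IndexDescriptor {
    private final UUID id;

    SortedIndexDescriptor(UUID id) {
        this.id = id;
    }

    @Override
    public UUID id() {
        return id;
    }
}

/** Hypothetical IndexStorage that exposes its own descriptor. */
interface IndexStorage {
    IndexDescriptor indexDescriptor();
}

/** Concrete storages narrow the descriptor type (covariant return). */
interface HashIndexStorage extends IndexStorage {
    @Override
    HashIndexDescriptor indexDescriptor();
}

interface SortedIndexStorage extends IndexStorage {
    @Override
    SortedIndexDescriptor indexDescriptor();
}

final class Indexes {
    /** No instanceof dispatch needed: every storage knows its id. */
    static UUID indexId(IndexStorage index) {
        return index.indexDescriptor().id();
    }
}
```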
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1946,4 +1955,28 @@ BiFunction<List<Peer>, Long, CompletableFuture<Void>> movePartition(Supplier<Raf
private <T extends ConfigurationProperty<?>> T directProxy(T property) {
return getMetadataLocallyOnly ? property : ConfigurationUtil.directProxy(property);
}
+
+ private Supplier<List<IndexStorage>> activeIndexes(UUID tableId, int partId, MvTableStorage tableStorage) {
+ return () -> {
+ NamedListView<TableIndexView> listView = tableStorage.tablesConfiguration().indexes().value();
+
+ if (listView.size() == 0) {
+ return List.of();
+ }
+
+ List<IndexStorage> indexes = new ArrayList<>(listView.size());
+
+ for (int i = 0; i < listView.size(); i++) {
+ if (!listView.get(i).tableId().equals(tableId)) {
Review Comment:
I don't like the idea of iterating through the indexes of all tables just to filter them by the same tableId predicate.
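Here is one possible alternative. It uses a hypothetical, simplified `IndexView` in place of `TableIndexView`. The idea is to group all indexes by `tableId` once, so the supplier does a single hash lookup instead of a full scan. Rebuilding the snapshot whenever the index configuration changes is an assumption about where this would live.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/** Hypothetical, simplified stand-in for TableIndexView. */
final class IndexView {
    final UUID indexId;
    final UUID tableId;

    IndexView(UUID indexId, UUID tableId) {
        this.indexId = indexId;
        this.tableId = tableId;
    }
}

/** Immutable snapshot of all indexes grouped by owning table, built once per configuration change. */
final class IndexesByTable {
    private final Map<UUID, List<IndexView>> byTable;

    IndexesByTable(Collection<IndexView> allIndexes) {
        Map<UUID, List<IndexView>> grouped = new HashMap<>();

        for (IndexView index : allIndexes) {
            grouped.computeIfAbsent(index.tableId, k -> new ArrayList<>()).add(index);
        }

        grouped.replaceAll((tableId, indexes) -> Collections.unmodifiableList(indexes));
        byTable = Collections.unmodifiableMap(grouped);
    }

    /** Hash lookup instead of scanning the indexes of every table. */
    List<IndexView> indexesOf(UUID tableId) {
        return byTable.getOrDefault(tableId, List.of());
    }
}
```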
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -814,25 +844,106 @@ private CompletableFuture<RowId> takeLocksForUpsert(ByteBuffer searchKey, UUID i
/**
* Takes all required locks on a key, before inserting the value.
*
- * @param searchKey Key to search.
- * @param indexId Index id.
- * @param txId Transaction id.
+ * @param tableRow Table row.
+ * @param txId Transaction id.
* @return Future completes with {@link RowId} or {@code null} if there is no value.
*/
- private CompletableFuture<RowId> takeLocksForInsert(ByteBuffer searchKey, UUID indexId, UUID txId) {
- return lockManager.acquire(txId, new LockKey(indexId, searchKey), LockMode.S) // Index S lock
- .thenCompose(sharedIdxLock -> {
- RowId rowId = rowIdByKey(indexId, searchKey);
-
- if (rowId == null) {
- return lockManager.acquire(txId, new LockKey(indexId, searchKey), LockMode.X) // Index X lock
- .thenCompose(exclusiveIdxLock ->
- lockManager.acquire(txId, new LockKey(tableId), LockMode.IX) // IX lock on table
- .thenApply(tblLock -> null));
- }
+ private CompletableFuture<?> takeLocksForInsert(BinaryRow tableRow, UUID txId) {
+ return takeLockOnIndexes(tableRow, txId)
+ .thenCompose(ignored -> lockManager.acquire(txId, new LockKey(tableId), LockMode.IX));
+ }
- return CompletableFuture.completedFuture(rowId);
- });
+ private CompletableFuture<?> takeLockOnIndexes(BinaryRow tableRow, UUID txId) {
+ List<IndexStorage> indexes = activeIndexes.get();
+
+ if (nullOrEmpty(indexes)) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ CompletableFuture<?>[] locks = new CompletableFuture[indexes.size()];
+ int idx = 0;
+
+ for (IndexStorage index : indexes) {
+ ByteBuffer indexRow = toIndexKey(index, tableRow);
+
+ UUID indexId = indexId(index);
+
+ assert indexRow != null;
+
+ locks[idx++] = lockManager.acquire(txId, new LockKey(indexId, indexRow), LockMode.X);
Review Comment:
Not that simple. Different types of indexes will require different types of locks for different operations on different entries.
_Different operations_
**Insertion** into a sorted non-unique index expects the following locks to be taken:
- IX_short(nextKey)
- X_commit(currentKey) if nextKey was previously locked in S, X or SIX mode
- IX_commit(currentKey) otherwise
**Select** expects:
- S_commit(currentKey)
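The sketch below makes the insertion and select rules above concrete by only computing which locks would be requested. `LockMode`, `LockDuration` and `LockRequest` are hypothetical types that mirror the `IX_short(...)` notation. They are not the Ignite lock manager API. How the prior modes on nextKey are learned is left open.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

/** Hypothetical lock modes and durations mirroring the notation above. */
enum LockMode { S, X, IS, IX, SIX }

enum LockDuration { SHORT, COMMIT }

final class LockRequest {
    final Object key;
    final LockMode mode;
    final LockDuration duration;

    LockRequest(Object key, LockMode mode, LockDuration duration) {
        this.key = key;
        this.mode = mode;
        this.duration = duration;
    }

    /** Renders e.g. "IX_short(k2)". */
    @Override
    public String toString() {
        return mode + "_" + duration.name().toLowerCase(Locale.ROOT) + "(" + key + ")";
    }
}

/** Lock requests for a sorted non-unique index, following the rules listed above. */
final class SortedNonUniqueIndexLocks {
    /** Prior modes on nextKey that require X (instead of IX) on currentKey. */
    private static final Set<LockMode> PROTECTING_MODES = EnumSet.of(LockMode.S, LockMode.X, LockMode.SIX);

    static List<LockRequest> forInsert(Object currentKey, Object nextKey, Set<LockMode> priorModesOnNextKey) {
        boolean nextKeyProtected = !Collections.disjoint(priorModesOnNextKey, PROTECTING_MODES);

        return List.of(
                new LockRequest(nextKey, LockMode.IX, LockDuration.SHORT),
                new LockRequest(currentKey, nextKeyProtected ? LockMode.X : LockMode.IX, LockDuration.COMMIT));
    }

    static List<LockRequest> forSelect(Object currentKey) {
        return List.of(new LockRequest(currentKey, LockMode.S, LockDuration.COMMIT));
    }
}
```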
_Different entries_
More precisely, a select in a non-unique index is not always about locking the current key; sometimes it is about locking +INF:
- If currentKey is greater than the upper bound, return NOT_FOUND but take the lock anyway.
- If currentKey matches the upper bound, there is no need to take a lock on the next key.
- If the upper bound is null, take a lock on +INF.
- If nothing is found, take a lock on +INF.
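Here is a sketch of the +INF rules as a range scan. It assumes plain integer keys (row ids and duplicate keys are omitted), an inclusive upper bound, and a hypothetical `"+INF"` sentinel. It only records which keys would be locked in S_commit mode and does not actually lock them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;

/** Range scan over a sorted index that records which keys it would lock (S_commit). */
final class RangeScanLocks {
    /** Hypothetical sentinel for the "+INF" key past the last entry. */
    static final String POS_INF = "+INF";

    static final class Result {
        final List<Integer> found = new ArrayList<>();
        final List<Object> locked = new ArrayList<>();
    }

    /**
     * Scans keys in [lower, upper] (upper inclusive); a null upper bound means unbounded.
     * An empty {@code found} list corresponds to NOT_FOUND.
     */
    static Result scan(NavigableSet<Integer> index, int lower, Integer upper) {
        Result result = new Result();
        Integer cur = index.ceiling(lower);

        while (true) {
            if (cur == null) {
                // Nothing (more) found; an unbounded scan always ends here: lock +INF.
                result.locked.add(POS_INF);
                return result;
            }

            if (upper != null && cur > upper) {
                // Past the upper bound: not part of the result, but lock it anyway.
                result.locked.add(cur);
                return result;
            }

            result.locked.add(cur);
            result.found.add(cur);

            if (upper != null && cur.equals(upper)) {
                // Matches the upper bound: no need to lock the next key.
                return result;
            }

            cur = index.higher(cur);
        }
    }
}
```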
_Different types of indexes_
That part is pretty straightforward: a hash index requires a different set of locks on insert than a sorted index, and the same applies to unique vs. non-unique indexes.
In order to properly hide these difficulties, I'd favor Alexey's proposal of wrapping IndexStorages in decorators that add index-specific locking to the put, get and remove methods. That's why I would expect `Supplier<List<IndexStorage>> activeIndexes` to return a list of such decorators instead of IndexStorages.
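Here is a minimal sketch of the decorator idea with hypothetical, simplified types. Only `put` is shown; `get` and `remove` would be wrapped the same way. The per-type lock lists are placeholders. The hash variant keeps the X lock the PR currently takes, and the sorted variant omits the S/X/SIX check on nextKey.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.UnaryOperator;

/** Hypothetical, simplified index storage: stores an index key -> row id entry. */
interface IndexStorage {
    void put(String key, long rowId);
}

/** Hypothetical lock manager that grants every request immediately and records it. */
final class RecordingLockManager {
    final List<String> requests = new ArrayList<>();

    CompletableFuture<Void> acquire(UUID txId, String lock) {
        requests.add(lock);
        return CompletableFuture.completedFuture(null);
    }
}

/** Decorator: takes the index-type-specific locks first, then delegates the write. */
abstract class LockingIndexStorage {
    private final IndexStorage delegate;
    private final RecordingLockManager lockManager;

    LockingIndexStorage(IndexStorage delegate, RecordingLockManager lockManager) {
        this.delegate = delegate;
        this.lockManager = lockManager;
    }

    /** The locks an insert of {@code key} needs; this is what differs per index type. */
    protected abstract List<String> insertLocks(String key);

    CompletableFuture<Void> put(UUID txId, String key, long rowId) {
        CompletableFuture<?>[] locks = insertLocks(key).stream()
                .map(lock -> lockManager.acquire(txId, lock))
                .toArray(CompletableFuture[]::new);

        return CompletableFuture.allOf(locks).thenRun(() -> delegate.put(key, rowId));
    }
}

/** Placeholder: keeps the X lock on the index key that the PR currently takes. */
final class LockingHashIndexStorage extends LockingIndexStorage {
    LockingHashIndexStorage(IndexStorage delegate, RecordingLockManager lockManager) {
        super(delegate, lockManager);
    }

    @Override
    protected List<String> insertLocks(String key) {
        return List.of("X(" + key + ")");
    }
}

/** Sorted non-unique index; the S/X/SIX check on nextKey is omitted for brevity. */
final class LockingSortedIndexStorage extends LockingIndexStorage {
    private final UnaryOperator<String> nextKey; // hypothetical "find next key" lookup

    LockingSortedIndexStorage(IndexStorage delegate, RecordingLockManager lockManager, UnaryOperator<String> nextKey) {
        super(delegate, lockManager);
        this.nextKey = nextKey;
    }

    @Override
    protected List<String> insertLocks(String key) {
        return List.of("IX_short(" + nextKey.apply(key) + ")", "IX_commit(" + key + ")");
    }
}
```

With this shape, the replica listener could call `put` on every element of `activeIndexes` and combine the returned futures with `CompletableFuture.allOf`, without knowing which index types are involved.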
--
This is an automated message from the Apache Git Service.