korlov42 commented on code in PR #1191:
URL: https://github.com/apache/ignite-3/pull/1191#discussion_r1004206463
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -856,335 +886,218 @@ private CompletableFuture<Object>
processSingleEntryAction(ReadWriteSingleRowRep
ByteBuffer searchKey = searchRow.keySlice();
- UUID indexId = indexIdOrDefault(indexPkId/*request.indexToUse()*/);
-
UUID txId = request.transactionId();
switch (request.requestType()) {
case RW_GET: {
- CompletableFuture<RowId> lockFut = takeLocksForGet(searchKey,
indexId, txId);
-
- return lockFut.thenApply(lockedRowId -> {
- BinaryRow result = lockedRowId != null
- ?
resolveReadResult(mvDataStorage.read(lockedRowId, HybridTimestamp.MAX_VALUE),
txId) : null;
+ return resolveRowByPk(searchKey, txId, (rowId, row) -> {
+ if (rowId == null) {
+ return CompletableFuture.completedFuture(null);
+ }
- return result;
+ return takeLocksForGet(rowId, txId)
+ .thenApply(ignored -> row);
});
}
case RW_DELETE: {
- CompletableFuture<RowId> lockFut =
takeLocksForDelete(searchKey, indexId, txId);
-
- return lockFut.thenCompose(lockedRowId -> {
- boolean removed = lockedRowId != null;
-
- CompletableFuture raftFut = removed ?
applyCmdWithExceptionHandling(new UpdateCommand(lockedRowId, txId)) :
- CompletableFuture.completedFuture(null);
+ return resolveRowByPk(searchKey, txId, (rowId, row) -> {
+ if (rowId == null) {
+ return CompletableFuture.completedFuture(false);
+ }
- return raftFut.thenApply(ignored -> removed);
+ return takeLocksForDelete(searchRow, rowId, txId)
+ .thenCompose(ignored ->
applyCmdWithExceptionHandling(new UpdateCommand(rowId, txId)))
+ .thenApply(ignored -> true);
});
}
case RW_GET_AND_DELETE: {
- CompletableFuture<RowId> lockFut =
takeLocksForDelete(searchKey, indexId, txId);
-
- return lockFut.thenCompose(lockedRowId -> {
- BinaryRow lockedRow = lockedRowId != null
- ?
resolveReadResult(mvDataStorage.read(lockedRowId, HybridTimestamp.MAX_VALUE),
txId) : null;
-
- CompletableFuture raftFut = lockedRowId != null ?
applyCmdWithExceptionHandling(new UpdateCommand(lockedRowId, txId)) :
- CompletableFuture.completedFuture(null);
+ return resolveRowByPk(searchKey, txId, (rowId, row) -> {
+ if (rowId == null) {
+ return CompletableFuture.completedFuture(null);
+ }
- return raftFut.thenApply(ignored -> lockedRow);
+ return takeLocksForDelete(searchRow, rowId, txId)
+ .thenCompose(ignored ->
applyCmdWithExceptionHandling(new UpdateCommand(rowId, txId)))
+ .thenApply(ignored -> row);
});
}
case RW_DELETE_EXACT: {
- CompletableFuture<RowId> lockFut =
takeLocksForDeleteExact(searchKey, searchRow, indexId, txId);
-
- return lockFut.thenCompose(lockedRow -> {
- boolean removed = lockedRow != null;
+ return resolveRowByPk(searchKey, txId, (rowId, row) -> {
+ if (rowId == null) {
+ return CompletableFuture.completedFuture(false);
+ }
- CompletableFuture raftFut = removed ?
applyCmdWithExceptionHandling(new UpdateCommand(lockedRow, txId)) :
- CompletableFuture.completedFuture(null);
+ return takeLocksForDeleteExact(searchRow, rowId, row, txId)
+ .thenCompose(validatedRowId -> {
+ if (validatedRowId == null) {
+ return
CompletableFuture.completedFuture(false);
+ }
- return raftFut.thenApply(ignored -> removed);
+ return applyCmdWithExceptionHandling(new
UpdateCommand(validatedRowId, txId))
+ .thenApply(ignored -> true);
+ });
});
}
case RW_INSERT: {
- CompletableFuture<RowId> lockFut =
takeLocksForInsert(searchKey, indexId, txId);
-
- return lockFut.thenCompose(lockedRowId -> {
- boolean inserted = lockedRowId == null;
+ return resolveRowByPk(searchKey, txId, (rowId, row) -> {
+ if (rowId != null) {
+ return CompletableFuture.completedFuture(false);
+ }
- CompletableFuture raftFut =
- lockedRowId == null ?
applyCmdWithExceptionHandling(new UpdateCommand(new RowId(partId), searchRow,
txId)) :
- CompletableFuture.completedFuture(null);
+ RowId rowId0 = new RowId(partId);
- return raftFut.thenApply(ignored -> inserted);
+ return takeLocksForInsert(searchRow, rowId0, txId)
+ .thenCompose(ignored ->
applyCmdWithExceptionHandling(new UpdateCommand(rowId0, searchRow, txId)))
+ .thenApply(ignored -> true);
});
}
case RW_UPSERT: {
- CompletableFuture<RowId> lockFut =
takeLocksForUpsert(searchKey, indexId, txId);
+ return resolveRowByPk(searchKey, txId, (rowId, row) -> {
+ boolean insert = rowId == null;
+
+ RowId rowId0 = insert ? new RowId(partId) : rowId;
- return lockFut.thenCompose(lockedRowId -> {
- CompletableFuture raftFut =
- lockedRowId != null ?
applyCmdWithExceptionHandling(new UpdateCommand(lockedRowId, searchRow, txId)) :
- applyCmdWithExceptionHandling(new
UpdateCommand(new RowId(partId), searchRow, txId));
+ CompletableFuture<?> lockFut = insert
+ ? takeLocksForInsert(searchRow, rowId0, txId)
+ : takeLocksForUpdate(searchRow, rowId0, txId);
- return raftFut.thenApply(ignored -> null);
+ return lockFut
+ .thenCompose(ignored ->
applyCmdWithExceptionHandling(new UpdateCommand(rowId0, searchRow, txId)))
+ .thenApply(ignored -> null);
});
}
case RW_GET_AND_UPSERT: {
- return lockManager.acquire(txId, new LockKey(indexId,
searchKey), LockMode.X)
- .thenCompose(idxLock -> { // Index X lock
- RowId rowId = rowIdByKey(indexId, searchKey);
-
- return lockManager.acquire(txId, new
LockKey(tableId), LockMode.IX)
- .thenCompose(tblLock -> { // IX lock on
table
- CompletableFuture<Lock> rowLockFut =
(rowId != null)
- ? lockManager.acquire(txId,
new LockKey(tableId, rowId), LockMode.X)
- // X lock on RowId
- :
CompletableFuture.completedFuture(null);
-
- return rowLockFut.thenCompose(rowLock
-> {
- BinaryRow result = rowId != null
- ?
resolveReadResult(mvDataStorage.read(rowId, HybridTimestamp.MAX_VALUE), txId) :
null;
-
- CompletableFuture raftFut =
- rowId != null ?
applyCmdWithExceptionHandling(new UpdateCommand(rowId, searchRow, txId))
- :
applyCmdWithExceptionHandling(
- new
UpdateCommand(new RowId(partId), searchRow, txId));
-
- return raftFut.thenApply(ignored
-> result);
- });
- });
- });
- }
- case RW_GET_AND_REPLACE: {
- CompletableFuture<RowId> idxLockFut =
lockManager.acquire(txId, new LockKey(indexId, searchKey), LockMode.S)
- .thenCompose(sharedIdxLock -> { // Index S lock
- RowId rowId = rowIdByKey(indexId, searchKey);
+ return resolveRowByPk(searchKey, txId, (rowId, row) -> {
+ boolean insert = rowId == null;
- if (rowId != null) {
- return lockManager.acquire(txId, new
LockKey(indexId, searchKey), LockMode.X)
- .thenApply(exclusiveIdxLock -> rowId);
// Index X lock
- }
-
- return CompletableFuture.completedFuture(null);
- });
-
- return idxLockFut.thenCompose(lockedRowId -> {
- return lockManager.acquire(txId, new LockKey(tableId),
LockMode.IX)
- .thenCompose(tblLock -> { // IX lock on table
- CompletableFuture<BinaryRow> rowLockFut;
+ RowId rowId0 = insert ? new RowId(partId) : rowId;
- if (lockedRowId != null) {
- rowLockFut = lockManager.acquire(txId, new
LockKey(tableId, lockedRowId), LockMode.X)
- .thenApply(rowLock -> // X lock on
RowId
-
resolveReadResult(mvDataStorage.read(lockedRowId, HybridTimestamp.MAX_VALUE),
txId)
- );
- } else {
- rowLockFut =
CompletableFuture.completedFuture(null);
- }
+ CompletableFuture<?> lockFut = insert
+ ? takeLocksForInsert(searchRow, rowId0, txId)
+ : takeLocksForUpdate(searchRow, rowId0, txId);
- return rowLockFut.thenCompose(lockedRow -> {
- CompletableFuture raftFut = lockedRow ==
null ? CompletableFuture.completedFuture(null) :
- applyCmdWithExceptionHandling(new
UpdateCommand(lockedRowId, searchRow, txId));
+ return lockFut
+ .thenCompose(ignored ->
applyCmdWithExceptionHandling(new UpdateCommand(rowId0, searchRow, txId)))
+ .thenApply(ignored -> row);
+ });
+ }
+ case RW_GET_AND_REPLACE: {
+ return resolveRowByPk(searchKey, txId, (rowId, row) -> {
+ if (rowId == null) {
+ return CompletableFuture.completedFuture(null);
+ }
- return raftFut.thenApply(ignored ->
lockedRow);
- });
- });
+ return takeLocksForUpdate(searchRow, rowId, txId)
+ .thenCompose(ignored ->
applyCmdWithExceptionHandling(new UpdateCommand(rowId, searchRow, txId)))
+ .thenApply(ignored0 -> row);
});
}
case RW_REPLACE_IF_EXIST: {
- CompletableFuture<RowId> lockFut =
takeLocksForReplaceIfExist(searchKey, indexId, txId);
-
- return lockFut.thenCompose(lockedRowId -> {
- boolean replaced = lockedRowId != null;
-
- CompletableFuture raftFut = replaced ?
applyCmdWithExceptionHandling(new UpdateCommand(lockedRowId, searchRow, txId)) :
- CompletableFuture.completedFuture(null);
+ return resolveRowByPk(searchKey, txId, (rowId, row) -> {
+ if (rowId == null) {
+ return CompletableFuture.completedFuture(false);
+ }
- return raftFut.thenApply(ignored -> replaced);
+ return takeLocksForUpdate(searchRow, rowId, txId)
+ .thenCompose(ignored ->
applyCmdWithExceptionHandling(new UpdateCommand(rowId, searchRow, txId)))
+ .thenApply(ignored -> true);
});
}
default: {
throw new
IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
- IgniteStringFormatter.format("Unknown single request
[actionType={}]", request.requestType()));
+ format("Unknown single request [actionType={}]",
request.requestType()));
}
}
}
- /**
- * Takes all required locks on a key, before replacing.
- *
- * @param searchKey Key to search.
- * @param indexId Index id.
- * @param txId Transaction id.
- * @return Future completes with {@link RowId} or {@code null} if there is
no entry.
- */
- private CompletableFuture<RowId> takeLocksForReplaceIfExist(ByteBuffer
searchKey, UUID indexId, UUID txId) {
- return lockManager.acquire(txId, new LockKey(indexId, searchKey),
LockMode.S).thenCompose(shareIdxLock -> { // Index R lock
- RowId rowId = rowIdByKey(indexId, searchKey);
-
- CompletableFuture<Lock> idxLockFut = rowId != null
- ? lockManager.acquire(txId, new LockKey(indexId,
searchKey), LockMode.X) // Index X lock
- : CompletableFuture.completedFuture(null);
-
- return idxLockFut.thenCompose(exclusiveIdxLock ->
lockManager.acquire(txId, new LockKey(tableId), LockMode.IX)
- .thenCompose(tblLock -> { // IX lock on table
- if (rowId != null) {
- RowId rowIdToLock = rowId;
-
- return lockManager.acquire(txId, new
LockKey(tableId, rowId), LockMode.X)
- .thenApply(rowLock -> rowIdToLock); // X
lock on RowId
- }
-
- return CompletableFuture.completedFuture(null);
- }));
- });
- }
-
/**
* Takes all required locks on a key, before upserting.
*
- * @param searchKey Key to search.
- * @param indexId Index id.
* @param txId Transaction id.
* @return Future completes with {@link RowId} or {@code null} if there is
no value.
*/
- private CompletableFuture<RowId> takeLocksForUpsert(ByteBuffer searchKey,
UUID indexId, UUID txId) {
- return lockManager.acquire(txId, new LockKey(indexId, searchKey),
LockMode.X).thenCompose(idxLock -> { // Index X lock
- RowId rowId = rowIdByKey(indexId, searchKey);
-
- return lockManager.acquire(txId, new LockKey(tableId), LockMode.IX)
- .thenCompose(tblLock -> { // IX lock on table
- if (rowId != null) {
- return lockManager.acquire(txId, new
LockKey(tableId, rowId), LockMode.X)
- .thenApply(rowLock -> rowId); // X lock on
RowId
- }
-
- return CompletableFuture.completedFuture(null);
- });
- });
+ private CompletableFuture<RowId> takeLocksForUpdate(BinaryRow tableRow,
RowId rowId, UUID txId) {
+ return lockManager.acquire(txId, new LockKey(tableId,
tableRow.keySlice()), LockMode.X) // Index X lock
+ .thenCompose(ignored -> lockManager.acquire(txId, new
LockKey(tableId), LockMode.IX))
+ .thenCompose(ignored -> lockManager.acquire(txId, new
LockKey(tableId, rowId), LockMode.X))
+ .thenCompose(ignored -> takePutLockOnIndexes(tableRow, rowId,
txId))
+ .thenApply(ignored -> rowId);
}
/**
* Takes all required locks on a key, before inserting the value.
*
- * @param searchKey Key to search.
- * @param indexId Index id.
- * @param txId Transaction id.
+ * @param tableRow Table row.
+ * @param txId Transaction id.
* @return Future completes with {@link RowId} or {@code null} if there is
no value.
*/
- private CompletableFuture<RowId> takeLocksForInsert(ByteBuffer searchKey,
UUID indexId, UUID txId) {
- return lockManager.acquire(txId, new LockKey(indexId, searchKey),
LockMode.S) // Index S lock
- .thenCompose(sharedIdxLock -> {
- RowId rowId = rowIdByKey(indexId, searchKey);
+ private CompletableFuture<RowId> takeLocksForInsert(BinaryRow tableRow,
RowId rowId, UUID txId) {
+ return lockManager.acquire(txId, new LockKey(tableId,
tableRow.keySlice()), LockMode.X)
+ .thenCompose(exclusiveIdxLock -> lockManager.acquire(txId, new
LockKey(tableId), LockMode.IX)) // IX lock on table
+ .thenCompose(ignored -> takePutLockOnIndexes(tableRow, rowId,
txId))
+ .thenApply(tblLock -> rowId);
+ }
- if (rowId == null) {
- return lockManager.acquire(txId, new LockKey(indexId,
searchKey), LockMode.X) // Index X lock
- .thenCompose(exclusiveIdxLock ->
- lockManager.acquire(txId, new
LockKey(tableId), LockMode.IX) // IX lock on table
- .thenApply(tblLock -> null));
- }
+ private CompletableFuture<?> takePutLockOnIndexes(BinaryRow tableRow,
RowId rowId, UUID txId) {
+ List<IndexLocker> indexes = indexesLockers.get();
- return CompletableFuture.completedFuture(rowId);
- });
+ if (nullOrEmpty(indexes)) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ CompletableFuture<?>[] locks = new CompletableFuture[indexes.size()];
+ int idx = 0;
+
+ for (IndexLocker locker : indexes) {
+ locks[idx++] = locker.locksForInsert(txId, tableRow, rowId);
+ }
+
+ return CompletableFuture.allOf(locks);
}
/**
* Takes all required locks on a key, before deleting the value.
*
- * @param searchKey Key to search.
- * @param searchRow Row to remove.
- * @param indexId Index id.
* @param txId Transaction id.
* @return Future completes with {@link RowId} or {@code null} if there is
no value for remove.
*/
- private CompletableFuture<RowId> takeLocksForDeleteExact(ByteBuffer
searchKey, BinaryRow searchRow, UUID indexId, UUID txId) {
- return lockManager.acquire(txId, new LockKey(indexId, searchKey),
LockMode.X).thenCompose(idxLock -> { // Index X lock
- RowId rowId = rowIdByKey(indexId, searchKey);
-
- return lockManager.acquire(txId, new LockKey(tableId),
LockMode.IX) // IX lock on table
- .thenCompose(tblLock -> {
- CompletableFuture<RowId> rowLockFut;
-
- if (rowId != null) {
- rowLockFut = lockManager.acquire(txId, new
LockKey(tableId, rowId), LockMode.S) // S lock on RowId
- .thenCompose(sharedRowLock -> {
- BinaryRow curVal =
resolveReadResult(mvDataStorage.read(rowId, HybridTimestamp.MAX_VALUE), txId);
-
- if (equalValues(curVal, searchRow)) {
- return lockManager.acquire(txId,
new LockKey(tableId, rowId),
- LockMode.X) // X
lock on RowId
-
.thenApply(exclusiveRowLock -> rowId);
- }
-
- return
CompletableFuture.completedFuture(null);
- });
- } else {
- rowLockFut =
CompletableFuture.completedFuture(null);
- }
+ private CompletableFuture<RowId> takeLocksForDeleteExact(BinaryRow
expectedRow, RowId rowId, BinaryRow actualRow, UUID txId) {
+ return lockManager.acquire(txId, new LockKey(tableId,
expectedRow.keySlice()), LockMode.X) // Index X lock
Review Comment:
fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]