korlov42 commented on code in PR #1191:
URL: https://github.com/apache/ignite-3/pull/1191#discussion_r1003290700
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -599,42 +557,80 @@ private CompletableFuture
processTxCleanupAction(TxCleanupReplicaRequest request
}
/**
- * Returns index id of default {@lonk INDEX_SCAN_ID} index that will be
used for operation.
+ * Finds the row and its identifier by given pk search row.
*
- * @param indexId Index id or {@code null}.
- * @return Index id.
+ * @param key A bytes representing a primary key.
+ * @param ts A timestamp regarding which we need to resolve the given row.
+ * @param action An action to perform on a resolved row.
+ * @param <T> A type of the value returned by action.
+ * @return Result of the given action.
*/
- private @NotNull UUID indexIdOrDefault(@Nullable UUID indexId) {
- return indexId != null ? indexId : indexScanId;
- }
+ private <T> T resolveRowByPk(
+ ByteBuffer key,
+ HybridTimestamp ts,
+ BiFunction<@Nullable RowId, @Nullable BinaryRow, T> action
+ ) {
+ try (Cursor<RowId> cursor = pkConstraintStorage.get(key)) {
+ for (RowId rowId : cursor) {
+ ReadResult readResult = mvDataStorage.read(rowId, ts);
- /**
- * Find out a row id by an index.
- * TODO: IGNITE-17479 Integrate indexes into replicaListener command
handlers
- *
- * @param indexId Index id.
- * @param key Key to find.
- * @return Value or {@code null} if the key does not determine a value.
- */
- private RowId rowIdByKey(@NotNull UUID indexId, ByteBuffer key) {
- if (indexPkId.equals(indexId)) {
- return primaryIndex.get(key);
- }
+ BinaryRow row = resolveReadResult(readResult, ts, () -> {
+ if (readResult.newestCommitTimestamp() == null) {
+ return null;
+ }
- if (indexScanId.equals(indexId)) {
- RowId[] rowIdHolder = new RowId[1];
+ ReadResult committedReadResult = mvDataStorage.read(rowId,
readResult.newestCommitTimestamp());
+
+ assert !committedReadResult.isWriteIntent() :
+ "The result is not committed [rowId=" + rowId + ",
timestamp="
+ + readResult.newestCommitTimestamp() + ']';
+
+ return committedReadResult.binaryRow();
+ });
- mvDataStorage.forEach((rowId, binaryRow) -> {
- if (rowIdHolder[0] == null &&
binaryRow.keySlice().equals(key)) {
- rowIdHolder[0] = rowId;
+ if (row != null && row.hasValue()) {
+ return action.apply(rowId, row);
}
- });
+ }
- return rowIdHolder[0];
+ return action.apply(null, null);
+ } catch (Exception e) {
+ throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
+ format("Unable to close cursor [tableId={}]", tableId), e);
}
+ }
- throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
- IgniteStringFormatter.format("The index does not exist
[indexId={}]", indexId));
+ /**
+ * Finds the row and its identifier by given pk search row.
+ *
+ * @param key A bytes representing a primary key.
+ * @param txId An identifier of the transaction regarding which we need to
resolve the given row.
+ * @param action An action to perform on a resolved row.
+ * @param <T> A type of the value returned by action.
+ * @return A future object representing the result of the given action.
+ */
+ private <T> CompletableFuture<T> resolveRowByPk(
Review Comment:
Well, I assume (mistakenly) this should be obvious from functions signature:
resolve by key and timestamp => RO => just read the resolve the row; resolve by
key and txId => RW => acquire all necessary lock before resolving
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]