isapego commented on code in PR #2215:
URL: https://github.com/apache/ignite-3/pull/2215#discussion_r1236832053
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -648,18 +648,26 @@ public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, InternalT
return enlistInTx(
rows,
tx,
- (commitPart, keyRows0, txo, groupId, term) -> tableMessagesFactory.readWriteMultiRowReplicaRequest()
- .groupId(groupId)
- .commitPartitionId(commitPart)
- .binaryRowsBytes(serializeBinaryRows(keyRows0))
- .transactionId(txo.id())
- .term(term)
- .requestType(RequestType.RW_UPSERT_ALL)
- .timestampLong(clock.nowLong())
- .build(),
+ this::upsertAllInternal,
CompletableFuture::allOf);
}
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, int partition) {
+ InternalTransaction tx = txManager.begin();
+ TablePartitionId partGroupId = new TablePartitionId(tableId, partition);
+
+ CompletableFuture<Void> fut = enlistWithRetry(
+ tx,
+ partition,
+ (commitPart, term) -> upsertAllInternal(commitPart, rows, tx, partGroupId, term),
+ ATTEMPTS_TO_ENLIST_PARTITION
+ );
Review Comment:
What is going to happen if we provide rows that do not match the given
partition? AFAIR, our streamer gives no guarantee that rows go to the right
partition; it is best effort only.
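
To illustrate the concern, here is a rough sketch of a pre-check that would
make a mismatch explicit before enlisting. Everything in it is hypothetical:
`PartitionCheck`, `mismatchedRows` and the `partitionOf` mapping are stand-ins
I made up for illustration, not existing Ignite APIs, and the real row-to-
partition mapping may look quite different.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.ToIntFunction;

/** Hypothetical helper for illustration only; not part of Ignite. */
final class PartitionCheck {
    private PartitionCheck() {
    }

    /**
     * Returns the rows whose computed partition differs from the expected one.
     * The partitionOf function is an assumed stand-in for whatever mapping
     * the table actually uses to assign a row to a partition.
     */
    static <R> List<R> mismatchedRows(
            Collection<R> rows,
            int expectedPartition,
            ToIntFunction<R> partitionOf
    ) {
        List<R> mismatched = new ArrayList<>();

        for (R row : rows) {
            if (partitionOf.applyAsInt(row) != expectedPartition) {
                mismatched.add(row);
            }
        }

        return mismatched;
    }

    public static void main(String[] args) {
        // Toy mapping: integer keys spread over 4 partitions.
        List<Integer> rows = List.of(0, 4, 5, 8);

        List<Integer> bad = mismatchedRows(rows, 0, k -> Math.floorMod(k, 4));

        System.out.println(bad); // prints [5]
    }
}
```

With something like this, the method could either reject the batch or re-route
the mismatched rows, instead of silently writing them through the wrong
partition group. Which of the two is appropriate depends on what contract we
want for the streamer.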