sanpwc commented on code in PR #2329:
URL: https://github.com/apache/ignite-3/pull/2329#discussion_r1275857943
##########
modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java:
##########
@@ -268,7 +268,7 @@ public void onComplete() {
}
});
- gotExceptionLatch.await();
Review Comment:
There's one more `gotExceptionLatch.await();` on line 355. Could you please
also use the proposed `assertTrue(gotExceptionLatch.await(10_000,
TimeUnit.MILLISECONDS));` there?
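
To illustrate why the bounded wait is preferable, here is a minimal standalone sketch. It relies only on `java.util.concurrent.CountDownLatch`; the class name `LatchAwaitExample` and the helper `awaitBounded` are made up for illustration and are not part of the PR. An unbounded `await()` hangs the test forever if the callback never fires, while the timed variant returns `false` and lets the assertion fail.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchAwaitExample {
    /**
     * Waits for the latch for at most timeoutMillis.
     * Returns true if the latch reached zero, false on timeout or interruption.
     */
    public static boolean awaitBounded(CountDownLatch latch, long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt flag for the caller.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch gotExceptionLatch = new CountDownLatch(1);

        // Stand-in for a subscriber callback that fires on another thread.
        new Thread(gotExceptionLatch::countDown).start();

        if (!awaitBounded(gotExceptionLatch, 10_000)) {
            throw new AssertionError("Latch was not released within the timeout");
        }

        // A latch that is never counted down makes the bounded wait give up.
        boolean released = awaitBounded(new CountDownLatch(1), 50);
        System.out.println("released = " + released);
    }
}
```

In a JUnit test the same check collapses to the one-liner proposed above, `assertTrue(latch.await(10_000, TimeUnit.MILLISECONDS))`.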
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -233,10 +233,29 @@ private void handleUpdateCommand(UpdateCommand cmd, long
commandIndex, long comm
storageUpdateHandler.handleUpdate(cmd.txId(), cmd.rowUuid(),
cmd.tablePartitionId().asTablePartitionId(), cmd.rowBuffer(),
rowId -> {
- txsPendingRowIds.computeIfAbsent(cmd.txId(), entry -> new
TreeSet<>()).add(rowId);
+ if (!cmd.full()) {
+ txsPendingRowIds.computeIfAbsent(cmd.txId(), entry ->
new TreeSet<>()).add(rowId);
+ } else {
+ TxMeta txMetaToSet = new TxMeta(
+ COMMITED,
Review Comment:
Why do we need to update txState for a one-phase transaction?
If it's a temporary solution to provide enough data for write intent
resolution, please add a corresponding TODO.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -326,19 +327,31 @@ private CompletableFuture<?>
processRequest(ReplicaRequest request, @Nullable Bo
if (request instanceof ReadWriteSingleRowReplicaRequest) {
var req = (ReadWriteSingleRowReplicaRequest) request;
- return appendTxCommand(req.transactionId(), req.requestType(), ()
-> processSingleEntryAction(req));
+ return appendTxCommand(req.transactionId(), req.requestType(),
req.full(), () -> processSingleEntryAction(req));
} else if (request instanceof ReadWriteMultiRowReplicaRequest) {
var req = (ReadWriteMultiRowReplicaRequest) request;
- return appendTxCommand(req.transactionId(), req.requestType(), ()
-> processMultiEntryAction(req));
+ return appendTxCommand(req.transactionId(), req.requestType(),
req.full(), () -> processMultiEntryAction(req));
} else if (request instanceof ReadWriteSwapRowReplicaRequest) {
var req = (ReadWriteSwapRowReplicaRequest) request;
- return appendTxCommand(req.transactionId(), req.requestType(), ()
-> processTwoEntriesAction(req));
+ return appendTxCommand(req.transactionId(), req.requestType(),
req.full(), () -> processTwoEntriesAction(req));
} else if (request instanceof
ReadWriteScanRetrieveBatchReplicaRequest) {
var req = (ReadWriteScanRetrieveBatchReplicaRequest) request;
- return appendTxCommand(req.transactionId(), RequestType.RW_SCAN,
() -> processScanRetrieveBatchAction(req));
+ // Implicit RW scan can be committed locally on a last batch or
error.
+ return appendTxCommand(req.transactionId(), RequestType.RW_SCAN,
false, () -> processScanRetrieveBatchAction(req)).handle(
Review Comment:
That doesn't make much sense. All single-partition read transactions should
be RO.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -326,19 +327,31 @@ private CompletableFuture<?>
processRequest(ReplicaRequest request, @Nullable Bo
if (request instanceof ReadWriteSingleRowReplicaRequest) {
var req = (ReadWriteSingleRowReplicaRequest) request;
- return appendTxCommand(req.transactionId(), req.requestType(), ()
-> processSingleEntryAction(req));
+ return appendTxCommand(req.transactionId(), req.requestType(),
req.full(), () -> processSingleEntryAction(req));
Review Comment:
With some slight updates to the current writeIntentResolution, or if we put
the committed value instead of a writeIntent, it will be pointless to acquire
a lock for single-key operations.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java:
##########
@@ -28,4 +28,9 @@ public interface PartitionCommand extends
SafeTimePropagatingCommand {
* Returns a transaction id.
*/
UUID txId();
+
+ /**
+ * Returns {@code true} if a command represents a full transaction.
+ */
+ boolean full();
Review Comment:
From my point of view, "full" is a somewhat confusing name.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -175,7 +184,12 @@ public void handleUpdateAll(
BinaryRow oldRow = storage.addWrite(rowId, row, txId,
commitTblId, commitPartId);
+ if (commitTs != null) { // TODO do in one step.
+ storage.commitWrite(rowId, commitTs);
Review Comment:
Important one: storage.runConsistently() doesn't provide atomicity across
multiple writes. In other words, if one-phase tx1 writes k1 and k2, tx2 may
see only k1.
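
The interleaving can be sketched with a toy model. `ToyStore` below is purely hypothetical and does not model the real storage or `runConsistently()`; it only assumes that a row becomes visible to readers as soon as its own commit is applied, which is what the per-row `addWrite` + `commitWrite` sequence implies.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class PartialVisibilityExample {
    /** Hypothetical toy store: a key is readable only after its own commitWrite. */
    static final class ToyStore {
        private final Map<String, String> pending = new HashMap<>();
        private final Map<String, String> committed = new HashMap<>();

        void addWrite(String key, String value) {
            pending.put(key, value);
        }

        void commitWrite(String key) {
            String value = pending.remove(key);
            if (value != null) {
                committed.put(key, value);
            }
        }

        String read(String key) {
            return committed.get(key);
        }
    }

    /** Returns what a concurrent reader (tx2) observes between tx1's two per-row commits. */
    public static String[] readBetweenCommits() {
        ToyStore store = new ToyStore();

        // tx1, row k1: write and commit immediately.
        store.addWrite("k1", "v1");
        store.commitWrite("k1");

        // tx2 reads here, before tx1 has touched k2.
        String[] seen = {store.read("k1"), store.read("k2")};

        // tx1, row k2: write and commit immediately.
        store.addWrite("k2", "v2");
        store.commitWrite("k2");

        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(readBetweenCommits())); // prints [v1, null]
    }
}
```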
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -124,7 +126,12 @@ public void handleUpdate(
BinaryRow oldRow = storage.addWrite(rowId, row, txId, commitTblId,
commitPartId);
+ if (commitTs != null) { // TODO do in one step.
Review Comment:
Do you mean that instead of adding a write followed by committing it we
should have some sort of addCommittedWrite? In any case, according to our
rules, it's not valid to have a TODO without a Jira ticket.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -233,10 +233,29 @@ private void handleUpdateCommand(UpdateCommand cmd, long
commandIndex, long comm
storageUpdateHandler.handleUpdate(cmd.txId(), cmd.rowUuid(),
cmd.tablePartitionId().asTablePartitionId(), cmd.rowBuffer(),
rowId -> {
- txsPendingRowIds.computeIfAbsent(cmd.txId(), entry -> new
TreeSet<>()).add(rowId);
+ if (!cmd.full()) {
+ txsPendingRowIds.computeIfAbsent(cmd.txId(), entry ->
new TreeSet<>()).add(rowId);
+ } else {
+ TxMeta txMetaToSet = new TxMeta(
+ COMMITED,
+
List.of(cmd.tablePartitionId().asTablePartitionId()),
+ cmd.safeTime()
+ );
+
+ boolean txStateChangeRes =
txStateStorage.compareAndSet(
+ cmd.txId(),
+ null,
+ txMetaToSet,
+ commandIndex,
+ commandTerm
+ );
+
+ assert txStateChangeRes : "Expecting successful commit
for full txn";
Review Comment:
I believe we could add the txId to the assertion message here.
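
A minimal sketch of what that could look like, runnable on its own. The helper `commitFailureMessage` and the bracketed `[txId=...]` format are assumptions made for illustration, and the boolean is a stand-in for the `compareAndSet` result in the diff.

```java
import java.util.UUID;

public class AssertMessageExample {
    /** Builds an assertion message that identifies the failing transaction. */
    public static String commitFailureMessage(UUID txId) {
        return "Expecting successful commit for full txn [txId=" + txId + ']';
    }

    public static void main(String[] args) {
        UUID txId = UUID.randomUUID();

        // Stand-in for the result of txStateStorage.compareAndSet(...).
        boolean txStateChangeRes = true;

        // With -ea enabled, a failure now reports which transaction was affected.
        assert txStateChangeRes : commitFailureMessage(txId);

        System.out.println(commitFailureMessage(txId));
    }
}
```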
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -1323,16 +1336,37 @@ private <T> CompletableFuture<T> continueResolvingByPk(
}
+ /**
+ * Clean up txn locally.
+ *
+ * @param txId Tx ID.
+ * @param committed {@code True} if committed.
+ */
+ private void cleanupLocal(UUID txId, boolean committed) {
+ txCleanupReadyFutures.compute(txId, (id, txOps) -> {
Review Comment:
As mentioned earlier, I don't think we need txnState for one-phase commit
transactions.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.