denis-chudov commented on code in PR #2856:
URL: https://github.com/apache/ignite-3/pull/2856#discussion_r1423693247
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -507,22 +514,59 @@ private CompletableFuture<?> processRequest(ReplicaRequest request, @Nullable Bo
      * @param request Tx recovery request.
      * @return The future is complete when the transaction state is finalized.
      */
-    private CompletableFuture<Void> processTxRecoveryAction(TxRecoveryMessage request) {
+    private CompletableFuture<Void> processTxRecoveryAction(TxRecoveryMessage request, String senderId) {
         UUID txId = request.txId();

         TxMeta txMeta = txStateStorage.get(txId);

         // Check whether a transaction has already been finished.
-        boolean transactionAlreadyFinished = txMeta != null && isFinalState(txMeta.txState());
+        if (txMeta != null && isFinalState(txMeta.txState())) {
+            return recoverFinishedTx(txId, txMeta)
+                    // If the sender sent a recovery message, it failed to handle the transaction on its own,
+                    // so send a cleanup request back to the sender for the transaction we know is finished.
+                    .thenCompose(v -> runCleanupOnNode(txId, senderId));
+        }

-        if (transactionAlreadyFinished) {
+        LOG.info("Orphan transaction has to be aborted [tx={}].", txId);
+
+        return triggerTxRecovery(txId, senderId);
+    }
+
+    private CompletableFuture<Void> recoverFinishedTx(UUID txId, TxMeta txMeta) {
+        if (txMeta.locksReleased() || txMeta.enlistedPartitions().isEmpty()) {
+            // Nothing to do if the locks have already been released or there are no enlisted partitions available.
             return nullCompletedFuture();
         }

-        LOG.info("Orphan transaction has to be aborted [tx={}].", txId);
+        // Otherwise run a cleanup on the known set of partitions.
+        return (CompletableFuture<Void>) durableCleanup(txId, txMeta);
+    }

-        // TODO: IGNITE-20735 Implement initiate recovery handling logic.
-        return nullCompletedFuture();
+    /**
+     * Runs cleanup on a node.
+     *
+     * @param txId Transaction id.
+     * @param nodeId Node id (not the consistent id).
+     */
+    private CompletableFuture<Void> runCleanupOnNode(UUID txId, String nodeId) {
+        // Resolve the consistent id of the sender to send cleanup requests back to it.
+        String nodeConsistentId = clusterNodeResolver.getConsistentIdById(nodeId);
+
+        return nodeConsistentId == null ? nullCompletedFuture() : txManager.cleanup(nodeConsistentId, txId);
+    }
+
+    private CompletableFuture<Void> triggerTxRecovery(UUID txId, String senderId) {
+        // If the transaction state is pending, then the transaction should be rolled back,
+        // meaning that the state is changed to aborted and a corresponding cleanup request
+        // is sent in the common durable manner to the partition that initiated recovery.
+        return txManager.finish(
+                        new HybridTimestampTracker(),
+                        replicationGroupId,
+                        false,
+                        Map.of(replicationGroupId, 0L), // Term is not required for the rollback.
+                        txId
+                )
+                .thenCompose(unused -> runCleanupOnNode(txId, senderId));
Review Comment:
I think `txManager.finish()` can return a failed future here due to a race with another
recovery process finishing the same transaction. Maybe there should be a `whenComplete`
(or a `handle`, which can swallow the expected failure) here.
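A minimal sketch of what I mean, assuming the race surfaces as an exception on the
future returned by `txManager.finish()` (`isAlreadyFinished` is a hypothetical helper,
not an existing API; `handle` is used instead of `whenComplete` because it can swallow
the expected failure):
```java
private CompletableFuture<Void> triggerTxRecovery(UUID txId, String senderId) {
    return txManager.finish(
                    new HybridTimestampTracker(),
                    replicationGroupId,
                    false,
                    Map.of(replicationGroupId, 0L),
                    txId
            )
            .handle((unused, err) -> {
                // A concurrent recovery may have finished this tx first; the cleanup
                // below is safe either way, so only propagate unexpected failures.
                // isAlreadyFinished() is a hypothetical helper, not an existing API.
                if (err != null && !isAlreadyFinished(err)) {
                    throw new CompletionException(err);
                }
                return null;
            })
            .thenCompose(unused -> runCleanupOnNode(txId, senderId));
}
```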
##########
modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java:
##########
@@ -207,7 +208,17 @@ private static class TestInternalTableImpl extends InternalTableImpl {
                     1,
                     Int2ObjectMaps.singleton(0, mock(RaftGroupService.class)),
                     PART_CNT,
-                    addr -> mock(ClusterNode.class),
+                    new ClusterNodeResolver() {
+                        @Override
+                        public ClusterNode getById(String id) {
+                            return mock(ClusterNode.class);
+                        }
+
+                        @Override
+                        public ClusterNode getByConsistentId(String consistentId) {
+                            return mock(ClusterNode.class);
+                        }
+                    },
Review Comment:
I see too many similar snippets like this across the tests; let's create a shared test
implementation of `ClusterNodeResolver`.
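Something like this would do (a sketch; the name `TestClusterNodeResolver` and its
placement are just suggestions):
```java
/** Test implementation of ClusterNodeResolver that resolves any id to a mocked node. */
private static class TestClusterNodeResolver implements ClusterNodeResolver {
    @Override
    public ClusterNode getById(String id) {
        return mock(ClusterNode.class);
    }

    @Override
    public ClusterNode getByConsistentId(String consistentId) {
        return mock(ClusterNode.class);
    }
}
```
Then each of these call sites shrinks to `new TestClusterNodeResolver()`.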
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]