This is an automated email from the ASF dual-hosted git repository. av pushed a commit to branch ignite-20177 in repository https://gitbox.apache.org/repos/asf/ignite.git
commit ad309e1138333e95ea21501b60c0239551ec1a0f Author: Anton Vinogradov <a...@apache.org> AuthorDate: Mon Aug 7 23:07:31 2023 +0300 WIP --- .../dht/colocated/GridDhtColocatedLockFuture.java | 129 +++++++++------------ 1 file changed, 58 insertions(+), 71 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 3fdc88721fb..e5bb5d1a5d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -73,14 +73,11 @@ import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.C1; -import org.apache.ignite.internal.util.typedef.C2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.transactions.TransactionDeadlockException; import org.apache.ignite.transactions.TransactionIsolation; @@ -721,21 +718,19 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF /** {@inheritDoc} */ @Override public String toString() { - Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() { - @Override public String apply(IgniteInternalFuture<?> f) { - if (isMini(f)) { - MiniFuture m = (MiniFuture)f; - - synchronized (m) { - return "[node=" + m.node().id() + - ", rcvRes=" + m.rcvRes + - ", loc=" + m.node().isLocal() + - ", done=" + f.isDone() + "]"; - } + Collection<String> futs = F.viewReadOnly(futures(), (IgniteInternalFuture<?> f) -> { + if (isMini(f)) { + MiniFuture m = (MiniFuture)f; + + synchronized (m) { + return "[node=" + m.node().id() + + ", rcvRes=" + m.rcvRes + + ", loc=" + m.node().isLocal() + + ", done=" + f.isDone() + "]"; } - else - return "[loc=true, done=" + f.isDone() + "]"; } + else + return "[loc=true, done=" + f.isDone() + "]"; }); return S.toString(GridDhtColocatedLockFuture.class, this, @@ -1283,49 +1278,47 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF // Add new future. add(new GridEmbeddedFuture<>( - new C2<Exception, Exception, Boolean>() { - @Override public Boolean apply(Exception resEx, Exception e) { - if (CU.isLockTimeoutOrCancelled(e) || (CU.isLockTimeoutOrCancelled(resEx))) - return false; + (Exception resEx, Exception e) -> { + if (CU.isLockTimeoutOrCancelled(e) || (CU.isLockTimeoutOrCancelled(resEx))) + return false; - if (e != null) { - onError(e); + if (e != null) { + onError(e); - return false; - } - - if (resEx != null) { - onError(resEx); + return false; + } - return false; - } + if (resEx != null) { + onError(resEx); - if (log.isDebugEnabled()) - log.debug("Acquired lock for local DHT mapping [locId=" + cctx.nodeId() + - ", mappedKeys=" + keys + ", fut=" + GridDhtColocatedLockFuture.this + ']'); + return false; + } - if (inTx()) { - for (KeyCacheObject key : keys) - tx.entry(cctx.txKey(key)).markLocked(); - } - else { - for (KeyCacheObject key : keys) - cctx.mvcc().markExplicitOwner(cctx.txKey(key), threadId); - } + if (log.isDebugEnabled()) + log.debug("Acquired lock for local DHT mapping [locId=" + cctx.nodeId() + + ", mappedKeys=" + keys + ", fut=" + GridDhtColocatedLockFuture.this + ']'); - try { - // Proceed and add new future (if any) before completing embedded future. - if (mappings != null) - proceedMapping(); - } - catch (IgniteCheckedException ex) { - onError(ex); + if (inTx()) { + for (KeyCacheObject key : keys) + tx.entry(cctx.txKey(key)).markLocked(); + } + else { + for (KeyCacheObject key : keys) + cctx.mvcc().markExplicitOwner(cctx.txKey(key), threadId); + } - return false; - } + try { + // Proceed and add new future (if any) before completing embedded future. + if (mappings != null) + proceedMapping(); + } + catch (IgniteCheckedException ex) { + onError(ex); - return true; + return false; } + + return true; }, fut)); } @@ -1526,25 +1519,23 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF IgniteInternalFuture<TxDeadlock> fut = cctx.tm().detectDeadlock(tx, keys); - fut.listen(new IgniteInClosure<IgniteInternalFuture<TxDeadlock>>() { - @Override public void apply(IgniteInternalFuture<TxDeadlock> fut) { - try { - TxDeadlock deadlock = fut.get(); + fut.listen((IgniteInternalFuture<TxDeadlock> fut) -> { + try { + TxDeadlock deadlock = fut.get(); - err = new IgniteTxTimeoutCheckedException("Failed to acquire lock within provided " + - "timeout for transaction [timeout=" + tx.timeout() + ", tx=" + CU.txString(tx) + ']', - deadlock != null ? new TransactionDeadlockException(deadlock.toString(cctx.shared())) : - null); - } - catch (IgniteCheckedException e) { - err = e; + err = new IgniteTxTimeoutCheckedException("Failed to acquire lock within provided " + + "timeout for transaction [timeout=" + tx.timeout() + ", tx=" + CU.txString(tx) + ']', + deadlock != null ? new TransactionDeadlockException(deadlock.toString(cctx.shared())) : + null); + } + catch (IgniteCheckedException e) { + err = e; - U.warn(log, "Failed to detect deadlock.", e); - } + U.warn(log, "Failed to detect deadlock.", e); + } - synchronized (LockTimeoutObject.this) { - onComplete(false, true); - } + synchronized (this) { + onComplete(false, true); } }); } @@ -1795,11 +1786,7 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF for (KeyCacheObject key : GridDhtColocatedLockFuture.this.keys) cctx.mvcc().removeExplicitLock(threadId, cctx.txKey(key), lockVer); - mapOnTopology(true, new Runnable() { - @Override public void run() { - onDone(true); - } - }); + mapOnTopology(true, () -> onDone(true)); } /** {@inheritDoc} */