http://git-wip-us.apache.org/repos/asf/ignite/blob/81835e48/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81835e48/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81835e48/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index f7f61c6,163ed99..c9c4f34 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@@ -68,26 -68,26 +68,35 @@@ public class IgniteTxEntry implements G private static final long serialVersionUID = 0L; /** Dummy version for non-existing entry read in SERIALIZABLE transaction. */ - public static final GridCacheVersion SER_READ_EMPTY_ENTRY_VER = new GridCacheVersion(0, 0, 0, 0); + public static final GridCacheVersion SER_READ_EMPTY_ENTRY_VER = new GridCacheVersion(0, 0, 0); /** Dummy version for any existing entry read in SERIALIZABLE transaction. */ - public static final GridCacheVersion SER_READ_NOT_EMPTY_VER = new GridCacheVersion(0, 0, 0, 1); + public static final GridCacheVersion SER_READ_NOT_EMPTY_VER = new GridCacheVersion(0, 0, 1); /** */ - public static final GridCacheVersion GET_ENTRY_INVALID_VER_UPDATED = new GridCacheVersion(0, 0, 0, 2); + public static final GridCacheVersion GET_ENTRY_INVALID_VER_UPDATED = new GridCacheVersion(0, 0, 2); /** */ - public static final GridCacheVersion GET_ENTRY_INVALID_VER_AFTER_GET = new GridCacheVersion(0, 0, 0, 3); + public static final GridCacheVersion GET_ENTRY_INVALID_VER_AFTER_GET = new GridCacheVersion(0, 0, 3); + + /** Skip store flag bit mask. */ + private static final int TX_ENTRY_SKIP_STORE_FLAG_MASK = 0x01; + + /** Keep binary flag. */ + private static final int TX_ENTRY_KEEP_BINARY_FLAG_MASK = 0x02; + + /** Flag indicating that old value for 'invoke' operation was non null on primary node. */ + private static final int TX_ENTRY_OLD_VAL_ON_PRIMARY = 0x04; + /** Skip store flag bit mask. */ + private static final int TX_ENTRY_SKIP_STORE_FLAG_MASK = 0x01; + + /** Keep binary flag. */ + private static final int TX_ENTRY_KEEP_BINARY_FLAG_MASK = 0x02; + + /** Flag indicating that old value for 'invoke' operation was non null on primary node. */ + private static final int TX_ENTRY_OLD_VAL_ON_PRIMARY = 0x04; + /** Prepared flag updater. */ private static final AtomicIntegerFieldUpdater<IgniteTxEntry> PREPARED_UPD = AtomicIntegerFieldUpdater.newUpdater(IgniteTxEntry.class, "prepared"); http://git-wip-us.apache.org/repos/asf/ignite/blob/81835e48/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81835e48/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java index f74d8a4,be718cf..3f07151 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java @@@ -32,10 -29,17 +32,11 @@@ import org.apache.ignite.cache.CacheEnt import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; -import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; + import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport; import org.apache.ignite.lang.IgniteBiTuple; -import static org.apache.ignite.internal.util.typedef.internal.CU.retryTopologySafe; -import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; -import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; - /** * Cache atomic long implementation. */ @@@ -363,81 -422,38 +364,92 @@@ public final class GridCacheAtomicLongI } } + /** {@inheritDoc} */ ++ @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException { ++ this.atomicView = kctx.cache().atomicsCache(); ++ this.ctx = atomicView.context(); ++ } ++ ++ /** {@inheritDoc} */ ++ @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException { ++ // No-op. ++ } ++ ++ /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(ctx.kernalContext()); + out.writeUTF(name); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + IgniteBiTuple<GridKernalContext, String> t = stash.get(); + + t.set1((GridKernalContext)in.readObject()); + t.set2(in.readUTF()); + } + /** - * Method returns callable for execution {@link #addAndGet(long)} operation in async and sync mode. + * Reconstructs object on unmarshalling. * - * @param l Value will be added to atomic long. - * @return Callable for execution in async and sync mode. + * @return Reconstructed object. + * @throws ObjectStreamException Thrown in case of unmarshalling error. */ - private Callable<Long> internalAddAndGet(final long l) { - return retryTopologySafe(new Callable<Long>() { - @Override public Long call() throws Exception { - try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheAtomicLongValue val = atomicView.get(key); + private Object readResolve() throws ObjectStreamException { + try { + IgniteBiTuple<GridKernalContext, String> t = stash.get(); - if (val == null) - throw new IgniteCheckedException("Failed to find atomic long with given name: " + name); + return t.get1().dataStructures().atomicLong(t.get2(), 0L, false); + } + catch (IgniteCheckedException e) { + throw U.withCause(new InvalidObjectException(e.getMessage()), e); + } + finally { + stash.remove(); + } + } - long retVal = val.get() + l; + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridCacheAtomicLongImpl.class, this); + } - val.set(retVal); + /** + * + */ + static class GetAndSetProcessor implements + CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final long newVal; + + /** + * @param newVal New value. + */ + GetAndSetProcessor(long newVal) { + this.newVal = newVal; + } - atomicView.put(key, val); + /** {@inheritDoc} */ + @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) { + GridCacheAtomicLongValue val = e.getValue(); - tx.commit(); + if (val == null) + throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name()); - return retVal; - } - catch (Error | Exception e) { - U.error(log, "Failed to add and get: " + this, e); + long curVal = val.get(); - throw e; - } - } - }); + e.setValue(new GridCacheAtomicLongValue(newVal)); + + return curVal; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GetAndSetProcessor.class, this); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/81835e48/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java index a026bf3,4365468..667fd15 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java @@@ -33,11 -29,18 +33,12 @@@ import org.apache.ignite.cache.CacheEnt import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; + import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport; import org.apache.ignite.lang.IgniteBiTuple; -import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe; -import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; -import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; - /** * Cache atomic reference implementation. */ @@@ -220,6 -205,93 +221,17 @@@ public final class GridCacheAtomicRefer } } - /** - * Method returns callable for execution {@link #set(Object)} operation in async and sync mode. - * - * @param val Value will be set in reference . - * @return Callable for execution in async and sync mode. - */ - private Callable<Boolean> internalSet(final T val) { - return retryTopologySafe(new Callable<Boolean>() { - @Override public Boolean call() throws Exception { - try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheAtomicReferenceValue<T> ref = atomicView.get(key); - - if (ref == null) - throw new IgniteCheckedException("Failed to find atomic reference with given name: " + name); - - ref.set(val); - - atomicView.put(key, ref); - - tx.commit(); - - return true; - } - catch (Error | Exception e) { - U.error(log, "Failed to set value [val=" + val + ", atomicReference=" + this + ']', e); - - throw e; - } - } - }); - } - - /** - * Conditionally sets the new value. It will be set if {@code expValPred} is - * evaluate to {@code true}. - * - * @param expVal Expected value. - * @param newVal New value. - * @return Callable for execution in async and sync mode. - */ - private Callable<T> internalCompareAndSetAndGet(final T expVal, final T newVal) { - return retryTopologySafe(new Callable<T>() { - @Override public T call() throws Exception { - try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheAtomicReferenceValue<T> ref = atomicView.get(key); - - if (ref == null) - throw new IgniteCheckedException("Failed to find atomic reference with given name: " + name); - - T origVal = ref.get(); - - if (!F.eq(expVal, origVal)) { - tx.setRollbackOnly(); - - return origVal; - } - else { - ref.set(newVal); - - atomicView.getAndPut(key, ref); - - tx.commit(); - - return expVal; - } - } - catch (Error | Exception e) { - U.error(log, "Failed to compare and value [expVal=" + expVal + ", newVal" + - newVal + ", atomicReference" + this + ']', e); - - throw e; - } - } - }); - } - + /** {@inheritDoc} */ + @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException { + this.atomicView = kctx.cache().atomicsCache(); + this.ctx = atomicView.context(); + } + + /** {@inheritDoc} */ + @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException { - ++ // No-op. + } + /** * Check removed status. * http://git-wip-us.apache.org/repos/asf/ignite/blob/81835e48/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81835e48/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java index 1e4da30,09cea43..877b158 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java @@@ -33,10 -29,19 +33,11 @@@ import org.apache.ignite.cache.CacheEnt import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; -import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.tostring.GridToStringBuilder; -import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; + import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport; import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgniteClosure; -import org.apache.ignite.lang.IgnitePredicate; - -import static org.apache.ignite.internal.util.typedef.internal.CU.retryTopologySafe; -import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; -import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; /** * Cache atomic stamped implementation. http://git-wip-us.apache.org/repos/asf/ignite/blob/81835e48/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81835e48/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81835e48/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java index 6ebd655,0039fa2..903423d --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java @@@ -346,13 -361,16 +358,15 @@@ public final class GridCacheSemaphoreIm /** * This method is used for releasing the permits acquired by failing node. + * In case the semaphore is broken, no permits are released and semaphore is set (globally) to broken state. * * @param nodeId ID of the failing node. + * @param broken Flag indicating that this semaphore is broken. * @return True if this is the call that succeeded to change the global state. */ - boolean releaseFailedNode(final UUID nodeId) { - protected boolean releaseFailedNode(final UUID nodeId, final boolean broken) { ++ boolean releaseFailedNode(final UUID nodeId, final boolean broken) { try { - return CU.outTx( - retryTopologySafe(new Callable<Boolean>() { + return retryTopologySafe(new Callable<Boolean>() { @Override public Boolean call() throws Exception { try ( GridNearTxLocal tx = CU.txStartInternal(ctx, @@@ -466,10 -506,16 +499,14 @@@ tx.commit(); - return new Sync(cnt, waiters, failoverSafe); + Sync sync = new Sync(cnt, waiters, failoverSafe); + + sync.setBroken(val.isBroken()); + + return sync; } } - }), - ctx - ); + }); if (log.isDebugEnabled()) log.debug("Initialized internal sync structure: " + sync); @@@ -717,12 -787,15 +775,15 @@@ try { initializeSemaphore(); - boolean result = sync.nonfairTryAcquireShared(1) >= 0; + boolean res = sync.nonfairTryAcquireShared(1) >= 0; - if (isBroken()) + if (isBroken()) { + Thread.interrupted(); // Clear interrupt flag. + throw new InterruptedException(); + } - return result; + return res; } catch (IgniteCheckedException e) { throw U.convertException(e);