IGNITE-1678 rework reservation, add reservation context
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0415064f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0415064f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0415064f Branch: refs/heads/ignite-1678 Commit: 0415064f4dbbfb3bea6d359467af1525af235b68 Parents: cf9a98f Author: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com> Authored: Tue Aug 28 11:36:57 2018 +0300 Committer: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com> Committed: Tue Aug 28 11:36:57 2018 +0300 ---------------------------------------------------------------------- .../GridCacheAtomicSequenceImpl.java | 302 ++++++++++--------- 1 file changed, 154 insertions(+), 148 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0415064f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java index ba75bfa..70d51ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java @@ -74,27 +74,12 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy< /** Reservation percentage. */ private volatile int reservePercentage; - /** Reserved bottom bound of local counter (included). */ - private volatile long reservedBottomBound; - - /** Reserved upper bound of local counter (not included). */ - private volatile long reservedUpBound; - - /** A limit after which a new reservation should be done. */ - private volatile long newReservationLine; - - /** Reservation future. */ - private volatile IgniteInternalFuture<?> reservationFut; - - /** Reservation pool. */ - private final byte poolPlc = GridIoPolicy.SYSTEM_POOL; + /** */ + private ReservationBoundsContext reservationCtx; /** Synchronization lock for local value updates. */ private final Lock localUpdateLock = new ReentrantLock(); - /** */ - private final Callable<Void> reserveCallableWithZeroOffset = reserveCallable(0); - /** * Empty constructor required by {@link Externalizable}. */ @@ -131,10 +116,7 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy< this.upBound = upBound; this.locVal = locVal; - reservedBottomBound = locVal; - reservedUpBound = upBound; - // Calculate next reservation bound. - newReservationLine = calculateNewReservationLine(locVal); + reservationCtx = new ReservationBoundsContext(new ReservationBoundsResult(locVal, upBound)); } /** {@inheritDoc} */ @@ -200,80 +182,30 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy< private long internalUpdate(long l, boolean updated) throws IgniteCheckedException { assert l > 0; - while (true){ + while (true) { checkRemoved(); localUpdateLock.lock(); try { - IgniteInternalFuture<?> reservation = reservationFut; - - boolean reservationInProgress = reservation != null; - - long newLocalVal = locVal + l; - - // Reserve new interval if operation is not in progress. - if (newLocalVal >= newReservationLine && newLocalVal <= reservedUpBound && !reservationInProgress) { - reservationFut = reservation = runAsyncReservation(0); - - reservationInProgress = true; - } - long locVal0 = locVal; + long newLocalVal = reservationCtx.calculateNewLocalValue(locVal0, l); + if (newLocalVal <= upBound) { locVal = newLocalVal; return updated ? newLocalVal : locVal0; } - - // Await complete previous reservation. - if (reservationInProgress){ - reservation.get(); - - reservationFut = null; - - // Retry check bounds. - continue; - } - - // Still in reserved interval. - if (newLocalVal < reservedUpBound) { - long curVal = locVal; - - if (newLocalVal < reservedBottomBound) - locVal = reservedBottomBound; - else - locVal += l; - - upBound = reservedUpBound; - - return updated ? locVal : curVal; - } - // Switched to the next interval. New value more that upper reserved bound. - else { - assert !reservationInProgress; - - long diff = newLocalVal - reservedUpBound; - - // Calculate how many batch size included in l. - // It will our offset for global seq counter. - long off = (diff / batchSize) * batchSize; - - reservationFut = reservation = runAsyncReservation(off); - - // Can not wait async, should wait under lock until new interval reserved. - reservation.get(); - - reservationFut = null; - } } finally { localUpdateLock.unlock(); } + + // Await not under the lock. + reservationCtx.awaitCompleteReservation(); } } - /** {@inheritDoc} */ @Override public int batchSize() { return batchSize; @@ -331,77 +263,6 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy< } } - /** - * Runs async reservation of new range for current node. - * - * @param off Offset. - * @return Future. - */ - private IgniteInternalFuture<?> runAsyncReservation(final long off) { - assert off >= 0 : off; - - GridFutureAdapter<?> resFut = new GridFutureAdapter<>(); - - resFut.listen(f -> { - if (f.error() == null) - reservationFut = null; // Reset null only if there was not an error. - }); - - ctx.kernalContext().closure().runLocalSafe(() -> { - Callable<Void> reserveCall = off == 0 ? reserveCallableWithZeroOffset: reserveCallable(off); - - try { - CU.retryTopologySafe(reserveCall); - - resFut.onDone(); - } - catch (Throwable h) { - resFut.onDone(h); - } - }, poolPlc); - - return resFut; - } - - /** - * @param off Reservation offset. - * @return Callable for reserved new interval. - */ - private Callable<Void> reserveCallable(long off){ - return new Callable<Void>() { - @Override public Void call() throws Exception { - try (GridNearTxLocal tx = CU.txStartInternal(ctx, cacheView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheAtomicSequenceValue seq = cacheView.get(key); - - checkRemoved(); - - assert seq != null; - - long curGlobalVal = seq.get(); - - reservedBottomBound = curGlobalVal + off; - - reservedUpBound = reservedBottomBound + (batchSize > 1 ? batchSize - 1 : 1); - - newReservationLine = calculateNewReservationLine(reservedBottomBound); - - seq.set(reservedUpBound + 1); - - cacheView.put(key, seq); - - tx.commit(); - } - catch (Error | Exception e) { - if(!X.hasCause(e, ClusterTopologyCheckedException.class)) - U.error(log, "Failed to get and add: " + this, e); - - throw e; - } - - return null; - } - }; - } /** * @return New reservation line. @@ -448,4 +309,149 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy< @Override public String toString() { return S.toString(GridCacheAtomicSequenceImpl.class, this); } + + /** + * + */ + private class ReservationBoundsResult { + /** Reserved bottom bound of local counter (included). */ + private final long reservedBottomBound; + + /** Reserved upper bound of local counter (not included). */ + private final long reservedUpBound; + + /** A limit after which a new reservation should be done. */ + private final long newReservationLine; + + private ReservationBoundsResult(long reservedBottomBound, long reservedUpBound) { + this.reservedBottomBound = reservedBottomBound; + this.reservedUpBound = reservedUpBound; + this.newReservationLine = calculateNewReservationLine(reservedBottomBound); + } + } + + private class ReservationBoundsContext { + + /** Reservation pool. */ + private final byte poolPlc = GridIoPolicy.SYSTEM_POOL; + + private ReservationBoundsResult bounds; + + /** Reservation future. */ + private IgniteInternalFuture<ReservationBoundsResult> reservationFut; + + /** */ + private final Callable<ReservationBoundsResult> reserveCallableWithZeroOffset = reserveCallable(0); + + private ReservationBoundsContext(ReservationBoundsResult bounds) { + this.bounds = bounds; + } + + void awaitCompleteReservation() throws IgniteCheckedException { + /* IgniteInternalFuture<ReservationBoundsResult> fut = reservationFut; + + if (fut != null) + fut.get();*/ + } + + long calculateNewLocalValue(long current, long delta) throws IgniteCheckedException { + long newVal = current + delta; + + long offset = 0; + + if (reservationFut == null) { + // Reserve new interval if operation is not in progress. + if (newVal >= bounds.newReservationLine && newVal <= bounds.reservedUpBound) + reservationFut = runAsyncReservation(offset); + else if (newVal > bounds.reservedUpBound) { + offset = newVal - bounds.reservedUpBound - 1; + + reservationFut = runAsyncReservation(offset); + } + } + + if (newVal > upBound) { + bounds = reservationFut.get(); + + if (upBound < bounds.reservedBottomBound) { + long diff = delta - (upBound - current) - 1; + + newVal = bounds.reservedBottomBound + diff - offset; + } + + upBound = bounds.reservedUpBound; + + reservationFut = null; + } + + return newVal; + } + + /** + * Runs async reservation of new range for current node. + * + * @param off Offset. + * @return Future. + */ + private IgniteInternalFuture<ReservationBoundsResult> runAsyncReservation(final long off) { + assert off >= 0 : off; + + GridFutureAdapter<ReservationBoundsResult> resFut = new GridFutureAdapter<>(); + + ctx.kernalContext().closure().runLocalSafe(() -> { + Callable<ReservationBoundsResult> reserveCall = off == 0 + ? reserveCallableWithZeroOffset: reserveCallable(off); + + try { + resFut.onDone(CU.retryTopologySafe(reserveCall)); + } + catch (Throwable h) { + resFut.onDone(h); + } + }, poolPlc); + + return resFut; + } + + /** + * @param off Reservation offset. + * @return Callable for reserved new interval. + */ + private Callable<ReservationBoundsResult> reserveCallable(long off){ + return new Callable<ReservationBoundsResult>() { + @Override public ReservationBoundsResult call() throws Exception { + long bottomBound; + long upBound; + + try (GridNearTxLocal tx = CU.txStartInternal(ctx, cacheView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheAtomicSequenceValue seq = cacheView.get(key); + + checkRemoved(); + + assert seq != null; + + long curGlobalVal = seq.get(); + + bottomBound = curGlobalVal + off; + + upBound = bottomBound + (batchSize > 1 ? batchSize - 1 : 1); + + seq.set(upBound + 1); + + cacheView.put(key, seq); + + tx.commit(); + } + catch (Error | Exception e) { + if (!X.hasCause(e, ClusterTopologyCheckedException.class)) + U.error(log, "Failed to get and add: " + this, e); + + throw e; + } + + return new ReservationBoundsResult(bottomBound, upBound); + } + }; + } + } }