IGNITE-2236: Removed dedicated listener.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2fe1a0dd Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2fe1a0dd Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2fe1a0dd Branch: refs/heads/ignite-2236 Commit: 2fe1a0dd95d4eeb0049afc87e7833fe9dec63c32 Parents: 702c4f4 Author: vozerov-gridgain <voze...@gridgain.com> Authored: Wed Dec 23 16:31:25 2015 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Wed Dec 23 16:31:25 2015 +0300 ---------------------------------------------------------------------- .../util/future/GridCompoundFuture.java | 175 +++++++++---------- 1 file changed, 79 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2fe1a0dd/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java index feabbf5..f15617f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java @@ -17,15 +17,11 @@ package org.apache.ignite.internal.util.future; -import java.util.ArrayList; -import java.util.Collection; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteFutureCancelledCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; -import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.F; @@ -35,10 +31,14 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteReducer; import org.jetbrains.annotations.Nullable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + /** * Future composed of multiple inner futures. */ -public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> { +public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements IgniteInClosure<IgniteInternalFuture<T>> { /** */ private static final long serialVersionUID = 0L; @@ -56,10 +56,6 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> { /** Futures. */ protected final ArrayList<IgniteInternalFuture<T>> futs = new ArrayList<>(); - /** */ - @GridToStringExclude - private final Listener lsnr = new Listener(); - /** Reducer. */ @GridToStringInclude private final IgniteReducer<T, R> rdc; @@ -91,6 +87,61 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> { } /** {@inheritDoc} */ + @Override public void apply(IgniteInternalFuture<T> fut) { + try { + T t = fut.get(); + + try { + if (rdc != null && !rdc.collect(t)) + onDone(rdc.reduce()); + } + catch (RuntimeException e) { + U.error(null, "Failed to execute compound future reducer: " + this, e); + + // Exception in reducer is a bug, so we bypass checkComplete here. + onDone(e); + } + catch (AssertionError e) { + U.error(null, "Failed to execute compound future reducer: " + this, e); + + // Bypass checkComplete because need to rethrow. + onDone(e); + + throw e; + } + } + catch (IgniteTxOptimisticCheckedException | IgniteFutureCancelledCheckedException | + ClusterTopologyCheckedException e) { + if (!ignoreFailure(e)) + onDone(e); + } + catch (IgniteCheckedException e) { + if (!ignoreFailure(e)) { + U.error(null, "Failed to execute compound future reducer: " + this, e); + + onDone(e); + } + } + catch (RuntimeException e) { + U.error(null, "Failed to execute compound future reducer: " + this, e); + + onDone(e); + } + catch (AssertionError e) { + U.error(null, "Failed to execute compound future reducer: " + this, e); + + // Bypass checkComplete because need to rethrow. + onDone(e); + + throw e; + } + + lsnrCallsUpd.incrementAndGet(GridCompoundFuture.this); + + checkComplete(); + } + + /** {@inheritDoc} */ @Override public boolean cancel() throws IgniteCheckedException { if (onCancelled()) { for (IgniteInternalFuture<T> fut : futures()) @@ -107,24 +158,6 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> { * * @return Collection of futures. */ - private Collection<IgniteInternalFuture<T>> futures(boolean pending) { - synchronized (futs) { - Collection<IgniteInternalFuture<T>> res = new ArrayList<>(futs.size()); - - for (IgniteInternalFuture<T> fut : futs) { - if (!pending || !fut.isDone()) - res.add(fut); - } - - return res; - } - } - - /** - * Gets collection of futures. - * - * @return Collection of futures. - */ public Collection<IgniteInternalFuture<T>> futures() { return futures(false); } @@ -139,6 +172,24 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> { } /** + * Gets collection of futures. + * + * @return Collection of futures. + */ + private Collection<IgniteInternalFuture<T>> futures(boolean pending) { + synchronized (futs) { + Collection<IgniteInternalFuture<T>> res = new ArrayList<>(futs.size()); + + for (IgniteInternalFuture<T> fut : futs) { + if (!pending || !fut.isDone()) + res.add(fut); + } + + return res; + } + } + + /** * Checks if this compound future should ignore this particular exception. * * @param err Exception to check. @@ -190,7 +241,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> { futs.add(fut); } - fut.listen(lsnr); + fut.listen(this); if (isCancelled()) { try { @@ -282,72 +333,4 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> { }) ); } - - /** - * Listener for futures. - */ - private class Listener implements IgniteInClosure<IgniteInternalFuture<T>> { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public void apply(IgniteInternalFuture<T> fut) { - try { - T t = fut.get(); - - try { - if (rdc != null && !rdc.collect(t)) - onDone(rdc.reduce()); - } - catch (RuntimeException e) { - U.error(null, "Failed to execute compound future reducer: " + this, e); - - // Exception in reducer is a bug, so we bypass checkComplete here. - onDone(e); - } - catch (AssertionError e) { - U.error(null, "Failed to execute compound future reducer: " + this, e); - - // Bypass checkComplete because need to rethrow. - onDone(e); - - throw e; - } - } - catch (IgniteTxOptimisticCheckedException | IgniteFutureCancelledCheckedException | - ClusterTopologyCheckedException e) { - if (!ignoreFailure(e)) - onDone(e); - } - catch (IgniteCheckedException e) { - if (!ignoreFailure(e)) { - U.error(null, "Failed to execute compound future reducer: " + this, e); - - onDone(e); - } - } - catch (RuntimeException e) { - U.error(null, "Failed to execute compound future reducer: " + this, e); - - onDone(e); - } - catch (AssertionError e) { - U.error(null, "Failed to execute compound future reducer: " + this, e); - - // Bypass checkComplete because need to rethrow. - onDone(e); - - throw e; - } - - lsnrCallsUpd.incrementAndGet(GridCompoundFuture.this); - - checkComplete(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "Compound future listener []"; - } - } }