IGNITE-3875: Introduced separate thread pool for data streamer. This closes #1173. This closes #1383.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7d82d6a0 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7d82d6a0 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7d82d6a0 Branch: refs/heads/ignite-3477 Commit: 7d82d6a06b5e9f1f8cd2909b865e37d46b8da03f Parents: 7e73d02 Author: devozerov <voze...@gridgain.com> Authored: Wed Dec 28 12:58:11 2016 +0300 Committer: devozerov <voze...@gridgain.com> Committed: Wed Dec 28 12:58:11 2016 +0300 ---------------------------------------------------------------------- .../configuration/IgniteConfiguration.java | 31 ++++++ .../ignite/internal/GridKernalContext.java | 7 ++ .../ignite/internal/GridKernalContextImpl.java | 12 +++ .../apache/ignite/internal/IgniteKernal.java | 3 + .../org/apache/ignite/internal/IgnitionEx.java | 19 ++++ .../managers/communication/GridIoManager.java | 2 + .../managers/communication/GridIoPolicy.java | 3 + .../closure/GridClosureProcessor.java | 2 +- .../datastreamer/DataStreamProcessor.java | 60 ++++++++--- .../datastreamer/DataStreamerImpl.java | 37 ++----- .../internal/processors/pool/PoolProcessor.java | 5 + .../DataStreamProcessorSelfTest.java | 104 +++++++++++++++++++ .../junits/GridTestKernalContext.java | 12 +-- 13 files changed, 249 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index dcd8a80..e0ff9b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -148,6 +148,9 @@ public class IgniteConfiguration { /** Default core size of public thread pool. */ public static final int DFLT_PUBLIC_THREAD_CNT = Math.max(8, AVAILABLE_PROC_CNT); + /** Default size of data streamer thread pool. */ + public static final int DFLT_DATA_STREAMER_POOL_SIZE = DFLT_PUBLIC_THREAD_CNT; + /** Default keep alive time for public thread pool. */ @Deprecated public static final long DFLT_PUBLIC_KEEP_ALIVE_TIME = 0; @@ -251,6 +254,9 @@ public class IgniteConfiguration { /** IGFS pool size. */ private int igfsPoolSize = AVAILABLE_PROC_CNT; + /** Data stream pool size. */ + private int dataStreamerPoolSize = DFLT_DATA_STREAMER_POOL_SIZE; + /** Utility cache pool size. */ private int utilityCachePoolSize = DFLT_SYSTEM_CORE_THREAD_CNT; @@ -514,6 +520,7 @@ public class IgniteConfiguration { clockSyncFreq = cfg.getClockSyncFrequency(); clockSyncSamples = cfg.getClockSyncSamples(); consistentId = cfg.getConsistentId(); + dataStreamerPoolSize = cfg.getDataStreamerThreadPoolSize(); deployMode = cfg.getDeploymentMode(); discoStartupDelay = cfg.getDiscoveryStartupDelay(); failureDetectionTimeout = cfg.getFailureDetectionTimeout(); @@ -837,6 +844,17 @@ public class IgniteConfiguration { } /** + * Size of thread pool that is in charge of processing data stream messages. + * <p> + * If not provided, executor service will have size {@link #DFLT_DATA_STREAMER_POOL_SIZE}. + * + * @return Thread pool size to be used for data stream messages. + */ + public int getDataStreamerThreadPoolSize() { + return dataStreamerPoolSize; + } + + /** * Default size of thread pool that is in charge of processing utility cache messages. * <p> * If not provided, executor service will have size {@link #DFLT_SYSTEM_CORE_THREAD_CNT}. @@ -960,6 +978,19 @@ public class IgniteConfiguration { } /** + * Set thread pool size that will be used to process data stream messages. + * + * @param poolSize Executor service to use for data stream messages. + * @see IgniteConfiguration#getDataStreamerThreadPoolSize() + * @return {@code this} for chaining. + */ + public IgniteConfiguration setDataStreamerThreadPoolSize(int poolSize) { + dataStreamerPoolSize = poolSize; + + return this; + } + + /** * Sets default thread pool size that will be used to process utility cache messages. * * @param poolSize Default executor service size to use for utility cache messages. http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index 927944f..9157fed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -542,6 +542,13 @@ public interface GridKernalContext extends Iterable<GridComponent> { public ExecutorService getIgfsExecutorService(); /** + * Executor service that is in charge of processing data stream messages. + * + * @return Thread pool implementation to be used for data stream messages. + */ + public ExecutorService getDataStreamerExecutorService(); + + /** * Should return an instance of fully configured thread pool to be used for * processing of client messages (REST requests). * http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index a2ad1b2..8fc5b36 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -317,6 +317,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** */ @GridToStringExclude + private ExecutorService dataStreamExecSvc; + + /** */ + @GridToStringExclude protected ExecutorService restExecSvc; /** */ @@ -390,6 +394,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable * @param p2pExecSvc P2P executor service. * @param mgmtExecSvc Management executor service. * @param igfsExecSvc IGFS executor service. + * @param dataStreamExecSvc data stream executor service. * @param restExecSvc REST executor service. * @param affExecSvc Affinity executor service. * @param idxExecSvc Indexing executor service. @@ -410,6 +415,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable ExecutorService p2pExecSvc, ExecutorService mgmtExecSvc, ExecutorService igfsExecSvc, + ExecutorService dataStreamExecSvc, ExecutorService restExecSvc, ExecutorService affExecSvc, @Nullable ExecutorService idxExecSvc, @@ -431,6 +437,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable this.p2pExecSvc = p2pExecSvc; this.mgmtExecSvc = mgmtExecSvc; this.igfsExecSvc = igfsExecSvc; + this.dataStreamExecSvc = dataStreamExecSvc; this.restExecSvc = restExecSvc; this.affExecSvc = affExecSvc; this.idxExecSvc = idxExecSvc; @@ -977,6 +984,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ + @Override public ExecutorService getDataStreamerExecutorService() { + return dataStreamExecSvc; + } + + /** {@inheritDoc} */ @Override public ExecutorService getRestExecutorService() { return restExecSvc; } http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 4972d1f..99c3dab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -668,6 +668,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { * @param p2pExecSvc P2P executor service. * @param mgmtExecSvc Management executor service. * @param igfsExecSvc IGFS executor service. + * @param dataStreamExecSvc data stream executor service. * @param restExecSvc Reset executor service. * @param affExecSvc Affinity executor service. * @param idxExecSvc Indexing executor service. @@ -685,6 +686,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { ExecutorService p2pExecSvc, ExecutorService mgmtExecSvc, ExecutorService igfsExecSvc, + ExecutorService dataStreamExecSvc, ExecutorService restExecSvc, ExecutorService affExecSvc, @Nullable ExecutorService idxExecSvc, @@ -794,6 +796,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { p2pExecSvc, mgmtExecSvc, igfsExecSvc, + dataStreamExecSvc, restExecSvc, affExecSvc, idxExecSvc, http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index f32a753..9fe6fd0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -1472,6 +1472,9 @@ public class IgnitionEx { /** IGFS executor service. */ private ThreadPoolExecutor igfsExecSvc; + /** Data streamer executor service. */ + private ThreadPoolExecutor dataStreamerExecSvc; + /** REST requests executor service. */ private ThreadPoolExecutor restExecSvc; @@ -1702,6 +1705,17 @@ public class IgnitionEx { p2pExecSvc.allowCoreThreadTimeOut(true); + // Note that we do not pre-start threads here as this pool may not be needed. + dataStreamerExecSvc = new IgniteThreadPoolExecutor( + "data-streamer", + cfg.getGridName(), + cfg.getDataStreamerThreadPoolSize(), + cfg.getDataStreamerThreadPoolSize(), + DFLT_THREAD_KEEP_ALIVE_TIME, + new LinkedBlockingQueue<Runnable>()); + + dataStreamerExecSvc.allowCoreThreadTimeOut(true); + // Note that we do not pre-start threads here as igfs pool may not be needed. validateThreadPoolSize(cfg.getIgfsThreadPoolSize(), "IGFS"); @@ -1806,6 +1820,7 @@ public class IgnitionEx { p2pExecSvc, mgmtExecSvc, igfsExecSvc, + dataStreamerExecSvc, restExecSvc, affExecSvc, idxExecSvc, @@ -2445,6 +2460,10 @@ public class IgnitionEx { p2pExecSvc = null; + U.shutdownNow(getClass(), dataStreamerExecSvc, log); + + dataStreamerExecSvc = null; + U.shutdownNow(getClass(), igfsExecSvc, log); igfsExecSvc = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 7ef7bc0..de34adb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -90,6 +90,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.GridTopic.TOPIC_COMM_USER; import static org.apache.ignite.internal.GridTopic.TOPIC_IO_TEST; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.DATA_STREAMER_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IDX_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGFS_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL; @@ -686,6 +687,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa case MARSH_CACHE_POOL: case IDX_POOL: case IGFS_POOL: + case DATA_STREAMER_POOL: { if (msg.isOrdered()) processOrderedMessage(nodeId, msg, plc, msgC); http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java index 70a7354..18235d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java @@ -49,6 +49,9 @@ public class GridIoPolicy { /** Pool for handling distributed index range requests. */ public static final byte IDX_POOL = 8; + /** Data streamer execution pool. */ + public static final byte DATA_STREAMER_POOL = 9; + /** * Defines the range of reserved pools that are not available for plugins. * @param key The key. http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java index a07dbf8..5ba21d7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java @@ -988,7 +988,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param plc Policy to choose executor pool. * @return Future. */ - private <R> IgniteInternalFuture<R> callLocalSafe(Callable<R> c, byte plc) { + public <R> IgniteInternalFuture<R> callLocalSafe(Callable<R> c, byte plc) { try { return callLocal(c, plc); } http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java index fee4dd6..5ebfd47 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java @@ -17,11 +17,8 @@ package org.apache.ignite.internal.processors.datastreamer; -import java.util.Collection; -import java.util.UUID; -import java.util.concurrent.DelayQueue; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; @@ -40,6 +37,7 @@ import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; +import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.marshaller.Marshaller; @@ -47,11 +45,15 @@ import org.apache.ignite.stream.StreamReceiver; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; +import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.DelayQueue; + import static org.apache.ignite.internal.GridTopic.TOPIC_DATASTREAM; -import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.DATA_STREAMER_POOL; /** - * + * Data stream processor. */ public class DataStreamProcessor<K, V> extends GridProcessorAdapter { /** Loaders map (access is not supposed to be highly concurrent). */ @@ -224,13 +226,15 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter { IgniteInternalFuture<?> fut = ctx.cache().context().exchange().affinityReadyFuture(rmtAffVer); if (fut != null && !fut.isDone()) { + final byte plc = threadIoPolicy(); + fut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> t) { ctx.closure().runLocalSafe(new Runnable() { @Override public void run() { processRequest(nodeId, req); } - }, false); + }, plc); } }); @@ -416,12 +420,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter { DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes, forceLocDep); try { - Byte plc = GridIoManager.currentPolicy(); - - if (plc == null) - plc = PUBLIC_POOL; - - ctx.io().send(nodeId, resTopic, res, plc); + ctx.io().send(nodeId, resTopic, res, threadIoPolicy()); } catch (IgniteCheckedException e) { if (ctx.discovery().alive(nodeId)) @@ -431,6 +430,41 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter { } } + /** + * Get IO policy. + * + * @return IO policy. + */ + private static byte threadIoPolicy() { + Byte plc = GridIoManager.currentPolicy(); + + if (plc == null) + plc = DATA_STREAMER_POOL; + + return plc; + } + + /** + * Get IO policy for particular node with provided resolver. + * + * @param rslvr Resolver. + * @param node Node. + * @return IO policy. + */ + public static byte ioPolicy(@Nullable IgniteClosure<ClusterNode, Byte> rslvr, ClusterNode node) { + assert node != null; + + Byte res = null; + + if (rslvr != null) + res = rslvr.apply(node); + + if (res == null) + res = DATA_STREAMER_POOL; + + return res; + } + /** {@inheritDoc} */ @Override public void printMemoryStats() { X.println(">>>"); http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index bb9ffdd..0526162 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -61,7 +61,6 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; -import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; @@ -112,16 +111,12 @@ import org.jsr166.LongAdder8; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.GridTopic.TOPIC_DATASTREAM; -import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL; /** * Data streamer implementation. */ @SuppressWarnings("unchecked") public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed { - /** Default policy resolver. */ - private static final DefaultIoPolicyResolver DFLT_IO_PLC_RSLVR = new DefaultIoPolicyResolver(); - /** Isolated receiver. */ private static final StreamReceiver ISOLATED_UPDATER = new IsolatedUpdater(); @@ -135,7 +130,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed private byte[] updaterBytes; /** IO policy resovler for data load request. */ - private IgniteClosure<ClusterNode, Byte> ioPlcRslvr = DFLT_IO_PLC_RSLVR; + private IgniteClosure<ClusterNode, Byte> ioPlcRslvr; /** Max remap count before issuing an error. */ private static final int DFLT_MAX_REMAP_CNT = 32; @@ -1509,10 +1504,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed * @param entries Entries. * @param reqTopVer Request topology version. * @param curFut Current future. + * @param plc Policy. */ private void localUpdate(final Collection<DataStreamerEntry> entries, final AffinityTopologyVersion reqTopVer, - final GridFutureAdapter<Object> curFut) { + final GridFutureAdapter<Object> curFut, + final byte plc) { try { GridCacheContext cctx = ctx.cache().internalCache(cacheName).context(); @@ -1543,7 +1540,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed skipStore, keepBinary, rcvr), - false); + plc); locFuts.add(callFut); @@ -1573,7 +1570,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed else { fut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> e) { - localUpdate(entries, reqTopVer, curFut); + localUpdate(entries, reqTopVer, curFut, plc); } }); } @@ -1617,13 +1614,10 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed IgniteInternalFuture<Object> fut; - Byte plc = ioPlcRslvr.apply(node); - - if (plc == null) - plc = PUBLIC_POOL; + byte plc = DataStreamProcessor.ioPolicy(ioPlcRslvr, node); - if (isLocNode && plc == GridIoPolicy.PUBLIC_POOL) - localUpdate(entries, topVer, curFut); + if (isLocNode) + localUpdate(entries, topVer, curFut, plc); else { try { for (DataStreamerEntry e : entries) { @@ -1975,19 +1969,6 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed } /** - * Default IO policy resolver. - */ - private static class DefaultIoPolicyResolver implements IgniteClosure<ClusterNode, Byte> { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public Byte apply(ClusterNode gridNode) { - return PUBLIC_POOL; - } - } - - /** * Key object wrapper. Using identity equals prevents slow down in case of hash code collision. */ private static class KeyCacheObjectWrapper { http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java index 59e5e7d..89140b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java @@ -128,6 +128,11 @@ public class PoolProcessor extends GridProcessorAdapter { return ctx.getIgfsExecutorService(); + case GridIoPolicy.DATA_STREAMER_POOL: + assert ctx.getDataStreamerExecutorService() != null : "Data streamer pool is not configured."; + + return ctx.getDataStreamerExecutorService(); + default: { if (plc < 0) throw new IgniteCheckedException("Policy cannot be negative: " + plc); http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java index 0f8ae29..d00e08b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java @@ -33,6 +33,7 @@ import javax.cache.CacheException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.affinity.Affinity; @@ -49,6 +50,7 @@ import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteClosure; @@ -59,6 +61,7 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.stream.StreamReceiver; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jetbrains.annotations.Nullable; @@ -949,6 +952,94 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testLocalDataStreamerDedicatedThreadPool() throws Exception { + try { + useCache = true; + + Ignite ignite = startGrid(1); + + final IgniteCache<String, String> cache = ignite.cache(null); + + IgniteDataStreamer<String, String> ldr = ignite.dataStreamer(null); + try { + ldr.receiver(new StreamReceiver<String, String>() { + @Override public void receive(IgniteCache<String, String> cache, + Collection<Map.Entry<String, String>> entries) throws IgniteException { + String threadName = Thread.currentThread().getName(); + + cache.put("key", threadName); + } + }); + ldr.addData("key", "value"); + + ldr.tryFlush(); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return cache.get("key") != null; + } + }, 3_000); + } + finally { + ldr.close(true); + } + + assertNotNull(cache.get("key")); + + assertTrue(cache.get("key").startsWith("data-streamer")); + + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testRemoteDataStreamerDedicatedThreadPool() throws Exception { + try { + useCache = true; + + Ignite ignite = startGrid(1); + + useCache = false; + + Ignite client = startGrid(0); + + final IgniteCache<String, String> cache = ignite.cache(null); + + IgniteDataStreamer<String, String> ldr = client.dataStreamer(null); + + try { + ldr.receiver(new StringStringStreamReceiver()); + + ldr.addData("key", "value"); + + ldr.tryFlush(); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return cache.get("key") != null; + } + }, 3_000); + } + finally { + ldr.close(true); + } + + assertNotNull(cache.get("key")); + + assertTrue(cache.get("key").startsWith("data-streamer")); + } + finally { + stopAllGrids(); + } + } + + /** * */ public static class TestObject { @@ -1024,4 +1115,17 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest { } } } + + /** + * + */ + private static class StringStringStreamReceiver implements StreamReceiver<String, String> { + /** {@inheritDoc} */ + @Override public void receive(IgniteCache<String, String> cache, + Collection<Map.Entry<String, String>> entries) throws IgniteException { + String threadName = Thread.currentThread().getName(); + + cache.put("key", threadName); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java index 143159d..40f0e43 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java @@ -64,6 +64,7 @@ public class GridTestKernalContext extends GridKernalContextImpl { null, null, null, + null, U.allPluginProviders() ); @@ -98,11 +99,6 @@ public class GridTestKernalContext extends GridKernalContextImpl { } } - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridTestKernalContext.class, this, super.toString()); - } - /** * Sets system executor service. * @@ -112,7 +108,6 @@ public class GridTestKernalContext extends GridKernalContextImpl { this.sysExecSvc = sysExecSvc; } - /** * Sets executor service. * @@ -121,4 +116,9 @@ public class GridTestKernalContext extends GridKernalContextImpl { public void setExecutorService(ExecutorService execSvc){ this.execSvc = execSvc; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridTestKernalContext.class, this, super.toString()); + } }