IGNITE-425: Implementation of ContinuousQueryWithTransformer
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a83f3038 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a83f3038 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a83f3038 Branch: refs/heads/ignite-7485-2 Commit: a83f3038a9424a5eb826cf35da8d22496a551b50 Parents: c917327 Author: Nikolay Izhikov <nizhi...@apache.org> Authored: Thu Feb 8 14:51:32 2018 +0300 Committer: Nikolay Izhikov <nizhi...@apache.org> Committed: Thu Feb 8 14:51:32 2018 +0300 ---------------------------------------------------------------------- .../cache/query/AbstractContinuousQuery.java | 202 ++++++++ .../ignite/cache/query/ContinuousQuery.java | 172 +------ .../query/ContinuousQueryWithTransformer.java | 192 +++++++ .../processors/cache/IgniteCacheProxyImpl.java | 82 ++- .../continuous/CacheContinuousQueryEntry.java | 9 +- .../continuous/CacheContinuousQueryHandler.java | 149 +++++- .../CacheContinuousQueryHandlerV2.java | 9 +- .../CacheContinuousQueryHandlerV3.java | 185 +++++++ .../continuous/CacheContinuousQueryManager.java | 44 +- ...acheContinuousQueryRandomOperationsTest.java | 156 ++++-- ...ContinuousWithTransformerClientSelfTest.java | 40 ++ ...heContinuousWithTransformerFailoverTest.java | 309 +++++++++++ ...eContinuousWithTransformerLocalSelfTest.java | 29 ++ ...nuousWithTransformerPartitionedSelfTest.java | 29 ++ ...uousWithTransformerRandomOperationsTest.java | 31 ++ ...inuousWithTransformerReplicatedSelfTest.java | 511 +++++++++++++++++++ .../IgniteCacheQuerySelfTestSuite3.java | 13 + 17 files changed, 1919 insertions(+), 243 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a83f3038/modules/core/src/main/java/org/apache/ignite/cache/query/AbstractContinuousQuery.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/query/AbstractContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/AbstractContinuousQuery.java new file mode 100644 index 0000000..2a615ee --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/AbstractContinuousQuery.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.query; + +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.event.CacheEntryEventFilter; +import javax.cache.event.EventType; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.lang.IgniteAsyncCallback; + +/** + * Base class for continuous query. + * + * @see ContinuousQuery + * @see ContinuousQueryWithTransformer + */ +public abstract class AbstractContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> { + /** + * Default page size. Size of {@code 1} means that all entries + * will be sent to master node immediately (buffering is disabled). + */ + public static final int DFLT_PAGE_SIZE = 1; + + /** Maximum default time interval after which buffer will be flushed (if buffering is enabled). 
*/ + public static final long DFLT_TIME_INTERVAL = 0; + + /** + * Default value for automatic unsubscription flag. Remote filters + * will be unregistered by default if master node leaves topology. + */ + public static final boolean DFLT_AUTO_UNSUBSCRIBE = true; + + /** Initial query. */ + private Query<Cache.Entry<K, V>> initQry; + + /** Remote filter factory. */ + private Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory; + + /** Time interval. */ + private long timeInterval = DFLT_TIME_INTERVAL; + + /** Automatic unsubscription flag. */ + private boolean autoUnsubscribe = DFLT_AUTO_UNSUBSCRIBE; + + /** Whether to notify about {@link EventType#EXPIRED} events. */ + private boolean includeExpired; + + /** + * Sets initial query. + * <p> + * This query will be executed before the continuous listener is registered, + * which allows iterating over entries that already existed at the + * time the continuous query is executed. + * + * @param initQry Initial query. + * @return {@code this} for chaining. + */ + public AbstractContinuousQuery<K, V> setInitialQuery(Query<Cache.Entry<K, V>> initQry) { + this.initQry = initQry; + + return this; + } + + /** + * Gets initial query. + * + * @return Initial query. + */ + public Query<Cache.Entry<K, V>> getInitialQuery() { + return initQry; + } + + /** + * Sets optional key-value filter factory. The filter produced by this factory is called before an entry is + * sent to the master node. + * <p> + * <b>WARNING:</b> all operations that involve any kind of JVM-local or distributed locking + * (e.g., synchronization or transactional cache operations), should be executed asynchronously + * without blocking the thread that called the filter. Otherwise, you can get deadlocks. + * <p> + * If the remote filter is annotated with {@link IgniteAsyncCallback} then it is executed in the async callback + * pool (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()}), which allows it to perform cache operations. 
+ * + * @param rmtFilterFactory Key-value filter factory. + * @return {@code this} for chaining. + * @see IgniteAsyncCallback + * @see IgniteConfiguration#getAsyncCallbackPoolSize() + */ + public AbstractContinuousQuery<K, V> setRemoteFilterFactory( + Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory) { + this.rmtFilterFactory = rmtFilterFactory; + + return this; + } + + /** + * Gets remote filter factory. + * + * @return Remote filter factory. + */ + public Factory<? extends CacheEntryEventFilter<K, V>> getRemoteFilterFactory() { + return rmtFilterFactory; + } + + /** + * Sets time interval. + * <p> + * When a cache update happens, the entry is first put into a buffer. Entries from the buffer will + * be sent to the master node only if the buffer is full (its size can be provided via {@link #setPageSize(int)} + * method) or the time provided via this method is exceeded. + * <p> + * Default time interval is {@code 0} which means that + * time check is disabled and entries will be sent only when buffer is full. + * + * @param timeInterval Time interval. + * @return {@code this} for chaining. + */ + public AbstractContinuousQuery<K, V> setTimeInterval(long timeInterval) { + if (timeInterval < 0) + throw new IllegalArgumentException("Time interval can't be negative."); + + this.timeInterval = timeInterval; + + return this; + } + + /** + * Gets time interval. + * + * @return Time interval. + */ + public long getTimeInterval() { + return timeInterval; + } + + /** + * Sets automatic unsubscribe flag. + * <p> + * This flag indicates that query filters on remote nodes should be + * automatically unregistered if master node (node that initiated the query) leaves topology. If this flag is + * {@code false}, filters will be unregistered only when the query is cancelled from master node, and won't ever be + * unregistered if master node leaves grid. + * <p> + * Default value for this flag is {@code true}. + * + * @param autoUnsubscribe Automatic unsubscription flag. 
+ * @return {@code this} for chaining. + */ + public AbstractContinuousQuery<K, V> setAutoUnsubscribe(boolean autoUnsubscribe) { + this.autoUnsubscribe = autoUnsubscribe; + + return this; + } + + /** + * Gets automatic unsubscription flag value. + * + * @return Automatic unsubscription flag. + */ + public boolean isAutoUnsubscribe() { + return autoUnsubscribe; + } + + /** + * Sets the flag value defining whether to notify about {@link EventType#EXPIRED} events. + * If {@code true}, then the remote listener will get notifications about entries + * expired in cache. Otherwise, only {@link EventType#CREATED}, {@link EventType#UPDATED} + * and {@link EventType#REMOVED} events will be fired in the remote listener. + * <p> + * This flag is {@code false} by default, so {@link EventType#EXPIRED} events are disabled. + * + * @param includeExpired Whether to notify about {@link EventType#EXPIRED} events. + */ + public void setIncludeExpired(boolean includeExpired) { + this.includeExpired = includeExpired; + } + + /** + * Gets the flag value defining whether to notify about {@link EventType#EXPIRED} events. + * + * @return Whether to notify about {@link EventType#EXPIRED} events. 
+ */ + public boolean isIncludeExpired() { + return includeExpired; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a83f3038/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java index 49d471e..9a8fbca 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java @@ -21,9 +21,9 @@ import javax.cache.Cache; import javax.cache.configuration.Factory; import javax.cache.event.CacheEntryEventFilter; import javax.cache.event.CacheEntryUpdatedListener; -import javax.cache.event.EventType; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.cache.query.ContinuousQueryWithTransformer.EventListener; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.lang.IgniteAsyncCallback; @@ -103,49 +103,20 @@ import org.apache.ignite.lang.IgniteAsyncCallback; * is executed in async callback pool (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()}) * and notification order is kept the same as update order for given cache key. * + * @see ContinuousQueryWithTransformer * @see IgniteAsyncCallback * @see IgniteConfiguration#getAsyncCallbackPoolSize() */ -public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> { +public final class ContinuousQuery<K, V> extends AbstractContinuousQuery<K, V> { /** */ private static final long serialVersionUID = 0L; - /** - * Default page size. Size of {@code 1} means that all entries - * will be sent to master node immediately (buffering is disabled). 
- */ - public static final int DFLT_PAGE_SIZE = 1; - - /** Maximum default time interval after which buffer will be flushed (if buffering is enabled). */ - public static final long DFLT_TIME_INTERVAL = 0; - - /** - * Default value for automatic unsubscription flag. Remote filters - * will be unregistered by default if master node leaves topology. - */ - public static final boolean DFLT_AUTO_UNSUBSCRIBE = true; - - /** Initial query. */ - private Query<Cache.Entry<K, V>> initQry; - /** Local listener. */ private CacheEntryUpdatedListener<K, V> locLsnr; /** Remote filter. */ private CacheEntryEventSerializableFilter<K, V> rmtFilter; - /** Remote filter factory. */ - private Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory; - - /** Time interval. */ - private long timeInterval = DFLT_TIME_INTERVAL; - - /** Automatic unsubscription flag. */ - private boolean autoUnsubscribe = DFLT_AUTO_UNSUBSCRIBE; - - /** Whether to notify about {@link EventType#EXPIRED} events. */ - private boolean includeExpired; - /** * Creates new continuous query. */ @@ -153,29 +124,9 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> { setPageSize(DFLT_PAGE_SIZE); } - /** - * Sets initial query. - * <p> - * This query will be executed before continuous listener is registered - * which allows to iterate through entries which already existed at the - * time continuous query is executed. - * - * @param initQry Initial query. - * @return {@code this} for chaining. - */ + /** {@inheritDoc} */ public ContinuousQuery<K, V> setInitialQuery(Query<Cache.Entry<K, V>> initQry) { - this.initQry = initQry; - - return this; - } - - /** - * Gets initial query. - * - * @return Initial query. - */ - public Query<Cache.Entry<K, V>> getInitialQuery() { - return initQry; + return (ContinuousQuery<K, V>)super.setInitialQuery(initQry); } /** @@ -197,6 +148,7 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> { * @return {@code this} for chaining. 
* @see IgniteAsyncCallback * @see IgniteConfiguration#getAsyncCallbackPoolSize() + * @see ContinuousQueryWithTransformer#setLocalListener(EventListener) */ public ContinuousQuery<K, V> setLocalListener(CacheEntryUpdatedListener<K, V> locLsnr) { this.locLsnr = locLsnr; @@ -246,118 +198,14 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> { return rmtFilter; } - /** - * Sets optional key-value filter factory. This factory produces filter is called before entry is - * sent to the master node. - * <p> - * <b>WARNING:</b> all operations that involve any kind of JVM-local or distributed locking - * (e.g., synchronization or transactional cache operations), should be executed asynchronously - * without blocking the thread that called the filter. Otherwise, you can get deadlocks. - * <p> - * If remote filter are annotated with {@link IgniteAsyncCallback} then it is executed in async callback - * pool (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()}) that allow to perform a cache operations. - * - * @param rmtFilterFactory Key-value filter factory. - * @return {@code this} for chaining. - * @see IgniteAsyncCallback - * @see IgniteConfiguration#getAsyncCallbackPoolSize() - */ - public ContinuousQuery<K, V> setRemoteFilterFactory( - Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory) { - this.rmtFilterFactory = rmtFilterFactory; - - return this; - } - - /** - * Gets remote filter. - * - * @return Remote filter. - */ - public Factory<? extends CacheEntryEventFilter<K, V>> getRemoteFilterFactory() { - return rmtFilterFactory; - } - - /** - * Sets time interval. - * <p> - * When a cache update happens, entry is first put into a buffer. Entries from buffer will - * be sent to the master node only if the buffer is full (its size can be provided via {@link #setPageSize(int)} - * method) or time provided via this method is exceeded. 
- * <p> - * Default time interval is {@code 0} which means that - * time check is disabled and entries will be sent only when buffer is full. - * - * @param timeInterval Time interval. - * @return {@code this} for chaining. - */ + /** {@inheritDoc} */ public ContinuousQuery<K, V> setTimeInterval(long timeInterval) { - if (timeInterval < 0) - throw new IllegalArgumentException("Time interval can't be negative."); - - this.timeInterval = timeInterval; - - return this; + return (ContinuousQuery<K, V>)super.setTimeInterval(timeInterval); } - /** - * Gets time interval. - * - * @return Time interval. - */ - public long getTimeInterval() { - return timeInterval; - } - - /** - * Sets automatic unsubscribe flag. - * <p> - * This flag indicates that query filters on remote nodes should be - * automatically unregistered if master node (node that initiated the query) leaves topology. If this flag is - * {@code false}, filters will be unregistered only when the query is cancelled from master node, and won't ever be - * unregistered if master node leaves grid. - * <p> - * Default value for this flag is {@code true}. - * - * @param autoUnsubscribe Automatic unsubscription flag. - * @return {@code this} for chaining. - */ + /** {@inheritDoc} */ public ContinuousQuery<K, V> setAutoUnsubscribe(boolean autoUnsubscribe) { - this.autoUnsubscribe = autoUnsubscribe; - - return this; - } - - /** - * Gets automatic unsubscription flag value. - * - * @return Automatic unsubscription flag. - */ - public boolean isAutoUnsubscribe() { - return autoUnsubscribe; - } - - /** - * Sets the flag value defining whether to notify about {@link EventType#EXPIRED} events. - * If {@code true}, then the remote listener will get notifications about entries - * expired in cache. Otherwise, only {@link EventType#CREATED}, {@link EventType#UPDATED} - * and {@link EventType#REMOVED} events will be fired in the remote listener. 
- * <p> - * This flag is {@code false} by default, so {@link EventType#EXPIRED} events are disabled. - * - * @param includeExpired Whether to notify about {@link EventType#EXPIRED} events. - */ - public void setIncludeExpired(boolean includeExpired) { - this.includeExpired = includeExpired; - } - - /** - * Gets the flag value defining whether to notify about {@link EventType#EXPIRED} events. - * - * @return Whether to notify about {@link EventType#EXPIRED} events. - */ - public boolean isIncludeExpired() { - return includeExpired; + return (ContinuousQuery<K, V>)super.setAutoUnsubscribe(autoUnsubscribe); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/a83f3038/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQueryWithTransformer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQueryWithTransformer.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQueryWithTransformer.java new file mode 100644 index 0000000..122410f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQueryWithTransformer.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.query; + +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryEventFilter; +import javax.cache.event.CacheEntryUpdatedListener; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.lang.IgniteAsyncCallback; +import org.apache.ignite.lang.IgniteClosure; + +/** + * API for configuring continuous cache queries with transformer. + * <p> + * Continuous queries allow to register a remote filter and a local listener + * for cache updates. If an update event passes the filter, it will be transformed with transformer and sent to + * the node that executed the query and local listener will be notified. + * <p> + * Additionally, you can execute initial query to get currently existing data. + * Query can be of any type (SQL, TEXT or SCAN) and can be set via {@link #setInitialQuery(Query)} + * method. + * <p> + * Query can be executed either on all nodes in topology using {@link IgniteCache#query(Query)} + * method, or only on the local node, if {@link Query#setLocal(boolean)} parameter is set to {@code true}. + * Note that in case query is distributed and a new node joins, it will get the remote + * filter for the query during discovery process before it actually joins topology, + * so no updates will be missed. + * This will execute query on all nodes that have cache you are working with and + * listener will start to receive notifications for cache updates. + * <p> + * To stop receiving updates call {@link QueryCursor#close()} method. + * Note that this works even if you didn't provide initial query. Cursor will + * be empty in this case, but it will still unregister listeners when {@link QueryCursor#close()} + * is called. 
+ * <p> + * {@link IgniteAsyncCallback} annotation is supported for {@link CacheEntryEventFilter} + * (see {@link #setRemoteFilterFactory(Factory)}), the {@link IgniteClosure} transformer + * (see {@link #setRemoteTransformerFactory(Factory)}) and the local {@link EventListener} + * (see {@link #setLocalListener(EventListener)}). + * If filter and/or listener are annotated with {@link IgniteAsyncCallback} then annotated callback + * is executed in async callback pool (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()}) + * and notification order is kept the same as update order for given cache key. + * + * @see ContinuousQuery + * @see IgniteAsyncCallback + * @see IgniteConfiguration#getAsyncCallbackPoolSize() + */ +public final class ContinuousQueryWithTransformer<K, V, T> extends AbstractContinuousQuery<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** Remote transformer factory. */ + private Factory<? extends IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, T>> rmtTransFactory; + + /** Local listener of transformed events. */ + private EventListener<T> locLsnr; + + /** + * Creates new continuous query with transformer. + */ + public ContinuousQueryWithTransformer() { + setPageSize(DFLT_PAGE_SIZE); + } + + /** {@inheritDoc} */ + public ContinuousQueryWithTransformer<K, V, T> setInitialQuery(Query<Cache.Entry<K, V>> initQry) { + return (ContinuousQueryWithTransformer<K, V, T>)super.setInitialQuery(initQry); + } + + /** {@inheritDoc} */ + public ContinuousQueryWithTransformer<K, V, T> setRemoteFilterFactory( + Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory) { + return (ContinuousQueryWithTransformer<K, V, T>)super.setRemoteFilterFactory(rmtFilterFactory); + } + + /** + * Sets transformer factory. The transformer produced by this factory is called after, and only if, an entry passes the filter. 
+ * <p> + * <b>WARNING:</b> all operations that involve any kind of JVM-local or distributed locking + * (e.g., synchronization or transactional cache operations), should be executed asynchronously + * without blocking the thread that called the transformer. Otherwise, you can get deadlocks. + * <p> + * + * @param factory Remote transformer factory. + * @return {@code this} for chaining. + */ + public ContinuousQueryWithTransformer<K, V, T> setRemoteTransformerFactory( + Factory<? extends IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, T>> factory) { + this.rmtTransFactory = factory; + + return this; + } + + /** + * Gets remote transformer factory. + * + * @return Remote transformer factory. + */ + public Factory<? extends IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, T>> getRemoteTransformerFactory() { + return rmtTransFactory; + } + + /** + * Sets local callback. This callback is called only on the local node when new updates are received. + * <p> + * The callback accepts the events transformed by the transformer from {@link #getRemoteTransformerFactory()}. + * <p> + * <b>WARNING:</b> all operations that involve any kind of JVM-local or distributed locking (e.g., + * synchronization or transactional cache operations), should be executed asynchronously without + * blocking the thread that called the callback. Otherwise, you can get deadlocks. + * <p> + * If the local listener is annotated with {@link IgniteAsyncCallback} then it is executed in the async callback pool + * (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()}), which allows it to perform cache operations. + * + * @param locLsnr Local callback. + * @return {@code this} for chaining. 
+ * + * @see IgniteAsyncCallback + * @see IgniteConfiguration#getAsyncCallbackPoolSize() + * @see ContinuousQuery#setLocalListener(CacheEntryUpdatedListener) + */ + public ContinuousQueryWithTransformer<K, V, T> setLocalListener(EventListener<T> locLsnr) { + this.locLsnr = locLsnr; + + return this; + } + + /** + * Gets local transformed event listener. + * + * @return Local transformed event listener. + */ + public EventListener<T> getLocalListener() { + return locLsnr; + } + + /** {@inheritDoc} */ + public ContinuousQueryWithTransformer<K, V, T> setTimeInterval(long timeInterval) { + return (ContinuousQueryWithTransformer<K, V, T>)super.setTimeInterval(timeInterval); + } + + /** {@inheritDoc} */ + public ContinuousQueryWithTransformer<K, V, T> setAutoUnsubscribe(boolean autoUnsubscribe) { + return (ContinuousQueryWithTransformer<K, V, T>)super.setAutoUnsubscribe(autoUnsubscribe); + } + + /** {@inheritDoc} */ + @Override public ContinuousQueryWithTransformer<K, V, T> setPageSize(int pageSize) { + return (ContinuousQueryWithTransformer<K, V, T>)super.setPageSize(pageSize); + } + + /** {@inheritDoc} */ + @Override public ContinuousQueryWithTransformer<K, V, T> setLocal(boolean loc) { + return (ContinuousQueryWithTransformer<K, V, T>)super.setLocal(loc); + } + + /** + * Interface for local listener of {@link ContinuousQueryWithTransformer} to implement. + * Invoked if a cache entry is updated, created or if a batch call is made, + * after the entries are updated and transformed. + * + * @param <T> Type of data produced by the transformer (see {@link ContinuousQueryWithTransformer#getRemoteTransformerFactory()}). + * @see ContinuousQueryWithTransformer + * @see ContinuousQueryWithTransformer#setLocalListener(EventListener) + */ + public interface EventListener<T> { + /** + * Called after one or more entries have been updated. + * + * @param events The just-updated entries, transformed by the remote transformer of {@link ContinuousQueryWithTransformer}. 
+ */ + void onUpdated(Iterable<? extends T> events); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a83f3038/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java index 7f71c74..a834022 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java @@ -36,6 +36,8 @@ import javax.cache.Cache; import javax.cache.CacheException; import javax.cache.configuration.CacheEntryListenerConfiguration; import javax.cache.configuration.Configuration; +import javax.cache.configuration.Factory; +import javax.cache.event.CacheEntryUpdatedListener; import javax.cache.expiry.ExpiryPolicy; import javax.cache.integration.CompletionListener; import javax.cache.processor.EntryProcessor; @@ -46,11 +48,15 @@ import org.apache.ignite.IgniteCacheRestartingException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheEntry; +import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.cache.CacheManager; import org.apache.ignite.cache.CacheMetrics; import org.apache.ignite.cache.CachePeekMode; +import org.apache.ignite.cache.query.AbstractContinuousQuery; import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.ContinuousQueryWithTransformer; +import org.apache.ignite.cache.query.ContinuousQueryWithTransformer.EventListener; import org.apache.ignite.cache.query.FieldsQueryCursor; import org.apache.ignite.cache.query.Query; 
import org.apache.ignite.cache.query.QueryCursor; @@ -62,6 +68,7 @@ import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.SqlQuery; import org.apache.ignite.cache.query.TextQuery; import org.apache.ignite.cluster.ClusterGroup; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.AsyncSupportAdapter; import org.apache.ignite.internal.IgniteEx; @@ -92,6 +99,7 @@ import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.mxbean.CacheMetricsMXBean; import org.apache.ignite.plugin.security.SecurityPermission; import org.jetbrains.annotations.NotNull; @@ -106,6 +114,12 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** */ private static final long serialVersionUID = 0L; + /** + * Ignite version that introduce {@link ContinuousQueryWithTransformer} feature. + */ + private static final IgniteProductVersion CONT_QRY_WITH_TRANSFORMER_SINCE = + IgniteProductVersion.fromString("2.5.0"); + /** Context. */ private volatile GridCacheContext<K, V> ctx; @@ -498,22 +512,66 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< * @return Initial iteration cursor. 
*/ @SuppressWarnings("unchecked") - private QueryCursor<Cache.Entry<K, V>> queryContinuous(ContinuousQuery qry, boolean loc, boolean keepBinary) { - if (qry.getInitialQuery() instanceof ContinuousQuery) + private QueryCursor<Cache.Entry<K, V>> queryContinuous(AbstractContinuousQuery qry, boolean loc, boolean keepBinary) { + assert qry instanceof ContinuousQuery || qry instanceof ContinuousQueryWithTransformer; + + if (qry.getInitialQuery() instanceof ContinuousQuery || + qry.getInitialQuery() instanceof ContinuousQueryWithTransformer) { throw new IgniteException("Initial predicate for continuous query can't be an instance of another " + "continuous query. Use SCAN or SQL query for initial iteration."); + } + + CacheEntryUpdatedListener locLsnr = null; + + EventListener locTransLsnr = null; + + CacheEntryEventSerializableFilter rmtFilter = null; + + Factory<? extends IgniteClosure> rmtTransFactory = null; + + if (qry instanceof ContinuousQuery) { + ContinuousQuery<K, V> qry0 = (ContinuousQuery<K, V>)qry; + + if (qry0.getLocalListener() == null) + throw new IgniteException("Mandatory local listener is not set for the query: " + qry); + + if (qry0.getRemoteFilter() != null && qry0.getRemoteFilterFactory() != null) + throw new IgniteException("Should be used either RemoterFilter or RemoteFilterFactory."); - if (qry.getLocalListener() == null) - throw new IgniteException("Mandatory local listener is not set for the query: " + qry); + locLsnr = qry0.getLocalListener(); - if (qry.getRemoteFilter() != null && qry.getRemoteFilterFactory() != null) - throw new IgniteException("Should be used either RemoterFilter or RemoteFilterFactory."); + rmtFilter = qry0.getRemoteFilter(); + } + else { + ContinuousQueryWithTransformer<K, V, ?> qry0 = (ContinuousQueryWithTransformer<K, V, ?>)qry; + + if (qry0.getLocalListener() == null) + throw new IgniteException("Mandatory local transformed event listener is not set for the query: " + qry); + + if (qry0.getRemoteTransformerFactory() == 
null) + throw new IgniteException("Mandatory RemoteTransformerFactory is not set for the query: " + qry); + + Collection<ClusterNode> nodes = context().grid().cluster().nodes(); + + for (ClusterNode node : nodes) { + if (node.version().compareTo(CONT_QRY_WITH_TRANSFORMER_SINCE) < 0) { + throw new IgniteException("Can't start ContinuousQueryWithTransformer, " + + "because some nodes in cluster doesn't support this feature: " + node); + } + } + + locTransLsnr = qry0.getLocalListener(); + + rmtTransFactory = qry0.getRemoteTransformerFactory(); + } try { final UUID routineId = ctx.continuousQueries().executeQuery( - qry.getLocalListener(), - qry.getRemoteFilter(), + locLsnr, + locTransLsnr, + rmtFilter, qry.getRemoteFilterFactory(), + rmtTransFactory, qry.getPageSize(), qry.getTimeInterval(), qry.isAutoUnsubscribe(), @@ -596,8 +654,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< boolean keepBinary = opCtxCall != null && opCtxCall.isKeepBinary(); - if (qry instanceof ContinuousQuery) - return (QueryCursor<R>)queryContinuous((ContinuousQuery<K, V>)qry, qry.isLocal(), keepBinary); + if (qry instanceof ContinuousQuery || qry instanceof ContinuousQueryWithTransformer) + return (QueryCursor<R>)queryContinuous((AbstractContinuousQuery)qry, qry.isLocal(), keepBinary); if (qry instanceof SqlQuery) return (QueryCursor<R>)ctx.kernalContext().query().querySql(ctx, (SqlQuery)qry, keepBinary); @@ -688,8 +746,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< */ private void validate(Query qry) { if (!QueryUtils.isEnabled(ctx.config()) && !(qry instanceof ScanQuery) && - !(qry instanceof ContinuousQuery) && !(qry instanceof SpiQuery) && !(qry instanceof SqlQuery) && - !(qry instanceof SqlFieldsQuery)) + !(qry instanceof ContinuousQuery) && !(qry instanceof ContinuousQueryWithTransformer) && + !(qry instanceof SpiQuery) && !(qry instanceof SqlQuery) && !(qry instanceof SqlFieldsQuery)) throw new 
CacheException("Indexing is disabled for cache: " + ctx.cache().name() + ". Use setIndexedTypes or setTypeMetadata methods on CacheConfiguration to enable."); http://git-wip-us.apache.org/repos/asf/ignite/blob/a83f3038/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java index 7e3f0b5..88005d0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java @@ -28,7 +28,6 @@ import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; @@ -282,9 +281,8 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { * @throws IgniteCheckedException In case of error. 
*/ void prepareMarshal(GridCacheContext cctx) throws IgniteCheckedException { - assert key != null; - - key.prepareMarshal(cctx.cacheObjectContext()); + if (key != null) + key.prepareMarshal(cctx.cacheObjectContext()); if (newVal != null) newVal.prepareMarshal(cctx.cacheObjectContext()); @@ -300,7 +298,8 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { */ void unmarshal(GridCacheContext cctx, @Nullable ClassLoader ldr) throws IgniteCheckedException { if (!isFiltered()) { - key.finishUnmarshal(cctx.cacheObjectContext(), ldr); + if (key != null) + key.finishUnmarshal(cctx.cacheObjectContext(), ldr); if (newVal != null) newVal.finishUnmarshal(cctx.cacheObjectContext(), ldr); http://git-wip-us.apache.org/repos/asf/ignite/blob/a83f3038/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 59b2a68..f0cd7ca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -24,6 +24,7 @@ import java.io.ObjectOutput; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -39,6 +40,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import 
org.apache.ignite.cache.query.ContinuousQueryWithTransformer.EventListener; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.CacheQueryExecutedEvent; import org.apache.ignite.events.CacheQueryReadEvent; @@ -69,6 +71,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteAsyncCallback; import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -91,6 +94,19 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler static final int LSNR_MAX_BUF_SIZE = IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_LISTENER_MAX_BUFFER_SIZE", 10_000); + /** + * Transformer implementation for processing received remote events. + * They are already transformed so we simply return transformed value for event. + */ + private transient IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> returnValTrans = + new IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, Object>() { + @Override public Object apply(CacheEntryEvent<? extends K, ? extends V> evt) { + assert evt.getKey() == null; + + return evt.getValue(); + } + }; + /** Cache name. 
*/ private String cacheName; @@ -161,7 +177,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler private transient boolean ignoreClsNotFound; /** */ - private transient boolean asyncCb; + transient boolean asyncCb; /** */ private transient UUID nodeId; @@ -196,14 +212,13 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler public CacheContinuousQueryHandler( String cacheName, Object topic, - CacheEntryUpdatedListener<K, V> locLsnr, - CacheEntryEventSerializableFilter<K, V> rmtFilter, + @Nullable CacheEntryUpdatedListener<K, V> locLsnr, + @Nullable CacheEntryEventSerializableFilter<K, V> rmtFilter, boolean oldValRequired, boolean sync, boolean ignoreExpired, boolean ignoreClsNotFound) { assert topic != null; - assert locLsnr != null; this.cacheName = cacheName; this.topic = topic; @@ -505,14 +520,14 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler if (asyncCb) { ctx.asyncCallbackPool().execute(new Runnable() { @Override public void run() { - locLsnr.onUpdated(evts); + notifyLocalListener(evts, getTransformer()); } }, part); } else skipCtx.addProcessClosure(new Runnable() { @Override public void run() { - locLsnr.onUpdated(evts); + notifyLocalListener(evts, getTransformer()); } }); } @@ -584,6 +599,20 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } /** + * @return Cache entry event transformer. + */ + @Nullable protected IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> getTransformer() { + return null; + } + + /** + * @return Local listener of transformed events. + */ + @Nullable protected EventListener<?> localTransformedEventListener() { + return null; + } + + /** * @param cctx Context. * @param nodeId ID of the node that started routine. * @param entry Entry. 
@@ -752,8 +781,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } } - if (!entries0.isEmpty()) - locLsnr.onUpdated(entries0); + notifyLocalListener(entries0, returnValTrans); } /** @@ -825,26 +853,31 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler if (cctx == null) return; - final CacheContinuousQueryEntry entry = evt.entry(); + CacheContinuousQueryEntry entry = evt.entry(); + + IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> trans = getTransformer(); if (loc) { if (!locCache) { Collection<CacheEntryEvent<? extends K, ? extends V>> evts = handleEvent(ctx, entry); - if (!evts.isEmpty()) - locLsnr.onUpdated(evts); + notifyLocalListener(evts, trans); if (!internal && !skipPrimaryCheck) sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx); } else { if (!entry.isFiltered()) - locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt)); + notifyLocalListener(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt), trans); } } else { - if (!entry.isFiltered()) + if (!entry.isFiltered()) { + if (trans != null) + entry = transformToEntry(trans, evt); + prepareEntry(cctx, nodeId, entry); + } Object entryOrList = handleEntry(cctx, entry); @@ -889,6 +922,28 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } /** + * Notifies local listener. + * + * @param evts Events. + * @param trans Transformer + */ + private void notifyLocalListener(Collection<CacheEntryEvent<? extends K, ? extends V>> evts, + @Nullable IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> trans) { + EventListener locTransLsnr = localTransformedEventListener(); + + assert (locLsnr != null && locTransLsnr == null) || (locLsnr == null && locTransLsnr != null); + + if (F.isEmpty(evts)) + return; + + if (locLsnr != null) + locLsnr.onUpdated(evts); + + if (locTransLsnr != null) + locTransLsnr.onUpdated(transform(trans, evts)); + } + + /** * @return Task name. 
*/ private String taskName() { @@ -1258,4 +1313,72 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } } + /** + * @param trans Transformer. + * @param evts Source events. + * @return Collection of transformed values. + */ + private Iterable transform(final IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> trans, + Collection<CacheEntryEvent<? extends K, ? extends V>> evts) { + final Iterator<CacheEntryEvent<? extends K, ? extends V>> iter = evts.iterator(); + + return new Iterable() { + @NotNull @Override public Iterator iterator() { + return new Iterator() { + @Override public boolean hasNext() { + return iter.hasNext(); + } + + @Override public Object next() { + return transform(trans, iter.next()); + } + }; + } + }; + } + + /** + * Transform event data with {@link #getTransformer()} if exists. + * + * @param trans Transformer. + * @param evt Event to transform. + * @return Entry contains only transformed data if transformer exists. Unchanged event if transformer is not set. + * @see #getTransformer() + */ + private CacheContinuousQueryEntry transformToEntry(IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> trans, + CacheContinuousQueryEvent<? extends K, ? extends V> evt) { + Object transVal = transform(trans, evt); + + return new CacheContinuousQueryEntry(evt.entry().cacheId(), + evt.entry().eventType(), + null, + transVal == null ? null : cacheContext(ctx).toCacheObject(transVal), + null, + evt.entry().isKeepBinary(), + evt.entry().partition(), + evt.entry().updateCounter(), + evt.entry().topologyVersion(), + evt.entry().flags()); + } + + /** + * @param trans Transformer. + * @param evt Event. + * @return Transformed value. + */ + private Object transform(IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> trans, + CacheEntryEvent<? extends K, ? 
extends V> evt) { + assert trans != null; + + Object transVal = null; + + try { + transVal = trans.apply(evt); + } + catch (Exception e) { + U.error(log, e); + } + + return transVal; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a83f3038/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java index e48d22e..86c1ae1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java @@ -41,7 +41,7 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan private static final long serialVersionUID = 0L; /** Remote filter factory. */ - private Factory<? extends CacheEntryEventFilter> rmtFilterFactory; + Factory<? extends CacheEntryEventFilter> rmtFilterFactory; /** Deployable object for filter factory. */ private CacheContinuousQueryDeployableObject rmtFilterFactoryDep; @@ -74,8 +74,8 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan public CacheContinuousQueryHandlerV2( String cacheName, Object topic, - CacheEntryUpdatedListener<K, V> locLsnr, - Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory, + @Nullable CacheEntryUpdatedListener<K, V> locLsnr, + @Nullable Factory<? 
extends CacheEntryEventFilter<K, V>> rmtFilterFactory, boolean oldValRequired, boolean sync, boolean ignoreExpired, @@ -89,9 +89,6 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan sync, ignoreExpired, ignoreClsNotFound); - - assert rmtFilterFactory != null; - this.rmtFilterFactory = rmtFilterFactory; if (types != null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/a83f3038/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV3.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV3.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV3.java new file mode 100644 index 0000000..14d5605 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV3.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.UUID; +import javax.cache.configuration.Factory; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryEventFilter; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.query.ContinuousQueryWithTransformer; +import org.apache.ignite.cache.query.ContinuousQueryWithTransformer.EventListener; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteAsyncCallback; +import org.apache.ignite.lang.IgniteClosure; +import org.jetbrains.annotations.Nullable; + +/** + * Continuous query handler V3 version. + * Contains {@link Factory} for remote transformer and {@link EventListener}. + * + * @see ContinuousQueryWithTransformer + */ +public class CacheContinuousQueryHandlerV3<K, V> extends CacheContinuousQueryHandlerV2<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** Remote transformer. */ + private Factory<? extends IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?>> rmtTransFactory; + + /** Deployable object for transformer. */ + private CacheContinuousQueryDeployableObject rmtTransFactoryDep; + + /** Remote transformer. */ + private transient IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> rmtTrans; + + /** Local listener for transformed events. */ + private transient EventListener<?> locTransLsnr; + + /** + * Empty constructor. + */ + public CacheContinuousQueryHandlerV3() { + super(); + } + + /** + * @param cacheName Cache name. + * @param topic Topic. + * @param locTransLsnr Local listener of transformed events + * @param rmtFilterFactory Remote filter factory. + * @param rmtTransFactory Remote transformer factory. + * @param oldValRequired OldValRequired flag. + * @param sync Sync flag. 
+ * @param ignoreExpired IgnoreExpired flag. + * @param ignoreClsNotFound IgnoreClassNotFoundException flag. + */ + public CacheContinuousQueryHandlerV3( + String cacheName, + Object topic, + EventListener<?> locTransLsnr, + @Nullable Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory, + Factory<? extends IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?>> rmtTransFactory, + boolean oldValRequired, + boolean sync, + boolean ignoreExpired, + boolean ignoreClsNotFound) { + super( + cacheName, + topic, + null, + rmtFilterFactory, + oldValRequired, + sync, + ignoreExpired, + ignoreClsNotFound, + null); + + assert locTransLsnr != null; + assert rmtTransFactory != null; + + this.locTransLsnr = locTransLsnr; + this.rmtTransFactory = rmtTransFactory; + } + + /** {@inheritDoc} */ + @Override protected IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> getTransformer() { + if (rmtTrans == null && rmtTransFactory != null) + rmtTrans = rmtTransFactory.create(); + + return rmtTrans; + } + + /** {@inheritDoc} */ + @Override protected EventListener<?> localTransformedEventListener() { + return locTransLsnr; + } + + /** {@inheritDoc} */ + @Override public CacheEntryEventFilter<K, V> getEventFilter() { + if (rmtFilterFactory == null) + return null; + + return super.getEventFilter(); + } + + /** {@inheritDoc} */ + @Override public RegisterStatus register(UUID nodeId, UUID routineId, + GridKernalContext ctx) throws IgniteCheckedException { + final IgniteClosure trans = getTransformer(); + + if (trans != null) + ctx.resource().injectGeneric(trans); + + if (locTransLsnr != null) { + ctx.resource().injectGeneric(locTransLsnr); + + asyncCb = U.hasAnnotation(locTransLsnr, IgniteAsyncCallback.class); + } + + return super.register(nodeId, routineId, ctx); + } + + /** {@inheritDoc} */ + @Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException { + super.p2pMarshal(ctx); + + if (rmtTransFactory != null && 
!U.isGrid(rmtTransFactory.getClass())) + rmtTransFactoryDep = new CacheContinuousQueryDeployableObject(rmtTransFactory, ctx); + } + + /** {@inheritDoc} */ + @Override public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException { + super.p2pUnmarshal(nodeId, ctx); + + if (rmtTransFactoryDep != null) + rmtTransFactory = rmtTransFactoryDep.unmarshal(nodeId, ctx); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void writeExternal(ObjectOutput out) throws IOException { + super.writeExternal(out); + + boolean b = rmtTransFactoryDep != null; + + out.writeBoolean(b); + + if (b) + out.writeObject(rmtTransFactoryDep); + else + out.writeObject(rmtTransFactory); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + super.readExternal(in); + + boolean b = in.readBoolean(); + + if (b) + rmtTransFactoryDep = (CacheContinuousQueryDeployableObject)in.readObject(); + else + rmtTransFactory = (Factory<? extends IgniteClosure<CacheEntryEvent<? extends K, ? 
extends V>, ?>>)in.readObject(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a83f3038/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 628111b..1e131ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -49,6 +49,7 @@ import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.query.CacheQueryEntryEvent; import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.ContinuousQueryWithTransformer.EventListener; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -57,8 +58,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter; import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; import 
org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI2; @@ -66,6 +67,7 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteAsyncCallback; +import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.plugin.security.SecurityPermission; @@ -434,6 +436,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { /** * @param locLsnr Local listener. * @param rmtFilter Remote filter. + * @param rmtFilterFactory Remote filter factory * @param bufSize Buffer size. * @param timeInterval Time interval. * @param autoUnsubscribe Auto unsubscribe flag. @@ -441,9 +444,11 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @return Continuous routine ID. * @throws IgniteCheckedException In case of error. */ - public UUID executeQuery(final CacheEntryUpdatedListener locLsnr, + public UUID executeQuery(@Nullable final CacheEntryUpdatedListener locLsnr, + @Nullable final EventListener locTransLsnr, @Nullable final CacheEntryEventSerializableFilter rmtFilter, @Nullable final Factory<? extends CacheEntryEventFilter> rmtFilterFactory, + @Nullable final Factory<? 
extends IgniteClosure> rmtTransFactory, int bufSize, long timeInterval, boolean autoUnsubscribe, @@ -453,12 +458,30 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { { IgniteOutClosure<CacheContinuousQueryHandler> clsr; - if (rmtFilterFactory != null) + if (rmtTransFactory != null) { + clsr = new IgniteOutClosure<CacheContinuousQueryHandler>() { + @Override public CacheContinuousQueryHandler apply() { + assert locTransLsnr != null; + + return new CacheContinuousQueryHandlerV3( + cctx.name(), + TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), + locTransLsnr, + rmtFilterFactory, + rmtTransFactory, + true, + false, + !includeExpired, + false); + } + }; + } + else if (rmtFilterFactory != null) { clsr = new IgniteOutClosure<CacheContinuousQueryHandler>() { @Override public CacheContinuousQueryHandler apply() { - CacheContinuousQueryHandler hnd; + assert locLsnr != null; - hnd = new CacheContinuousQueryHandlerV2( + return new CacheContinuousQueryHandlerV2( cctx.name(), TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), locLsnr, @@ -468,13 +491,15 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { !includeExpired, false, null); - - return hnd; } }; - else + } + else { clsr = new IgniteOutClosure<CacheContinuousQueryHandler>() { @Override public CacheContinuousQueryHandler apply() { + assert locLsnr != null; + assert locTransLsnr == null; + return new CacheContinuousQueryHandler( cctx.name(), TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), @@ -486,6 +511,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { false); } }; + } return executeQuery0( locLsnr, @@ -676,6 +702,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { } if (notifyExisting) { + assert locLsnr != null : "Local listener can't be null if notification for existing entries are enabled"; + final Iterator<CacheDataRow> it = 
cctx.offheap().cacheIterator(cctx.cacheId(), true, true, http://git-wip-us.apache.org/repos/asf/ignite/blob/a83f3038/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java index 142ff35..9a815de 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java @@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import javax.cache.Cache; import javax.cache.configuration.Factory; +import javax.cache.configuration.FactoryBuilder; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryEventFilter; import javax.cache.event.CacheEntryListenerException; @@ -51,8 +52,11 @@ import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.query.AbstractContinuousQuery; import org.apache.ignite.cache.query.CacheQueryEntryEvent; import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.ContinuousQueryWithTransformer; +import org.apache.ignite.cache.query.ContinuousQueryWithTransformer.EventListener; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.store.CacheStore; import 
org.apache.ignite.cache.store.CacheStoreAdapter; @@ -62,6 +66,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; @@ -464,7 +469,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract IgniteCache<QueryTestKey, QueryTestValue> cache = grid(getClientIndex()).createCache(ccfg); try { - ContinuousQuery<QueryTestKey, QueryTestValue> qry = new ContinuousQuery<>(); + AbstractContinuousQuery<QueryTestKey, QueryTestValue> qry = createQuery(); final List<CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> evts = new CopyOnWriteArrayList<>(); @@ -472,13 +477,20 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract if (noOpFilterFactory() != null) qry.setRemoteFilterFactory(noOpFilterFactory()); - qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() { - @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, - ? extends QueryTestValue>> events) throws CacheEntryListenerException { - for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e : events) - evts.add(e); - } - }); + if (qry instanceof ContinuousQuery) { + ((ContinuousQuery<QueryTestKey, QueryTestValue>)qry).setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, + ? extends QueryTestValue>> events) throws CacheEntryListenerException { + for (CacheEntryEvent<? extends QueryTestKey, ? 
extends QueryTestValue> e : events) + evts.add(e); + } + }); + } + else if (qry instanceof ContinuousQueryWithTransformer) + initQueryWithTransformer( + (ContinuousQueryWithTransformer<QueryTestKey, QueryTestValue, CacheEntryEvent>)qry, evts); + else + fail("Unknown query type"); QueryTestKey key = new QueryTestKey(1); @@ -595,7 +607,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract IgniteCache<QueryTestKey, QueryTestValue> cache = grid(getClientIndex()).createCache(ccfg); try { - ContinuousQuery<QueryTestKey, QueryTestValue> qry = new ContinuousQuery<>(); + AbstractContinuousQuery<QueryTestKey, QueryTestValue> qry = createQuery(); final List<CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> evts = new CopyOnWriteArrayList<>(); @@ -603,13 +615,20 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract if (noOpFilterFactory() != null) qry.setRemoteFilterFactory(noOpFilterFactory()); - qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() { - @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, - ? extends QueryTestValue>> events) throws CacheEntryListenerException { - for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e : events) - evts.add(e); - } - }); + if (qry instanceof ContinuousQuery) { + ((ContinuousQuery<QueryTestKey, QueryTestValue>)qry).setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, + ? extends QueryTestValue>> events) throws CacheEntryListenerException { + for (CacheEntryEvent<? extends QueryTestKey, ? 
extends QueryTestValue> e : events) + evts.add(e); + } + }); + } + else if (qry instanceof ContinuousQueryWithTransformer) + initQueryWithTransformer( + (ContinuousQueryWithTransformer<QueryTestKey, QueryTestValue, CacheEntryEvent>)qry, evts); + else + fail("Unknown query type"); Map<QueryTestKey, QueryTestValue> map = new TreeMap<>(); @@ -834,16 +853,23 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract Collection<QueryCursor<?>> curs = new ArrayList<>(); if (deploy == CLIENT) { - ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + AbstractContinuousQuery<Object, Object> qry = createQuery(); final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000); - qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { - @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { - for (CacheEntryEvent<?, ?> evt : evts) - evtsQueue.add(evt); - } - }); + if (qry instanceof ContinuousQuery) { + ((ContinuousQuery<Object, Object>)qry).setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { + for (CacheEntryEvent<?, ?> evt : evts) + evtsQueue.add(evt); + } + }); + } + else if (qry instanceof ContinuousQueryWithTransformer) + initQueryWithTransformer( + (ContinuousQueryWithTransformer<Object, Object, CacheEntryEvent>)qry, evtsQueue); + else + fail("Unknown query type"); evtsQueues.add(evtsQueue); @@ -852,16 +878,23 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract curs.add(cur); } else if (deploy == SERVER) { - ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + AbstractContinuousQuery<Object, Object> qry = createQuery(); final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000); - qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { - @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) 
{ - for (CacheEntryEvent<?, ?> evt : evts) - evtsQueue.add(evt); - } - }); + if (qry instanceof ContinuousQuery) { + ((ContinuousQuery<Object, Object>)qry).setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { + for (CacheEntryEvent<?, ?> evt : evts) + evtsQueue.add(evt); + } + }); + } + else if (qry instanceof ContinuousQueryWithTransformer) + initQueryWithTransformer( + (ContinuousQueryWithTransformer<Object, Object, CacheEntryEvent>)qry, evtsQueue); + else + fail("Unknown query type"); evtsQueues.add(evtsQueue); @@ -871,16 +904,23 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract } else { for (int i = 0; i <= getServerNodeCount(); i++) { - ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + AbstractContinuousQuery<Object, Object> qry = createQuery(); final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000); - qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { - @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { - for (CacheEntryEvent<?, ?> evt : evts) - evtsQueue.add(evt); - } - }); + if (qry instanceof ContinuousQuery) { + ((ContinuousQuery<Object, Object>)qry).setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { + for (CacheEntryEvent<?, ?> evt : evts) + evtsQueue.add(evt); + } + }); + } + else if (qry instanceof ContinuousQueryWithTransformer) + initQueryWithTransformer( + (ContinuousQueryWithTransformer<Object, Object, CacheEntryEvent>)qry, evtsQueue); + else + fail("Unknown query type"); evtsQueues.add(evtsQueue); @@ -1417,6 +1457,15 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract } /** + * @param <K> Key type. + * @param <V> Value type. + * @return New instance of continuous query. 
+ */ + protected <K, V> AbstractContinuousQuery<K, V> createQuery() { + return new ContinuousQuery<>(); + } + + /** * */ private static class TestStoreFactory implements Factory<CacheStore<Object, Object>> { @@ -1586,4 +1635,37 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract protected enum ContinuousDeploy { CLIENT, SERVER, ALL } + + /** + * Initialize continuous query with transformer. + * Query will accumulate all events in accumulator. + * + * @param qry Continuous query. + * @param acc Accumulator for events. + * @param <K> Key type. + * @param <V> Value type. + */ + private <K, V> void initQueryWithTransformer( + ContinuousQueryWithTransformer<K, V, CacheEntryEvent> qry, + Collection<CacheEntryEvent<? extends K, ? extends V>> acc) { + + IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, CacheEntryEvent> transformer = + new IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, CacheEntryEvent>() { + @Override public CacheEntryEvent apply(CacheEntryEvent<? extends K, ? extends V> event) { + return event; + } + }; + + ContinuousQueryWithTransformer<K, V, CacheEntryEvent> qry0 = + (ContinuousQueryWithTransformer<K, V, CacheEntryEvent>)qry; + + qry0.setRemoteTransformerFactory(FactoryBuilder.factoryOf(transformer)); + + qry0.setLocalListener(new EventListener<CacheEntryEvent>() { + @Override public void onUpdated(Iterable<? 
extends CacheEntryEvent> events) { + for (CacheEntryEvent e : events) + acc.add(e); + } + }); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a83f3038/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerClientSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerClientSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerClientSelfTest.java new file mode 100644 index 0000000..8d8b4c1 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerClientSelfTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.Ignite; + +/** + * Variant of {@link CacheContinuousWithTransformerReplicatedSelfTest} that runs the query on an extra grid + * named {@code "client"}, which is started in {@link #beforeTestsStarted()} while the inherited {@code client} + * flag is set. NOTE(review): presumably that flag starts the grid in client mode; it is declared outside this + * file, so confirm against the parent class. + */ +public class CacheContinuousWithTransformerClientSelfTest extends CacheContinuousWithTransformerReplicatedSelfTest { + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + client = true; + + startGrid("client"); + + client = false; + } + + /** {@inheritDoc} */ + @Override protected Ignite gridToRunQuery() throws Exception { + return grid("client"); + } +}