IGNITE-8995 Proper handling of exceptions from scan query filter and transformer
Signed-off-by: Andrey Gura <ag...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0d82f100 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0d82f100 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0d82f100 Branch: refs/heads/ignite-8783 Commit: 0d82f1004c72dd3881e453e555e9803ecd889c74 Parents: ca321ab Author: Ivan Daschinskiy <ivanda...@gmail.com> Authored: Mon Jul 16 13:14:00 2018 +0300 Committer: Andrey Gura <ag...@apache.org> Committed: Mon Jul 16 13:14:00 2018 +0300 ---------------------------------------------------------------------- .../cache/query/GridCacheQueryManager.java | 88 +++++++--- .../cache/query/CacheScanQueryFailoverTest.java | 174 +++++++++++++++++++ .../IgniteCacheQuerySelfTestSuite.java | 2 + 3 files changed, 242 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0d82f100/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index f310511..8f0edb7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -804,6 +804,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte boolean locNode) throws IgniteCheckedException { final IgniteBiPredicate<K, V> keyValFilter = qry.scanFilter(); + final InternalScanFilter<K,V> intFilter = keyValFilter != null ? 
new InternalScanFilter<>(keyValFilter) : null; try { injectResources(keyValFilter); @@ -813,7 +814,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte if (part != null && (part < 0 || part >= cctx.affinity().partitions())) return new GridEmptyCloseableIterator() { @Override public void close() throws IgniteCheckedException { - closeScanFilter(keyValFilter); + if (intFilter != null) + intFilter.close(); + super.close(); } }; @@ -852,23 +855,14 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte return new ScanQueryIterator(it, qry, topVer, locPart, keyValFilter, transformer, locNode, cctx, log); } catch (IgniteCheckedException | RuntimeException e) { - closeScanFilter(keyValFilter); + if (intFilter != null) + intFilter.close(); throw e; } } /** - * Closes a filter if it is closeable. - * - * @param f Filter. - */ - private static void closeScanFilter(Object f) { - if (f instanceof PlatformCacheEntryFilter) - ((PlatformCacheEntryFilter)f).onClose(); - } - - /** * @param o Object to inject resources to. * @throws IgniteCheckedException If failure occurred while injecting resources. */ @@ -1378,7 +1372,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte final String namex = cctx.name(); - final IgniteBiPredicate<K, V> scanFilter = qry.scanFilter(); + final InternalScanFilter<K, V> intFilter = qry.scanFilter() != null ? + new InternalScanFilter<>(qry.scanFilter()) : null; try { assert qry.type() == SCAN; @@ -1399,7 +1394,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte namex, null, null, - scanFilter, + intFilter != null ? 
intFilter.scanFilter() : null, null, null, subjId, @@ -1417,7 +1412,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte return it; } catch (Exception e) { - closeScanFilter(scanFilter); + if (intFilter != null) + intFilter.close(); if (updateStatistics) cctx.queries().collectMetrics(GridCacheQueryType.SCAN, namex, startTime, @@ -2850,7 +2846,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte private final GridDhtLocalPartition locPart; /** */ - private final IgniteBiPredicate<K, V> scanFilter; + private final InternalScanFilter<K, V> intScanFilter; /** */ private final boolean statsEnabled; @@ -2930,7 +2926,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte this.it = it; this.topVer = topVer; this.locPart = locPart; - this.scanFilter = scanFilter; + this.intScanFilter = scanFilter != null ? new InternalScanFilter<>(scanFilter) : null; this.cctx = cctx; this.log = log; this.locNode = locNode; @@ -2998,7 +2994,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte if (locPart != null) locPart.release(); - closeScanFilter(scanFilter); + if (intScanFilter != null) + intScanFilter.close(); } /** @@ -3086,7 +3083,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte metrics.addGetTimeNanos(System.nanoTime() - start); } - if (scanFilter == null || scanFilter.apply(key0, val0)) { + if (intScanFilter == null || intScanFilter.apply(key0, val0)) { if (readEvt) { cctx.gridEvents().record(new CacheQueryReadEvent<>( cctx.localNode(), @@ -3096,7 +3093,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte cacheName, null, null, - scanFilter, + intScanFilter != null ? 
intScanFilter.scanFilter() : null, null, null, subjId, @@ -3107,8 +3104,14 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte null)); } - if (transform != null) - next0 = transform.apply(new CacheQueryEntry<>(key0, val0)); + if (transform != null) { + try { + next0 = transform.apply(new CacheQueryEntry<>(key0, val0)); + } + catch (Throwable e) { + throw new IgniteException(e); + } + } else next0 = !locNode ? new GridCacheQueryResponseEntry<>(key0, val0): new CacheQueryEntry<>(key0, val0); @@ -3125,4 +3128,45 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } } } + + /** + * Wrap scan filter in order to catch unhandled errors. + */ + private static class InternalScanFilter<K, V> implements IgniteBiPredicate<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final IgniteBiPredicate<K, V> scanFilter; + + /** + * @param scanFilter User scan filter. + */ + InternalScanFilter(IgniteBiPredicate<K, V> scanFilter) { + this.scanFilter = scanFilter; + } + + /** {@inheritDoc} */ + @Override public boolean apply(K k, V v){ + try { + return scanFilter == null || scanFilter.apply(k, v); + } + catch (Throwable e) { + throw new IgniteException(e); + } + } + + /** */ + void close() { + if (scanFilter instanceof PlatformCacheEntryFilter) + ((PlatformCacheEntryFilter)scanFilter).onClose(); + } + + /** + * @return Wrapped scan filter. 
+ */ + IgniteBiPredicate<K, V> scanFilter() { + return scanFilter; + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0d82f100/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/CacheScanQueryFailoverTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/CacheScanQueryFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/CacheScanQueryFailoverTest.java new file mode 100644 index 0000000..0633138 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/CacheScanQueryFailoverTest.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.query; + +import javax.cache.Cache; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteBinary; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.failure.StopNodeOrHaltFailureHandler; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheMode.LOCAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; + +/** + * ScanQuery failover test. Tests scenario where user supplied closures throw unhandled errors. 
+ */ +public class CacheScanQueryFailoverTest extends GridCommonAbstractTest { + /** */ + private static final String LOCAL_CACHE_NAME = "local"; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + + /** {@inheritDoc} */ + @Override protected boolean isMultiJvm() { + return true; + } + + /** {@inheritDoc} */ + @Override protected boolean isRemoteJvm(String igniteInstanceName) { + if(igniteInstanceName.equals("client") || igniteInstanceName.equals("server")) + return false; + else + return super.isRemoteJvm(igniteInstanceName); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String name) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(name); + + if (name.equals("client")) + cfg.setClientMode(true); + + cfg.setFailureHandler(new StopNodeOrHaltFailureHandler()); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testScanQueryWithFailedClosures() throws Exception { + Ignite srv = startGrids(4); + Ignite client = startGrid("client"); + + CacheConfiguration cfg = new CacheConfiguration(DEFAULT_CACHE_NAME).setCacheMode(PARTITIONED); + + // Test query from client node. + queryCachesWithFailedPredicates(client, cfg); + + // Test query from server node. + queryCachesWithFailedPredicates(srv, cfg); + + assertEquals(client.cluster().nodes().size(), 5); + }; + + /** + * @throws Exception If failed. + */ + public void testScanQueryOverLocalCacheWithFailedClosures() throws Exception { + Ignite srv = startGrids(4); + + queryCachesWithFailedPredicates(srv, new CacheConfiguration(LOCAL_CACHE_NAME).setCacheMode(LOCAL)); + + assertEquals(srv.cluster().nodes().size(), 4); + }; + + /** + * @param ignite Ignite instance. + * @param configs Cache configurations. 
+ */ + private void queryCachesWithFailedPredicates(Ignite ignite, CacheConfiguration... configs) { + if (configs == null) + return; + + for (CacheConfiguration cfg: configs) { + IgniteCache cache = ignite.getOrCreateCache(cfg); + + populateCache(ignite, cache.getName()); + + // Check that exception propagates to client from filter failure. + GridTestUtils.assertThrowsAnyCause(log, () -> { + try (QueryCursor<Cache.Entry<Integer, BinaryObject>> cursor = + cache.withKeepBinary().query(new ScanQuery<>(filter))) { + for (Cache.Entry<Integer, BinaryObject> entry : cursor) + log.info("Entry " + entry.toString()); + } + + return null; + }, Error.class, "Poison pill"); + + // Check that exception propagates to client from transformer failure. + GridTestUtils.assertThrowsAnyCause(log, () -> { + try (QueryCursor<Cache.Entry<Integer, BinaryObject>> cursor = + cache.withKeepBinary().query(new ScanQuery<>(), transformer)) { + for (Cache.Entry<Integer, BinaryObject> entry : cursor) + log.info("Entry " + entry.toString()); + } + + return null; + }, Error.class, "Poison pill"); + } + } + + /** + * @param ignite Ignite instance. + * @param cacheName Cache name. + */ + private void populateCache(Ignite ignite, String cacheName) { + IgniteBinary binary = ignite.binary(); + + try (IgniteDataStreamer<Object, Object> streamer = ignite.dataStreamer(cacheName)) { + for (int i = 0; i < 1_000; i++) + streamer.addData(i, binary.builder("type_name").setField("f_" + i, "v_" + i).build()); + } + } + + /** Failed filter. */ + private static IgniteBiPredicate<Integer, BinaryObject> filter = (key, value) -> { + throw new Error("Poison pill"); + }; + + /** Failed entry transformer. 
*/ + private static IgniteClosure<Cache.Entry<Integer, BinaryObject>, Cache.Entry<Integer, BinaryObject>> transformer = + integerBinaryObjectEntry -> { + throw new Error("Poison pill"); + }; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0d82f100/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index eb5ea4a..b4b7e91 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -137,6 +137,7 @@ import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalAtomicQ import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalFieldsQuerySelfTest; import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalQueryCancelOrTimeoutSelfTest; import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalQuerySelfTest; +import org.apache.ignite.internal.processors.cache.query.CacheScanQueryFailoverTest; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryTransformerSelfTest; import org.apache.ignite.internal.processors.cache.query.IgniteCacheQueryCacheDestroySelfTest; import org.apache.ignite.internal.processors.cache.query.IndexingSpiQuerySelfTest; @@ -278,6 +279,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheQueryH2IndexingLeakTest.class); suite.addTestSuite(IgniteCacheQueryNoRebalanceSelfTest.class); suite.addTestSuite(GridCacheQueryTransformerSelfTest.class); + suite.addTestSuite(CacheScanQueryFailoverTest.class); suite.addTestSuite(IgniteCachePrimitiveFieldsQuerySelfTest.class); 
suite.addTestSuite(IgniteCacheJoinQueryWithAffinityKeyTest.class);