IGNITE-9937 Primary response error can be lost due to unwrapping a key - Fixes #5078.
Signed-off-by: Pavel Kovalenko <jokse...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7e1d1783 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7e1d1783 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7e1d1783 Branch: refs/heads/ignite-9720 Commit: 7e1d17830429a78ca62e2f007fece7de6466eb0f Parents: e8eeea3 Author: Roman Guseinov <gromc...@gmail.com> Authored: Mon Nov 26 16:57:47 2018 +0300 Committer: Pavel Kovalenko <jokse...@gmail.com> Committed: Mon Nov 26 16:57:47 2018 +0300 ---------------------------------------------------------------------- .../dht/atomic/GridDhtAtomicCache.java | 30 ++++- .../GridNearAtomicAbstractUpdateFuture.java | 43 ++++++- .../cache/store/CacheStoreWriteErrorTest.java | 127 +++++++++++++++++++ .../ignite/testsuites/IgniteCacheTestSuite.java | 2 + 4 files changed, 196 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7e1d1783/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 86d7b3c..74be8e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -34,6 +34,7 @@ import javax.cache.processor.EntryProcessorResult; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.binary.BinaryInvalidTypeException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.NodeStoppingException; @@ -2725,6 +2726,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final GridDhtAtomicAbstractUpdateFuture dhtFut = dhtUpdRes.dhtFuture(); + Collection<Object> failedToUnwrapKeys = null; + // Avoid iterator creation. for (int i = 0; i < entries.size(); i++) { GridDhtCacheEntry entry = entries.get(i); @@ -2737,9 +2740,26 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { continue; } - if (storeErr != null && - storeErr.failedKeys().contains(entry.key().value(ctx.cacheObjectContext(), false))) - continue; + if (storeErr != null) { + Object key = entry.key(); + + try { + key = entry.key().value(ctx.cacheObjectContext(), false); + } + catch (BinaryInvalidTypeException e) { + if (log.isDebugEnabled()) { + if (failedToUnwrapKeys == null) + failedToUnwrapKeys = new ArrayList<>(); + + // To limit keys count in log message. + if (failedToUnwrapKeys.size() < 5) + failedToUnwrapKeys.add(key); + } + } + + if (storeErr.failedKeys().contains(key)) + continue; + } try { // We are holding java-level locks on entries at this point. @@ -2868,6 +2888,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { dhtUpdRes.processedEntriesCount(firstEntryIdx + i + 1); } + if (failedToUnwrapKeys != null) { + log.warning("Failed to get values of keys: " + failedToUnwrapKeys + + " (the binary objects will be used instead)."); + } } catch (IgniteCheckedException e) { res.addFailedKeys(putMap != null ? putMap.keySet() : rmvKeys, e); http://git-wip-us.apache.org/repos/asf/ignite/blob/7e1d1783/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java index f91e3f3..983b094 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference; import javax.cache.expiry.ExpiryPolicy; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.binary.BinaryInvalidTypeException; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; @@ -399,10 +400,46 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture Collection<Object> keys = new ArrayList<>(keys0.size()); - for (KeyCacheObject key : keys0) - keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false)); + Collection<Object> failedToUnwrapKeys = null; - err.add(keys, res.error(), req.topologyVersion()); + Exception suppressedErr = null; + + for (KeyCacheObject key : keys0) { + try { + keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false)); + } + catch (BinaryInvalidTypeException e) { + keys.add(cctx.toCacheKeyObject(key)); + + if (log.isDebugEnabled()) { + if (failedToUnwrapKeys == null) + failedToUnwrapKeys = new ArrayList<>(); + + // To limit keys count in log message. + if (failedToUnwrapKeys.size() < 5) + failedToUnwrapKeys.add(key); + } + + suppressedErr = e; + } + catch (Exception e) { + keys.add(cctx.toCacheKeyObject(key)); + + suppressedErr = e; + } + } + + if (failedToUnwrapKeys != null) { + log.warning("Failed to unwrap keys: " + failedToUnwrapKeys + + " (the binary objects will be used instead)."); + } + + IgniteCheckedException error = res.error(); + + if (suppressedErr != null) + error.addSuppressed(suppressedErr); + + err.add(keys, error, req.topologyVersion()); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/7e1d1783/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreWriteErrorTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreWriteErrorTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreWriteErrorTest.java new file mode 100644 index 0000000..fce1f5d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreWriteErrorTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store; + +import com.google.common.base.Throwables; +import java.util.HashMap; +import java.util.concurrent.Callable; +import javax.cache.Cache; +import javax.cache.configuration.FactoryBuilder; +import javax.cache.integration.CacheLoaderException; +import javax.cache.integration.CacheWriterException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * This class tests handling exceptions from {@link CacheStore#write(Cache.Entry)}. + */ +public class CacheStoreWriteErrorTest extends GridCommonAbstractTest { + /** */ + public static final String CACHE_NAME = "cache"; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + CacheConfiguration cacheCfg = new CacheConfiguration(CACHE_NAME) + .setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC) + .setCacheStoreFactory(FactoryBuilder.factoryOf(ThrowableCacheStore.class)) + .setWriteThrough(true) + .setStoreKeepBinary(true); + + return super.getConfiguration(gridName) + .setCacheConfiguration(cacheCfg); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * Checks primary error while saving batch with one entry. + */ + public void testPrimaryErrorForBatchSize1() { + checkPrimaryError(1); + } + + /** + * Checks primary error while saving batch with two entries. + */ + public void testPrimaryErrorForBatchSize2() { + checkPrimaryError(2); + } + + /** + * Checks that primary error ({@link CacheWriterException}) is not lost due to unwrapping a key. + * + * @param batchSize Batch size. + */ + private void checkPrimaryError(int batchSize) { + Throwable t = GridTestUtils.assertThrows(log, + new Callable<Object>() { + @Override public Object call() throws Exception { + try (Ignite grid = startGrid()) { + IgniteCache<BinaryObject, String> cache = grid.cache(CACHE_NAME); + + HashMap<BinaryObject, String> batch = new HashMap<>(); + + for (int i = 0; i < batchSize; i++) { + BinaryObject key = grid.binary().builder("KEY_TYPE_NAME").setField("id", i).build(); + + batch.put(key, "VALUE"); + } + + cache.putAllAsync(batch).get(); + } + + return null; + } + }, CacheWriterException.class, null); + + assertTrue("Stacktrace should contain the message of the original exception", + Throwables.getStackTraceAsString(t).contains(ThrowableCacheStore.EXCEPTION_MESSAGE)); + } + + /** + * {@link CacheStore} implementation which throws {@link RuntimeException} for every write operation. + */ + public static class ThrowableCacheStore extends CacheStoreAdapter<Object, Object> { + /** */ + private static final String EXCEPTION_MESSAGE = "WRITE CACHE STORE EXCEPTION"; + + /** {@inheritDoc} */ + @Override public Object load(Object o) throws CacheLoaderException { + return null; + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<?, ?> entry) throws CacheWriterException { + throw new RuntimeException(EXCEPTION_MESSAGE); + } + + /** {@inheritDoc} */ + @Override public void delete(Object o) throws CacheWriterException { + // No-op. + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7e1d1783/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index 5d7b306..52e2ba2 100755 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -22,6 +22,7 @@ import junit.framework.TestSuite; import org.apache.ignite.cache.IgniteCacheEntryProcessorSequentialCallTest; import org.apache.ignite.cache.IgniteWarmupClosureSelfTest; import org.apache.ignite.cache.store.CacheStoreReadFromBackupTest; +import org.apache.ignite.cache.store.CacheStoreWriteErrorTest; import org.apache.ignite.cache.store.CacheTransactionalStoreReadFromBackupTest; import org.apache.ignite.cache.store.GridCacheBalancingStoreSelfTest; import org.apache.ignite.cache.store.GridCacheLoadOnlyStoreAdapterSelfTest; @@ -333,6 +334,7 @@ public class IgniteCacheTestSuite extends TestSuite { suite.addTestSuite(GridStoreLoadCacheTest.class); suite.addTestSuite(CacheStoreReadFromBackupTest.class); + suite.addTestSuite(CacheStoreWriteErrorTest.class); suite.addTestSuite(CacheTransactionalStoreReadFromBackupTest.class); //suite.addTestSuite(CacheAtomicSingleMessageCountSelfTest.class);