This is an automated email from the ASF dual-hosted git repository.

agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 6363aef  IGNITE-11614 Fix transaction hang when sessionEnd throws an 
exception - Fixes #6401.
6363aef is described below

commit 6363aef944cb77280c3513c77b940698ad30c5c1
Author: Alexey Goncharuk <alexey.goncha...@gmail.com>
AuthorDate: Fri Apr 5 18:32:51 2019 +0300

    IGNITE-11614 Fix transaction hang when sessionEnd throws an exception - 
Fixes #6401.
    
    Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>
---
 .../cache/distributed/near/GridNearTxLocal.java    |   3 +-
 .../cache/CacheStoreTxPutAllMultiNodeTest.java     | 224 +++++++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite8.java   |   2 +
 3 files changed, 228 insertions(+), 1 deletion(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index b55b60c..b3b43b9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -4423,7 +4423,8 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter implements GridTimeou
             if (trackTimeout)
                 rmv = removeTimeoutHandler();
 
-            if (state != COMMITTING && state != ROLLING_BACK && (!trackTimeout 
|| rmv))
+            if (state != COMMITTING && state != ROLLING_BACK &&
+                (!trackTimeout || rmv || (prepFut != null && 
prepFut.isDone())))
                 rollbackNearTxLocalAsync(clearThreadMap, false).get();
 
             synchronized (this) {
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreTxPutAllMultiNodeTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreTxPutAllMultiNodeTest.java
new file mode 100644
index 0000000..d2e7121
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreTxPutAllMultiNodeTest.java
@@ -0,0 +1,224 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.cache.store.CacheStoreSession;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.resources.CacheStoreSessionResource;
+import org.apache.ignite.testframework.MvccFeatureChecker;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class CacheStoreTxPutAllMultiNodeTest extends GridCommonAbstractTest {
+    /** */
+    private static final String CACHE_NAME = "cache";
+
+    /** */
+    private Ignite client;
+
+    /** */
+    private IgniteCache<Integer, String> cache;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(final String 
gridName) throws Exception {
+        final IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        if (gridName.contains("client"))
+            cfg.setClientMode(true);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        
MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_STORE);
+
+        startGrid(1);
+        startGrid(2);
+
+        client = startGrid("client");
+
+        cache = client.getOrCreateCache(cacheConfiguration());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testPutAllInTransaction() throws Exception {
+        try (Transaction tx = 
client.transactions().txStart(TransactionConcurrency.PESSIMISTIC, 
TransactionIsolation.REPEATABLE_READ)) {
+            tx.timeout(10_000);
+
+            cache.putAll(createMap());
+
+            tx.commit();
+        }
+        catch (Exception ex) {
+            info("Expected exception: " + ex.getMessage() + ", suppressed=" + 
Arrays.toString(ex.getSuppressed()));
+        }
+
+        startGrid(3);
+
+        IgniteTxManager txMgr = ((IgniteEx) 
client).context().cache().context().tm();
+
+        long curTime = U.currentTimeMillis();
+
+        for (IgniteInternalTx tx : txMgr.activeTransactions())
+            assertTrue(curTime - tx.startTime() < tx.timeout());
+
+        assertTrue(client.transactions().localActiveTransactions().isEmpty());
+
+    }
+
+    /**
+     * @return Cache configuration.
+     */
+    protected CacheConfiguration<Integer, String> cacheConfiguration() {
+        CacheConfiguration<Integer, String> cfg = new 
CacheConfiguration<>(CACHE_NAME);
+
+        cfg.setCacheStoreFactory(new 
CacheStoreTxPutAllMultiNodeTest.StoreFactory());
+
+        cfg.setReadThrough(true);
+        cfg.setWriteThrough(true);
+        
cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+
+        return cfg;
+    }
+
+    /**
+     * @return Map to put.
+     */
+    private static Map<Integer, String> createMap() {
+        Map<Integer, String> data = new TreeMap<>();
+
+        for (int i = 1; i < 500; i ++)
+            data.put(i, "Eddy " + i);
+
+        return data;
+    }
+
+    /**
+     *
+     */
+    private static class StoreFactory implements Factory<CacheStore<? super 
Integer, ? super String>> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public CacheStore<? super Integer, ? super String> create() {
+            return new CacheStoreTxPutAllMultiNodeTest.TestStore();
+        }
+    }
+
+    /**
+     *
+     */
+    private static class TestStore extends CacheStoreAdapter<Integer, String> 
implements Serializable {
+        /** */
+        private final ConcurrentHashMap<Integer, String> map = new 
ConcurrentHashMap<>();
+
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Auto-injected store session. */
+        @CacheStoreSessionResource
+        private CacheStoreSession ses;
+
+        /**
+         *
+         */
+        public TestStore() {
+            for (int i = -100; i < 1000; i++)
+                map.put(i, String.valueOf(i));
+
+            map.put(1000, "key");
+        }
+
+        /** {@inheritDoc} */
+        @Override public String load(Integer key) throws CacheLoaderException {
+            return map.get(key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry<? extends Integer, ? extends 
String> entry) throws CacheWriterException {
+            map.put(entry.getKey(), entry.getValue());
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("SuspiciousMethodCalls")
+        @Override public void delete(Object key) throws CacheWriterException {
+            map.remove(key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sessionEnd(boolean commit) {
+            Transaction tx = transaction();
+
+            throw new CacheWriterException("SessionEnd cache store is closed." 
+ tx);
+        }
+
+        /**
+         * @return Current transaction.
+         */
+        private @Nullable Transaction transaction() {
+            CacheStoreSession ses = session();
+
+            return ses != null ? ses.transaction() : null;
+        }
+
+        /**
+         * @return Store session.
+         */
+        protected CacheStoreSession session() {
+            return ses;
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite8.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite8.java
index a4f6f2f..33625fb 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite8.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite8.java
@@ -20,6 +20,7 @@ package org.apache.ignite.testsuites;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import 
org.apache.ignite.internal.processors.cache.CacheStoreTxPutAllMultiNodeTest;
 import 
org.apache.ignite.internal.processors.cache.GridCacheOrderedPreloadingSelfTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRabalancingDelayedPartitionMapExchangeSelfTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingAsyncSelfTest;
@@ -70,6 +71,7 @@ public class IgniteCacheTestSuite8 {
         GridTestUtils.addTestIfNeeded(suite, 
GridCacheRebalancingAsyncSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
GridCacheRebalancingCancelTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, 
CacheStoreTxPutAllMultiNodeTest.class, ignoredTests);
 
         return suite;
     }

Reply via email to