IGNITE-9373 Fixed MVCC failing tests - Fixes #4623. Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/254a6529 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/254a6529 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/254a6529 Branch: refs/heads/master Commit: 254a6529228240dcc229a875b2f2847d456b5610 Parents: 4d736fc Author: Andrey V. Mashenkov <andrey.mashen...@gmail.com> Authored: Fri Aug 31 19:24:00 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Fri Aug 31 19:24:00 2018 +0300 ---------------------------------------------------------------------- .../suite/IgniteJdbcDriverMvccTestSuite.java | 47 ++ .../jdbc/suite/IgniteJdbcDriverTestSuite.java | 1 - ...ThinTransactionsWithMvccEnabledSelfTest.java | 447 +++++++++++++++++++ .../cache/IgniteCacheOffheapManagerImpl.java | 11 +- .../cache/mvcc/MvccProcessorImpl.java | 35 +- .../processors/cache/mvcc/MvccUtils.java | 11 +- .../cache/query/GridCacheQueryAdapter.java | 16 +- ...vccAbstractBasicCoordinatorFailoverTest.java | 4 - .../mvcc/CacheMvccAbstractFeatureTest.java | 13 +- .../cache/mvcc/CacheMvccAbstractTest.java | 21 +- .../cache/mvcc/CacheMvccClusterRestartTest.java | 18 +- .../cache/mvcc/CacheMvccTransactionsTest.java | 37 +- .../DataStreamProcessorMvccSelfTest.java | 70 +++ .../junits/common/GridCommonAbstractTest.java | 11 +- .../testsuites/IgniteCacheMvccTestSuite.java | 14 +- .../processors/query/h2/H2FieldsIterator.java | 2 +- .../query/h2/H2ResultSetIterator.java | 25 +- .../index/SqlTransactionsComandsSelfTest.java | 83 ++++ ...sactionsCommandsWithMvccEnabledSelfTest.java | 420 +++++++++++++++++ .../mvcc/CacheMvccBackupsAbstractTest.java | 6 +- .../mvcc/CacheMvccSqlQueriesAbstractTest.java | 2 + ...MvccSqlTxQueriesWithReducerAbstractTest.java | 2 +- .../query/h2/GridIndexRebuildSelfTest.java | 188 ++------ ...GridIndexRebuildWithMvccEnabledSelfTest.java | 126 ++++++ .../testsuites/IgniteCacheMvccSqlTestSuite.java | 22 +- 
.../IgniteCacheQuerySelfTestSuite.java | 14 +- .../ApiParity/IgniteConfigurationParityTest.cs | 5 +- 27 files changed, 1400 insertions(+), 251 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/254a6529/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverMvccTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverMvccTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverMvccTestSuite.java new file mode 100644 index 0000000..6d8933d --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverMvccTestSuite.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.jdbc.suite; + +import junit.framework.TestSuite; +import org.apache.ignite.jdbc.thin.JdbcThinConnectionMvccEnabledSelfTest; +import org.apache.ignite.jdbc.thin.JdbcThinTransactionsClientAutoCommitComplexSelfTest; +import org.apache.ignite.jdbc.thin.JdbcThinTransactionsClientNoAutoCommitComplexSelfTest; +import org.apache.ignite.jdbc.thin.JdbcThinTransactionsWithMvccEnabledSelfTest; +import org.apache.ignite.jdbc.thin.JdbcThinTransactionsServerAutoCommitComplexSelfTest; +import org.apache.ignite.jdbc.thin.JdbcThinTransactionsServerNoAutoCommitComplexSelfTest; + +public class IgniteJdbcDriverMvccTestSuite extends TestSuite { + /** + * @return JDBC Driver Test Suite. + * @throws Exception In case of error. + */ + public static TestSuite suite() throws Exception { + TestSuite suite = new TestSuite("Ignite JDBC Driver Test Suite"); + + suite.addTest(new TestSuite(JdbcThinConnectionMvccEnabledSelfTest.class)); + + // Transactions + suite.addTest(new TestSuite(JdbcThinTransactionsWithMvccEnabledSelfTest.class)); + suite.addTest(new TestSuite(JdbcThinTransactionsClientAutoCommitComplexSelfTest.class)); + suite.addTest(new TestSuite(JdbcThinTransactionsServerAutoCommitComplexSelfTest.class)); + suite.addTest(new TestSuite(JdbcThinTransactionsClientNoAutoCommitComplexSelfTest.class)); + suite.addTest(new TestSuite(JdbcThinTransactionsServerNoAutoCommitComplexSelfTest.class)); + + return suite; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/254a6529/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java index 275040f..7fbf41f 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java +++ 
b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java @@ -19,7 +19,6 @@ package org.apache.ignite.jdbc.suite; import junit.framework.TestSuite; import org.apache.ignite.internal.jdbc2.JdbcBlobTest; -import org.apache.ignite.internal.jdbc2.JdbcBulkLoadSelfTest; import org.apache.ignite.internal.jdbc2.JdbcConnectionReopenTest; import org.apache.ignite.internal.jdbc2.JdbcDistributedJoinsQueryTest; import org.apache.ignite.jdbc.JdbcComplexQuerySelfTest; http://git-wip-us.apache.org/repos/asf/ignite/blob/254a6529/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsWithMvccEnabledSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsWithMvccEnabledSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsWithMvccEnabledSelfTest.java new file mode 100644 index 0000000..e3f7f14 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsWithMvccEnabledSelfTest.java @@ -0,0 +1,447 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.jdbc.thin; + +import java.sql.BatchUpdateException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.binary.BinaryMarshaller; +import org.apache.ignite.internal.processors.query.NestedTxMode; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridStringLogger; +import org.apache.ignite.testframework.GridTestUtils; +import org.jetbrains.annotations.NotNull; + +/** + * Tests to check behavior with transactions on. + */ +public class JdbcThinTransactionsWithMvccEnabledSelfTest extends JdbcThinAbstractSelfTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String URL = "jdbc:ignite:thin://127.0.0.1"; + + /** Logger. 
*/ + private GridStringLogger log; + + /** {@inheritDoc} */ + @SuppressWarnings("deprecation") + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setMvccEnabled(true); + + cfg.setCacheConfiguration(cacheConfiguration(DEFAULT_CACHE_NAME)); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + cfg.setMarshaller(new BinaryMarshaller()); + + cfg.setGridLogger(log = new GridStringLogger()); + + return cfg; + } + + /** + * @param name Cache name. + * @return Cache configuration. + * @throws Exception In case of error. + */ + private CacheConfiguration cacheConfiguration(@NotNull String name) throws Exception { + CacheConfiguration cfg = defaultCacheConfiguration(); + + cfg.setName(name); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + startGrid(0); + + try (Connection c = c(true, NestedTxMode.ERROR)) { + try (Statement s = c.createStatement()) { + s.execute("CREATE TABLE INTS (k int primary key, v int) WITH \"cache_name=ints,wrap_value=false," + + "atomicity=transactional\""); + } + } + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * @param autoCommit Auto commit mode. + * @param nestedTxMode Nested transactions mode. + * @return Connection. + * @throws SQLException if failed. 
+ */ + private static Connection c(boolean autoCommit, NestedTxMode nestedTxMode) throws SQLException { + Connection res = DriverManager.getConnection(URL + "/?nestedTransactionsMode=" + nestedTxMode.name()); + + res.setAutoCommit(autoCommit); + + return res; + } + + /** + * + */ + public void testTransactionsBeginCommitRollback() throws IgniteCheckedException { + final AtomicBoolean stop = new AtomicBoolean(); + + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + try { + try (Connection c = c(false, NestedTxMode.ERROR)) { + while (!stop.get()) { + try (Statement s = c.createStatement()) { + s.execute("BEGIN"); + + c.commit(); + + s.execute("BEGIN"); + + c.rollback(); + } + } + } + } + catch (SQLException e) { + throw new AssertionError(e); + } + } + }, 8, "jdbc-transactions"); + + U.sleep(5000); + + stop.set(true); + + fut.get(); + } + + /** + * + */ + public void testTransactionsBeginCommitRollbackAutocommit() throws IgniteCheckedException { + GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + try { + try (Connection c = c(true, NestedTxMode.ERROR)) { + try (Statement s = c.createStatement()) { + s.execute("BEGIN"); + + s.execute("COMMIT"); + + s.execute("BEGIN"); + + s.execute("ROLLBACK"); + } + } + } + catch (SQLException e) { + throw new AssertionError(e); + } + } + }, 8, "jdbc-transactions").get(); + } + + /** + * + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + public void testIgnoreNestedTxAutocommitOff() throws SQLException { + try (Connection c = c(false, NestedTxMode.IGNORE)) { + doNestedTxStart(c, false); + } + + assertTrue(log.toString().contains("ignoring BEGIN command")); + } + + /** + * + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + public void testCommitNestedTxAutocommitOff() throws SQLException { + try (Connection c = c(false, NestedTxMode.COMMIT)) { + doNestedTxStart(c, false); + } + + 
assertFalse(log.toString().contains("ignoring BEGIN command")); + } + + /** + * + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + public void testErrorNestedTxAutocommitOff() throws SQLException { + GridTestUtils.assertThrows(null, new Callable<Void>() { + @Override public Void call() throws Exception { + try (Connection c = c(false, NestedTxMode.ERROR)) { + doNestedTxStart(c, false); + } + + throw new AssertionError(); + } + }, SQLException.class, "Transaction has already been started."); + } + + /** + * + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + public void testIgnoreNestedTxAutocommitOn() throws SQLException { + try (Connection c = c(true, NestedTxMode.IGNORE)) { + doNestedTxStart(c, false); + } + + assertTrue(log.toString().contains("ignoring BEGIN command")); + } + + /** + * + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + public void testCommitNestedTxAutocommitOn() throws SQLException { + try (Connection c = c(true, NestedTxMode.COMMIT)) { + doNestedTxStart(c, false); + } + + assertFalse(log.toString().contains("ignoring BEGIN command")); + } + + /** + * + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + public void testErrorNestedTxAutocommitOn() throws SQLException { + GridTestUtils.assertThrows(null, new Callable<Void>() { + @Override public Void call() throws Exception { + try (Connection c = c(true, NestedTxMode.ERROR)) { + doNestedTxStart(c, false); + } + + throw new AssertionError(); + } + }, SQLException.class, "Transaction has already been started."); + } + + /** + * + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + public void testIgnoreNestedTxAutocommitOffBatched() throws SQLException { + try (Connection c = c(false, NestedTxMode.IGNORE)) { + doNestedTxStart(c, true); + } + + assertTrue(log.toString().contains("ignoring BEGIN command")); + } + + /** + * + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + public void 
testCommitNestedTxAutocommitOffBatched() throws SQLException { + try (Connection c = c(false, NestedTxMode.COMMIT)) { + doNestedTxStart(c, true); + } + + assertFalse(log.toString().contains("ignoring BEGIN command")); + } + + /** + * + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + public void testErrorNestedTxAutocommitOffBatched() throws SQLException { + GridTestUtils.assertThrows(null, new Callable<Void>() { + @Override public Void call() throws Exception { + try (Connection c = c(false, NestedTxMode.ERROR)) { + doNestedTxStart(c, true); + } + + throw new AssertionError(); + } + }, BatchUpdateException.class, "Transaction has already been started."); + } + + /** + * + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + public void testIgnoreNestedTxAutocommitOnBatched() throws SQLException { + try (Connection c = c(true, NestedTxMode.IGNORE)) { + doNestedTxStart(c, true); + } + + assertTrue(log.toString().contains("ignoring BEGIN command")); + } + + /** + * + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + public void testCommitNestedTxAutocommitOnBatched() throws SQLException { + try (Connection c = c(true, NestedTxMode.COMMIT)) { + doNestedTxStart(c, true); + } + + assertFalse(log.toString().contains("ignoring BEGIN command")); + } + + /** + * + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + public void testErrorNestedTxAutocommitOnBatched() throws SQLException { + GridTestUtils.assertThrows(null, new Callable<Void>() { + @Override public Void call() throws Exception { + try (Connection c = c(true, NestedTxMode.ERROR)) { + doNestedTxStart(c, true); + } + + throw new AssertionError(); + } + }, BatchUpdateException.class, "Transaction has already been started."); + } + + /** + * Try to start nested transaction via batch as well as separate statements. + * @param conn Connection. + * @param batched Whether {@link Statement#executeBatch()} should be used. + * @throws SQLException if failed. 
+ */ + private void doNestedTxStart(Connection conn, boolean batched) throws SQLException { + try (Statement s = conn.createStatement()) { + s.executeQuery("SELECT * FROM INTS"); + + if (batched) { + s.addBatch("BEGIN"); + + s.addBatch("BEGIN"); + + s.executeBatch(); + } + else { + s.execute("BEGIN"); + + s.execute("BEGIN"); + } + } + } + + /** + * @throws SQLException if failed. + */ + public void testAutoCommitSingle() throws SQLException { + doTestAutoCommit(false); + } + + /** + * @throws SQLException if failed. + */ + public void testAutoCommitBatched() throws SQLException { + doTestAutoCommit(true); + } + + /** + * @param batched Batch mode flag. + * @throws SQLException if failed. + */ + private void doTestAutoCommit(boolean batched) throws SQLException { + IgniteCache<Integer, ?> cache = grid(0).cache("ints"); + + try (Connection c = c(false, NestedTxMode.ERROR)) { + try (Statement s = c.createStatement()) { + assertFalse(s.executeQuery("SELECT * from INTS").next()); + + if (batched) { + s.addBatch("INSERT INTO INTS(k, v) values(1, 1)"); + + s.executeBatch(); + } + else + s.execute("INSERT INTO INTS(k, v) values(1, 1)"); + + // We haven't committed anything yet - this check shows that autoCommit flag is in effect. + assertTrue(cache.query(new SqlFieldsQuery("SELECT * from INTS")).getAll().isEmpty()); + + // We should see own updates. + assertTrue(s.executeQuery("SELECT * from INTS").next()); + + c.commit(); + + c.setAutoCommit(true); + + assertEquals(1, cache.get(1)); + + assertTrue(s.executeQuery("SELECT * from INTS").next()); + } + } + } + + /** + * Test that exception in one of the statements does not kill connection worker altogether. + * @throws SQLException if failed. 
+ */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + public void testExceptionHandling() throws SQLException { + try (Connection c = c(true, NestedTxMode.ERROR)) { + try (Statement s = c.createStatement()) { + s.execute("INSERT INTO INTS(k, v) values(1, 1)"); + + assertEquals(1, grid(0).cache("ints").get(1)); + + GridTestUtils.assertThrows(null, new Callable<Void>() { + @Override public Void call() throws Exception { + s.execute("INSERT INTO INTS(x, y) values(1, 1)"); + + return null; + } + }, SQLException.class, "Failed to parse query"); + + s.execute("INSERT INTO INTS(k, v) values(2, 2)"); + + assertEquals(2, grid(0).cache("ints").get(2)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/254a6529/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index bc35264..13ad7e2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -2565,7 +2565,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager List<IgniteBiTuple<Object, MvccVersion>> res = new ArrayList<>(); - long crd = MVCC_CRD_COUNTER_NA, cntr = MVCC_COUNTER_NA; int opCntr = MVCC_OP_COUNTER_NA; + long crd = MVCC_CRD_COUNTER_NA; + long cntr = MVCC_COUNTER_NA; + int opCntr = MVCC_OP_COUNTER_NA; while (cur.next()) { CacheDataRow row = cur.get(); @@ -2575,7 +2577,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager res.add(F.t(row.value(), row.mvccVersion())); - crd = row.mvccCoordinatorVersion(); cntr = 
row.mvccCounter(); opCntr = row.mvccOperationCounter(); + crd = row.mvccCoordinatorVersion(); + cntr = row.mvccCounter(); + opCntr = row.mvccOperationCounter(); } return res; @@ -2654,8 +2658,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager return cursor(cacheId, null, null); } - /** {@inheritDoc} - * @param cacheId*/ + /** {@inheritDoc} */ @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId, MvccSnapshot mvccSnapshot) throws IgniteCheckedException { return cursor(cacheId, null, null, null, mvccSnapshot); http://git-wip-us.apache.org/repos/asf/ignite/blob/254a6529/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java index 31d3b61..220f0c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java @@ -545,7 +545,7 @@ class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, D } if (!ctx.localNodeId().equals(crd.nodeId()) || !initFut.isDone()) - return null; + return null; else if (tx != null) return assignTxSnapshot(0L); else @@ -1324,7 +1324,9 @@ class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, D * @param msg Message. 
*/ private void processCoordinatorSnapshotResponse(UUID nodeId, MvccSnapshotResponse msg) { - Map<Long, MvccSnapshotResponseListener> map = snapLsnrs.get(nodeId); MvccSnapshotResponseListener lsnr; + Map<Long, MvccSnapshotResponseListener> map = snapLsnrs.get(nodeId); + + MvccSnapshotResponseListener lsnr; if (map != null && (lsnr = map.remove(msg.futureId())) != null) lsnr.onResponse(msg); @@ -2007,18 +2009,22 @@ class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, D try { GridCursor<? extends CacheDataRow> cursor = part.dataStore().cursor(KEY_ONLY); - KeyCacheObject prevKey = null; Object rest = null; + KeyCacheObject prevKey = null; + + Object rest = null; List<MvccLinkAwareSearchRow> cleanupRows = null; MvccSnapshot snapshot = task.snapshot(); - GridCacheContext cctx = null; int curCacheId = CU.UNDEFINED_CACHE_ID; + GridCacheContext cctx = null; + + int curCacheId = CU.UNDEFINED_CACHE_ID; boolean shared = part.group().sharedGroup(); - if (!shared) - cctx = part.group().singleCacheContext(); + if (!shared && (cctx = F.first(part.group().caches())) == null) + return metrics; while (cursor.next()) { if (isCancelled()) @@ -2032,19 +2038,27 @@ class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, D if (cctx == null) { assert shared; - curCacheId = row.cacheId(); - cctx = part.group().shared().cacheContext(curCacheId); + cctx = part.group().shared().cacheContext(curCacheId = row.cacheId()); + + if (cctx == null) + return metrics; } if (!prevKey.equals(row.key()) || (shared && curCacheId != row.cacheId())) { if (rest != null || !F.isEmpty(cleanupRows)) cleanup(part, prevKey, cleanupRows, rest, cctx, metrics); - cleanupRows = null; rest = null; + cleanupRows = null; - if (shared && curCacheId != row.cacheId()) + rest = null; + + if (shared && curCacheId != row.cacheId()) { cctx = part.group().shared().cacheContext(curCacheId = row.cacheId()); + if (cctx == null) + return metrics; + } + prevKey = row.key(); } @@ -2127,7 
+2141,6 @@ class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, D } /** - * * @param part Local partition. * @param key Key. * @param cleanupRows Cleanup rows. http://git-wip-us.apache.org/repos/asf/ignite/blob/254a6529/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java index 33f457d..c75393e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java @@ -758,12 +758,7 @@ public class MvccUtils { if (tx == null) tracker = new MvccQueryTrackerImpl(cctx); else if ((tracker = tx.mvccQueryTracker()) == null) - tracker = new StaticMvccQueryTracker(cctx, requestSnapshot(cctx, tx)) { - @Override public void onDone() { - // TODO IGNITE-8841 - checkActive(tx); - } - }; + tracker = new StaticMvccQueryTracker(cctx, requestSnapshot(cctx, tx)); if (tracker.snapshot() == null) // TODO IGNITE-7388 @@ -780,7 +775,9 @@ public class MvccUtils { */ public static MvccSnapshot requestSnapshot(GridCacheContext cctx, GridNearTxLocal tx) throws IgniteCheckedException { - MvccSnapshot snapshot; tx = checkActive(tx); + MvccSnapshot snapshot; + + tx = checkActive(tx); if ((snapshot = tx.mvccSnapshot()) == null) { MvccProcessor prc = cctx.shared().coordinators(); http://git-wip-us.apache.org/repos/asf/ignite/blob/254a6529/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java index f21a22f..07aea4c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java @@ -45,6 +45,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnreservedPartitionException; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils; @@ -540,8 +541,19 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { MvccQueryTracker mvccTracker = null; - if (cctx.mvccEnabled() && mvccSnapshot == null) - mvccSnapshot = (mvccTracker = MvccUtils.mvccTracker(cctx, false)).snapshot(); + if (cctx.mvccEnabled() && mvccSnapshot == null) { + GridNearTxLocal tx = cctx.tm().userTx(); + + if (tx != null) + mvccSnapshot = MvccUtils.requestSnapshot(cctx, tx); + else { + mvccTracker = MvccUtils.mvccTracker(cctx, null); + + mvccSnapshot = mvccTracker.snapshot(); + } + + assert mvccSnapshot != null; + } boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(cctx.localNodeId()); http://git-wip-us.apache.org/repos/asf/ignite/blob/254a6529/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractBasicCoordinatorFailoverTest.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractBasicCoordinatorFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractBasicCoordinatorFailoverTest.java index e51b7d0..b2cbe05 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractBasicCoordinatorFailoverTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractBasicCoordinatorFailoverTest.java @@ -106,10 +106,6 @@ public abstract class CacheMvccAbstractBasicCoordinatorFailoverTest extends Cach } catch (ClusterTopologyException e) { info("Expected exception: " + e); - - assertNotNull(e.retryReadyFuture()); - - e.retryReadyFuture().get(); } catch (CacheException e) { info("Expected exception: " + e); http://git-wip-us.apache.org/repos/asf/ignite/blob/254a6529/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractFeatureTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractFeatureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractFeatureTest.java index f5172c8..3ff8846 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractFeatureTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractFeatureTest.java @@ -69,15 +69,22 @@ public abstract class CacheMvccAbstractFeatureTest extends CacheMvccAbstractTest @Override protected void beforeTestsStarted() throws Exception { super.beforeTestsStarted(); + cleanPersistenceDir(); + startGrids(4); node = grid(0); } /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - super.beforeTest(); + @Override protected void 
afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { CacheConfiguration<Integer, Person> ccfg = new CacheConfiguration<>(CACHE_NAME); ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); @@ -93,8 +100,6 @@ public abstract class CacheMvccAbstractFeatureTest extends CacheMvccAbstractTest /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { node.destroyCache(CACHE_NAME); - - super.afterTest(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/254a6529/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java index 1abb45f..6e22f44 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java @@ -93,9 +93,11 @@ import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SCAN; import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SQL; import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SQL_SUM; import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.WriteMode.DML; +import static 
org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.WriteMode.PUT; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; @@ -868,6 +870,9 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { ReadMode readMode, WriteMode writeMode ) throws Exception { + if(readMode == SCAN && writeMode == PUT) + fail("https://issues.apache.org/jira/browse/IGNITE-7764"); + final int RANGE = 20; final int writers = 4; @@ -1052,6 +1057,9 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { ) throws Exception { + if(readMode == SCAN && writeMode == PUT) + fail("https://issues.apache.org/jira/browse/IGNITE-7764"); + final int TOTAL = 20; assert N <= TOTAL; @@ -1501,7 +1509,7 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { * * @throws Exception If failed. */ - private void verifyOldVersionsCleaned() throws Exception { + protected void verifyOldVersionsCleaned() throws Exception { runVacuumSync(); // Check versions. 
@@ -1531,8 +1539,8 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { if (!cctx.userCache() || !cctx.group().mvccEnabled()) continue; - for (Object e : cache.withKeepBinary()) { - IgniteBiTuple entry = (IgniteBiTuple)e; + for (Iterator it = cache.withKeepBinary().iterator(); it.hasNext(); ) { + IgniteBiTuple entry = (IgniteBiTuple)it.next(); KeyCacheObject key = cctx.toCacheKeyObject(entry.getKey()); @@ -1541,9 +1549,12 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { if (vers.size() > 1) { if (failIfNotCleaned) - fail("[key=" + key.value(null, false) + "; vers=" + vers + ']'); - else + fail("[key=" + key.value(null, false) + "; vers=" + vers + ']'); + else { + U.closeQuiet((AutoCloseable)it); + return false; + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/254a6529/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClusterRestartTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClusterRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClusterRestartTest.java index f9ab7bc..76a8604 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClusterRestartTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClusterRestartTest.java @@ -74,25 +74,29 @@ public class CacheMvccClusterRestartTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void afterTestsStopped() throws Exception { - cleanPersistenceDir(); - super.afterTestsStopped(); + + stopAllGrids(); + + cleanPersistenceDir(); } /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { - super.beforeTest(); + fail("https://issues.apache.org/jira/browse/IGNITE-9394"); cleanPersistenceDir(); + + 
super.beforeTest(); } /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { - cleanPersistenceDir(); + super.afterTest(); stopAllGrids(); - super.afterTest(); + cleanPersistenceDir(); } /** @@ -124,8 +128,6 @@ public class CacheMvccClusterRestartTest extends GridCommonAbstractTest { private void restart1(int srvBefore, int srvAfter) throws Exception { Ignite srv0 = startGridsMultiThreaded(srvBefore); - srv0.active(true); - IgniteCache<Object, Object> cache = srv0.createCache(cacheConfiguration()); Set<Integer> keys = new HashSet<>(primaryKeys(cache, 1, 0)); @@ -141,8 +143,6 @@ public class CacheMvccClusterRestartTest extends GridCommonAbstractTest { srv0 = startGridsMultiThreaded(srvAfter); - srv0.active(true); - cache = srv0.cache(DEFAULT_CACHE_NAME); Map<Object, Object> res = cache.getAll(keys); http://git-wip-us.apache.org/repos/asf/ignite/blob/254a6529/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java index 01268de..f1519df 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java @@ -55,6 +55,7 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import 
org.apache.ignite.internal.processors.cache.KeyCacheObject; @@ -416,6 +417,8 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { * @throws Exception If failed. */ public void testTxReadIsolationSimple() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-7764"); + Ignite srv0 = startGrids(4); client = true; @@ -2263,8 +2266,6 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { checkActiveQueriesCleanup(ignite(0)); - verifyCoordinatorInternalState(); - try { fut.get(); } @@ -2466,10 +2467,6 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { } catch (ClusterTopologyException e) { info("Expected exception: " + e); - - assertNotNull(e.retryReadyFuture()); - - e.retryReadyFuture().get(); } return null; @@ -2817,7 +2814,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { IgniteCache cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 2, 64)); - final int KEYS = 10_000; + final int KEYS = 1_000; Map<Object, Object> data = new HashMap<>(); @@ -2905,6 +2902,8 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { * @throws Exception If failed. 
*/ public void testUpdate_N_Objects_ClientServer_Backups1_Scan() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-7764"); + int[] nValues = {3, 5, 10}; for (int n : nValues) { @@ -3262,7 +3261,9 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { CacheDataRow row = cctx.offheap().read(cctx, key0); - checkRow(cctx, row, key0, vers.get(0).get1()); + Object val = ((CacheObject)vers.get(0).get1()).value(cctx.cacheObjectContext(), false); + + checkRow(cctx, row, key0, val); for (IgniteBiTuple<Object, MvccVersion> ver : vers) { MvccVersion cntr = ver.get2(); @@ -3272,18 +3273,20 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { row = cctx.offheap().mvccRead(cctx, key0, readVer); - checkRow(cctx, row, key0, ver.get1()); + Object verVal = ((CacheObject)ver.get1()).value(cctx.cacheObjectContext(), false); + + checkRow(cctx, row, key0, verVal); } checkRow(cctx, cctx.offheap().mvccRead(cctx, key0, version(vers.get(0).get2().coordinatorVersion() + 1, 1)), key0, - vers.get(0).get1()); + val); checkRow(cctx, cctx.offheap().mvccRead(cctx, key0, version(vers.get(0).get2().coordinatorVersion(), vers.get(0).get2().counter() + 1)), key0, - vers.get(0).get1()); + val); MvccSnapshotResponse ver = version(vers.get(0).get2().coordinatorVersion(), 100000); @@ -3296,8 +3299,11 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { if (v == vers.size() - 1) assertNull(row); - else - checkRow(cctx, row, key0, vers.get(v + 1).get1()); + else { + Object nextVal = ((CacheObject)vers.get(v + 1).get1()).value(cctx.cacheObjectContext(), false); + + checkRow(cctx, row, key0, nextVal); + } } } @@ -3316,7 +3322,8 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { * @throws Exception If failed. 
*/ public void testExpiration() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-7956"); + fail("https://issues.apache.org/jira/browse/IGNITE-7311"); + final IgniteEx node = startGrid(0); IgniteCache cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, 64)); @@ -3370,6 +3377,8 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { * @throws Exception If failed. */ public void testChangeExpireTime() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-7311"); + final IgniteEx node = startGrid(0); IgniteCache cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, 64)); http://git-wip-us.apache.org/repos/asf/ignite/blob/254a6529/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorMvccSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorMvccSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorMvccSelfTest.java new file mode 100644 index 0000000..c1af42b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorMvccSelfTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastreamer; + +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; + +/** + * Check DataStreamer with Mvcc enabled. + */ +public class DataStreamProcessorMvccSelfTest extends DataStreamProcessorSelfTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration igniteConfiguration = super.getConfiguration(igniteInstanceName); + + CacheConfiguration[] cacheConfigurations = igniteConfiguration.getCacheConfiguration(); + + assert cacheConfigurations == null || cacheConfigurations.length == 0 + || (cacheConfigurations.length == 1 && cacheConfigurations[0].getAtomicityMode() == TRANSACTIONAL); + + igniteConfiguration.setMvccEnabled(true); + + return igniteConfiguration; + } + + /** {@inheritDoc} */ + @Override public void testPartitioned() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-8149"); + + super.testPartitioned(); + } + + /** {@inheritDoc} */ + @Override public void testColocated() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-8149"); + + super.testColocated(); + } + + /** {@inheritDoc} */ + @Override public void testReplicated() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-8149"); + + super.testReplicated(); + } + + /** {@inheritDoc} */ + @Override public void testUpdateStore() throws 
Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-8582"); + + super.testUpdateStore(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/254a6529/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java index 55086f3..273456a 100755 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java @@ -83,7 +83,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedCache; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; import org.apache.ignite.internal.processors.cache.local.GridLocalCache; @@ -91,10 +90,6 @@ import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabase import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2; -import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecord; -import 
org.apache.ignite.internal.processors.cache.verify.PartitionKey; -import org.apache.ignite.internal.processors.cache.verify.VerifyBackupPartitionsTask; -import org.apache.ignite.internal.util.lang.GridAbsClosure; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; @@ -102,13 +97,10 @@ import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiInClosure; -import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.internal.visor.VisorTaskArgument; -import org.apache.ignite.internal.visor.verify.VisorIdleVerifyTask; import org.apache.ignite.internal.visor.verify.VisorIdleVerifyTaskArg; -import org.apache.ignite.internal.visor.verify.VisorIdleVerifyTaskResult; import org.apache.ignite.internal.visor.verify.VisorIdleVerifyTaskV2; +import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; @@ -128,7 +120,6 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheMode.LOCAL; import static org.apache.ignite.cache.CacheRebalanceMode.NONE; import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled; -import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; 
http://git-wip-us.apache.org/repos/asf/ignite/blob/254a6529/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java index b5275b9..f87b14d 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java @@ -30,7 +30,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccScanQueryWithCo import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSizeWithConcurrentTransactionTest; import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTransactionsTest; import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccVacuumTest; -import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorMvccSeflTest; +import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorMvccSelfTest; /** * @@ -42,19 +42,25 @@ public class IgniteCacheMvccTestSuite extends TestSuite { public static TestSuite suite() { TestSuite suite = new TestSuite("IgniteCache MVCC Test Suite"); + // Basic tests. suite.addTestSuite(CacheMvccTransactionsTest.class); suite.addTestSuite(CacheMvccProcessorTest.class); - suite.addTestSuite(CacheMvccClusterRestartTest.class); + suite.addTestSuite(CacheMvccVacuumTest.class); suite.addTestSuite(CacheMvccConfigurationValidationTest.class); + + suite.addTestSuite(DataStreamProcessorMvccSelfTest.class); suite.addTestSuite(CacheMvccOperationChecksTest.class); + + // Concurrent ops tests. 
suite.addTestSuite(CacheMvccIteratorWithConcurrentTransactionTest.class); suite.addTestSuite(CacheMvccLocalEntriesWithConcurrentTransactionTest.class); suite.addTestSuite(CacheMvccScanQueryWithConcurrentTransactionTest.class); suite.addTestSuite(CacheMvccSizeWithConcurrentTransactionTest.class); - suite.addTestSuite(CacheMvccVacuumTest.class); + + // Failover tests. + suite.addTestSuite(CacheMvccClusterRestartTest.class); suite.addTestSuite(CacheMvccPartitionedCoordinatorFailoverTest.class); suite.addTestSuite(CacheMvccReplicatedCoordinatorFailoverTest.class); - suite.addTestSuite(DataStreamProcessorMvccSeflTest.class); return suite; } http://git-wip-us.apache.org/repos/asf/ignite/blob/254a6529/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java index 05df754..e9f293c 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java @@ -57,7 +57,7 @@ public class H2FieldsIterator extends H2ResultSetIterator<List<?>> { } /** {@inheritDoc} */ - @Override public void onClose() { + @Override public void onClose() throws IgniteCheckedException { try { super.onClose(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/254a6529/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java index e84ca04..814e83d 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java @@ -26,7 +26,6 @@ import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject; import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; import org.h2.jdbc.JdbcResultSet; import org.h2.result.ResultInterface; import org.h2.value.Value; @@ -99,13 +98,13 @@ public abstract class H2ResultSetIterator<T> extends GridCloseableIteratorAdapte /** * @return {@code true} If next row was fetched successfully. */ - private boolean fetchNext() { + private boolean fetchNext() throws IgniteCheckedException { if (data == null) return false; try { - if (!data.next()){ - onClose(); + if (!data.next()) { + close(); return false; } @@ -138,7 +137,7 @@ public abstract class H2ResultSetIterator<T> extends GridCloseableIteratorAdapte } /** {@inheritDoc} */ - @Override public boolean onHasNext() { + @Override public boolean onHasNext() throws IgniteCheckedException { return hasRow || (hasRow = fetchNext()); } @@ -164,15 +163,21 @@ public abstract class H2ResultSetIterator<T> extends GridCloseableIteratorAdapte } /** {@inheritDoc} */ - @Override public void onClose(){ + @Override public void onClose() throws IgniteCheckedException { if (data == null) // Nothing to close. 
return; - U.closeQuiet(data); - - res = null; - data = null; + try { + data.close(); + } + catch (SQLException e) { + throw new IgniteSQLException(e); + } + finally { + res = null; + data = null; + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/254a6529/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsComandsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsComandsSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsComandsSelfTest.java new file mode 100644 index 0000000..8b3fbe3 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsComandsSelfTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.index; + +import java.util.concurrent.Callable; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.query.IgniteSQLException; +import org.apache.ignite.testframework.GridTestUtils; + +/** + * + */ +public class SqlTransactionsComandsSelfTest extends AbstractSchemaSelfTest { + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrid(commonConfiguration(0)); + + super.execute(grid(0), "CREATE TABLE INTS(k int primary key, v int) WITH \"wrap_value=false,cache_name=ints," + + "atomicity=transactional\""); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + + /** + * @throws Exception if failed. + */ + public void testBeginWithMvccDisabledThrows() throws Exception { + checkMvccDisabledBehavior("BEGIN"); + } + + /** + * @throws Exception if failed. + */ + public void testCommitWithMvccDisabledThrows() throws Exception { + checkMvccDisabledBehavior("COMMIT"); + } + + /** + * @throws Exception if failed. + */ + public void testRollbackWithMvccDisabledThrows() throws Exception { + checkMvccDisabledBehavior("rollback"); + } + + /** + * @param sql Operation to test. + * @throws Exception if failed. 
+ */ + private void checkMvccDisabledBehavior(String sql) throws Exception { + try (IgniteEx node = startGrid(commonConfiguration(1))) { + GridTestUtils.assertThrows(null, new Callable<Object>() { + @Override public Object call() throws Exception { + execute(node, sql); + + return null; + } + }, IgniteSQLException.class, "MVCC must be enabled in order to invoke transactional operation: " + sql); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/254a6529/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsCommandsWithMvccEnabledSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsCommandsWithMvccEnabledSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsCommandsWithMvccEnabledSelfTest.java new file mode 100644 index 0000000..55dd59a --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsCommandsWithMvccEnabledSelfTest.java @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.index; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.GatewayProtectedCacheProxy; +import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; +import org.apache.ignite.internal.processors.query.QueryUtils; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionState; + +/** + * Tests to check behavior regarding transactions started via SQL. + */ +public class SqlTransactionsCommandsWithMvccEnabledSelfTest extends AbstractSchemaSelfTest { + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrid(commonConfiguration(0).setMvccEnabled(true)); + + super.execute(node(), "CREATE TABLE INTS(k int primary key, v int) WITH \"wrap_value=false,cache_name=ints," + + "atomicity=transactional\""); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * Test that BEGIN opens a transaction. 
+ */ + public void testBegin() { + execute(node(), "BEGIN"); + + assertTxPresent(); + + assertTxState(tx(), TransactionState.ACTIVE); + } + + /** + * Test that COMMIT commits a transaction. + */ + public void testCommit() { + execute(node(), "BEGIN WORK"); + + assertTxPresent(); + + Transaction tx = tx(); + + assertTxState(tx, TransactionState.ACTIVE); + + execute(node(), "COMMIT TRANSACTION"); + + assertTxState(tx, TransactionState.COMMITTED); + + assertSqlTxNotPresent(); + } + + /** + * Test that COMMIT without a transaction yields nothing. + */ + public void testCommitNoTransaction() { + execute(node(), "COMMIT"); + } + + /** + * Test that ROLLBACK without a transaction yields nothing. + */ + public void testRollbackNoTransaction() { + execute(node(), "ROLLBACK"); + } + + /** + * Test that ROLLBACK rolls back a transaction. + */ + public void testRollback() { + execute(node(), "BEGIN TRANSACTION"); + + assertTxPresent(); + + Transaction tx = tx(); + + assertTxState(tx, TransactionState.ACTIVE); + + execute(node(), "ROLLBACK TRANSACTION"); + + assertTxState(tx, TransactionState.ROLLED_BACK); + + assertSqlTxNotPresent(); + } + + /** + * Test that attempting to perform various SQL operations within non SQL transaction yields an exception. 
+ */ + public void testSqlOperationsWithinNonSqlTransaction() { + assertSqlOperationWithinNonSqlTransactionThrows("COMMIT"); + + assertSqlOperationWithinNonSqlTransactionThrows("ROLLBACK"); + + assertSqlOperationWithinNonSqlTransactionThrows("SELECT * from ints"); + + assertSqlOperationWithinNonSqlTransactionThrows("DELETE from ints"); + + assertSqlOperationWithinNonSqlTransactionThrows("INSERT INTO ints(k, v) values(10, 15)"); + + assertSqlOperationWithinNonSqlTransactionThrows("MERGE INTO ints(k, v) values(10, 15)"); + + assertSqlOperationWithinNonSqlTransactionThrows("UPDATE ints SET v = 100 WHERE k = 5"); + + assertSqlOperationWithinNonSqlTransactionThrows("create index idx on ints(v)"); + + assertSqlOperationWithinNonSqlTransactionThrows("CREATE TABLE T(k int primary key, v int)"); + } + + /** + * Check that trying to run given SQL statement both locally and in distributed mode yields an exception + * if transaction already has been marked as being of SQL type. + * @param sql SQL statement. + */ + private void assertSqlOperationWithinNonSqlTransactionThrows(final String sql) { + try (Transaction ignored = node().transactions().txStart()) { + node().cache("ints").put(1, 1); + + assertSqlException(new RunnableX() { + @Override public void run() throws Exception { + execute(node(), sql); + } + }, IgniteQueryErrorCode.TRANSACTION_TYPE_MISMATCH); + } + + try (Transaction ignored = node().transactions().txStart()) { + node().cache("ints").put(1, 1); + + assertSqlException(new RunnableX() { + @Override public void run() throws Exception { + node().cache("ints").query(new SqlFieldsQuery(sql).setLocal(true)).getAll(); + } + }, IgniteQueryErrorCode.TRANSACTION_TYPE_MISMATCH); + } + } + + /** + * Test that attempting to perform a cache API operation from within an SQL transaction fails. + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + private void checkCacheOperationThrows(final String opName, final Object... 
args) { + execute(node(), "BEGIN"); + + try { + GridTestUtils.assertThrows(null, new Callable<Object>() { + @Override public Object call() throws Exception { + try { + // We need to detect types based on arguments due to multiple overloads. + Class[] types; + + if (F.isEmpty(args)) + types = (Class[]) X.EMPTY_OBJECT_ARRAY; + else { + types = new Class[args.length]; + + for (int i = 0; i < args.length; i++) + types[i] = argTypeForObject(args[i]); + } + + Object res = U.invoke(GatewayProtectedCacheProxy.class, node().cache("ints"), + opName, types, args); + + if (opName.endsWith("Async")) + ((IgniteFuture)res).get(); + } + catch (IgniteCheckedException e) { + if (e.getCause() != null) { + try { + if (e.getCause().getCause() != null) + throw (Exception)e.getCause().getCause(); + else + fail(); + } + catch (IgniteException e1) { + // Some public API methods don't have IgniteCheckedException on their signature + // and thus may wrap it into an IgniteException. + if (e1.getCause() != null) + throw (Exception)e1.getCause(); + else + fail(); + } + } + else + fail(); + } + + return null; + } + }, IgniteCheckedException.class, + "SQL queries and cache operations may not be used in the same transaction."); + } + finally { + try { + execute(node(), "ROLLBACK"); + } + catch (Throwable e) { + // No-op. + } + } + } + + /** + * + */ + private static Class<?> argTypeForObject(Object arg) { + if (arg instanceof Set) + return Set.class; + else if (arg instanceof Map) + return Map.class; + else if (arg.getClass().getName().startsWith("java.lang.")) + return Object.class; + else if (arg instanceof CacheEntryProcessor) + return CacheEntryProcessor.class; + else if (arg instanceof EntryProcessor) + return EntryProcessor.class; + else + return arg.getClass(); + } + + /** + * Test that attempting to perform a cache PUT operation from within an SQL transaction fails. 
+ */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + public void testCacheOperationsFromSqlTransaction() { + checkCacheOperationThrows("get", 1); + + checkCacheOperationThrows("getAsync", 1); + + checkCacheOperationThrows("getEntry", 1); + + checkCacheOperationThrows("getEntryAsync", 1); + + checkCacheOperationThrows("getAndPut", 1, 1); + + checkCacheOperationThrows("getAndPutAsync", 1, 1); + + checkCacheOperationThrows("getAndPutIfAbsent", 1, 1); + + checkCacheOperationThrows("getAndPutIfAbsentAsync", 1, 1); + + checkCacheOperationThrows("getAndReplace", 1, 1); + + checkCacheOperationThrows("getAndReplaceAsync", 1, 1); + + checkCacheOperationThrows("getAndRemove", 1); + + checkCacheOperationThrows("getAndRemoveAsync", 1); + + checkCacheOperationThrows("containsKey", 1); + + checkCacheOperationThrows("containsKeyAsync", 1); + + checkCacheOperationThrows("put", 1, 1); + + checkCacheOperationThrows("putAsync", 1, 1); + + checkCacheOperationThrows("putIfAbsent", 1, 1); + + checkCacheOperationThrows("putIfAbsentAsync", 1, 1); + + checkCacheOperationThrows("remove", 1); + + checkCacheOperationThrows("removeAsync", 1); + + checkCacheOperationThrows("remove", 1, 1); + + checkCacheOperationThrows("removeAsync", 1, 1); + + checkCacheOperationThrows("replace", 1, 1); + + checkCacheOperationThrows("replaceAsync", 1, 1); + + checkCacheOperationThrows("replace", 1, 1, 1); + + checkCacheOperationThrows("replaceAsync", 1, 1, 1); + + checkCacheOperationThrows("getAll", new HashSet<>(Arrays.asList(1, 2))); + + checkCacheOperationThrows("containsKeys", new HashSet<>(Arrays.asList(1, 2))); + + checkCacheOperationThrows("getEntries", new HashSet<>(Arrays.asList(1, 2))); + + checkCacheOperationThrows("putAll", Collections.singletonMap(1, 1)); + + checkCacheOperationThrows("removeAll", new HashSet<>(Arrays.asList(1, 2))); + + checkCacheOperationThrows("getAllAsync", new HashSet<>(Arrays.asList(1, 2))); + + checkCacheOperationThrows("containsKeysAsync", new 
HashSet<>(Arrays.asList(1, 2))); + + checkCacheOperationThrows("getEntriesAsync", new HashSet<>(Arrays.asList(1, 2))); + + checkCacheOperationThrows("putAllAsync", Collections.singletonMap(1, 1)); + + checkCacheOperationThrows("removeAllAsync", new HashSet<>(Arrays.asList(1, 2))); + + checkCacheOperationThrows("invoke", 1, ENTRY_PROC, X.EMPTY_OBJECT_ARRAY); + + checkCacheOperationThrows("invoke", 1, CACHE_ENTRY_PROC, X.EMPTY_OBJECT_ARRAY); + + checkCacheOperationThrows("invokeAsync", 1, ENTRY_PROC, X.EMPTY_OBJECT_ARRAY); + + checkCacheOperationThrows("invokeAsync", 1, CACHE_ENTRY_PROC, X.EMPTY_OBJECT_ARRAY); + + checkCacheOperationThrows("invokeAll", Collections.singletonMap(1, CACHE_ENTRY_PROC), X.EMPTY_OBJECT_ARRAY); + + checkCacheOperationThrows("invokeAll", Collections.singleton(1), CACHE_ENTRY_PROC, X.EMPTY_OBJECT_ARRAY); + + checkCacheOperationThrows("invokeAll", Collections.singleton(1), ENTRY_PROC, X.EMPTY_OBJECT_ARRAY); + + checkCacheOperationThrows("invokeAllAsync", Collections.singletonMap(1, CACHE_ENTRY_PROC), + X.EMPTY_OBJECT_ARRAY); + + checkCacheOperationThrows("invokeAllAsync", Collections.singleton(1), CACHE_ENTRY_PROC, X.EMPTY_OBJECT_ARRAY); + + checkCacheOperationThrows("invokeAllAsync", Collections.singleton(1), ENTRY_PROC, X.EMPTY_OBJECT_ARRAY); + } + + /** Entry processor that does nothing and returns {@code null}; passed as the {@link EntryProcessor} argument of the {@code invoke*} calls above. */ + private final static EntryProcessor<Integer, Integer, Object> ENTRY_PROC = + new EntryProcessor<Integer, Integer, Object>() { + @Override public Object process(MutableEntry<Integer, Integer> entry, Object... arguments) + throws EntryProcessorException { + return null; + } + }; + + /** Processor that does nothing and returns {@code null}; passed as the {@link CacheEntryProcessor} argument of the {@code invoke*} calls above. */ + private final static CacheEntryProcessor<Integer, Integer, Object> CACHE_ENTRY_PROC = + new CacheEntryProcessor<Integer, Integer, Object>() { + @Override public Object process(MutableEntry<Integer, Integer> entry, Object... arguments) + throws EntryProcessorException { + return null; + } + }; + + /** + * @return Node. 
+ */ + private IgniteEx node() { + return grid(0); + } + + /** + * @return Transaction currently open on {@link #node()}; the assertions below treat {@code null} as "no transaction". + */ + private Transaction tx() { + return node().transactions().tx(); + } + + /** + * Check that there's an open transaction. Only its presence is asserted; the SQL flag itself is not inspected here. + */ + private void assertTxPresent() { + assertNotNull(tx()); + } + + /** {@inheritDoc} */ + @Override protected List<List<?>> execute(Ignite node, String sql) { + return node.cache("ints").query(new SqlFieldsQuery(sql).setSchema(QueryUtils.DFLT_SCHEMA)).getAll(); + } + + /** + * Check that there's no open transaction. + */ + private void assertSqlTxNotPresent() { + assertNull(tx()); + } + + /** + * Check that the given transaction is in the expected state. + * @param tx Transaction to check. + * @param state Expected state. + */ + private static void assertTxState(Transaction tx, TransactionState state) { + assertEquals(state, tx.state()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/254a6529/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBackupsAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBackupsAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBackupsAbstractTest.java index 71e004b..998cb76 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBackupsAbstractTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBackupsAbstractTest.java @@ -71,7 +71,7 @@ public abstract class CacheMvccBackupsAbstractTest extends CacheMvccAbstractTest public void testBackupsCoherenceSimple() throws Exception { disableScheduledVacuum = true; - ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 2, DFLT_PARTITION_COUNT) + ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 2, 10) .setIndexedTypes(Integer.class, Integer.class); final int KEYS_CNT = 5_000; @@ -184,10 +184,10 @@ public abstract 
class CacheMvccBackupsAbstractTest extends CacheMvccAbstractTest public void testBackupsCoherenceWithLargeOperations() throws Exception { disableScheduledVacuum = true; - ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 1, DFLT_PARTITION_COUNT) + ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 1, 10) .setIndexedTypes(Integer.class, Integer.class); - final int KEYS_CNT = 50_000; + final int KEYS_CNT = 5_000; assert KEYS_CNT % 2 == 0; startGrids(2); http://git-wip-us.apache.org/repos/asf/ignite/blob/254a6529/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java index 4ad667b9..9e0b02f 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java @@ -629,6 +629,8 @@ public abstract class CacheMvccSqlQueriesAbstractTest extends CacheMvccAbstractT * @throws Exception If failed. */ public void testDistributedJoinSimple() throws Exception { + disableScheduledVacuum = true; //TODO: IGNITE-9446: remove this after races in vacuum will be fixed. 
+ startGridsMultiThreaded(4); Ignite srv0 = ignite(0); http://git-wip-us.apache.org/repos/asf/ignite/blob/254a6529/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesWithReducerAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesWithReducerAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesWithReducerAbstractTest.java index ade3763..69cf108 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesWithReducerAbstractTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesWithReducerAbstractTest.java @@ -634,7 +634,7 @@ public abstract class CacheMvccSqlTxQueriesWithReducerAbstractTest extends Cache barrier.await(); String sqlText = "UPDATE MvccTestSqlIndexValue t SET idxVal1=" + - "(SELECT _val FROM \"int\".Integer WHERE t._key = _key ORDER BY _key)"; + "(SELECT _val FROM \"int\".Integer WHERE _key >= 5 AND _key <= 5 ORDER BY _key) WHERE _key = 5"; qry = new SqlFieldsQuery(sqlText);