http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesWithReducerAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesWithReducerAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesWithReducerAbstractTest.java new file mode 100644 index 0000000..ade3763 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesWithReducerAbstractTest.java @@ -0,0 +1,829 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.query.FieldsQueryCursor; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.query.IgniteSQLException; +import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.transactions.Transaction; + +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.testframework.GridTestUtils.runMultiThreaded; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * Tests for transactional SQL. + */ +public abstract class CacheMvccSqlTxQueriesWithReducerAbstractTest extends CacheMvccAbstractTest { + /** */ + private static final int TIMEOUT = 3000; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + ccfgs = null; + ccfg = null; + } + + /** + * @throws Exception If failed. + */ + public void testQueryReducerInsert() throws Exception { + ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 2, DFLT_PARTITION_COUNT) + .setIndexedTypes(Integer.class, CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue.class); + + startGridsMultiThreaded(4); + + Random rnd = ThreadLocalRandom.current(); + + Ignite checkNode = grid(rnd.nextInt(4)); + Ignite updateNode = grid(rnd.nextInt(4)); + + IgniteCache<Integer, CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue> cache = + checkNode.cache(DEFAULT_CACHE_NAME); + + cache.putAll(F.asMap( + 1,new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(1), + 2,new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(2), + 3,new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(3))); + + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(1), cache.get(1)); + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(2), cache.get(2)); + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(3), cache.get(3)); + + try (Transaction tx = updateNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + tx.timeout(TIMEOUT); + + String sqlText = "INSERT INTO MvccTestSqlIndexValue (_key, idxVal1) " + + "SELECT DISTINCT _key + 3, idxVal1 + 3 FROM MvccTestSqlIndexValue"; + + SqlFieldsQuery qry = new SqlFieldsQuery(sqlText); + + qry.setDistributedJoins(true); + + IgniteCache<Object, Object> cache0 = updateNode.cache(DEFAULT_CACHE_NAME); + + try (FieldsQueryCursor<List<?>> cur = cache0.query(qry)) { + assertEquals(3L, cur.iterator().next().get(0)); + } + + tx.commit(); + } + + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(1), cache.get(1)); + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(2), cache.get(2)); + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(3), cache.get(3)); + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(4), cache.get(4)); + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(5), cache.get(5)); + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(6), cache.get(6)); + } + + /** + * @throws Exception If failed. + */ + public void testQueryReducerInsertDuplicateKey() throws Exception { + ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 2, DFLT_PARTITION_COUNT) + .setIndexedTypes(Integer.class, CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue.class); + + startGridsMultiThreaded(4); + + Random rnd = ThreadLocalRandom.current(); + + Ignite checkNode = grid(rnd.nextInt(4)); + Ignite updateNode = grid(rnd.nextInt(4)); + + IgniteCache<Integer, CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue> cache = + checkNode.cache(DEFAULT_CACHE_NAME); + + cache.putAll(F.asMap( + 1,new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(1), + 2,new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(2), + 3,new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(3))); + + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(1), cache.get(1)); + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(2), cache.get(2)); + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(3), cache.get(3)); + + try (Transaction tx = updateNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + tx.timeout(TIMEOUT); + + String sqlText = "INSERT INTO MvccTestSqlIndexValue (_key, idxVal1) " + + "SELECT DISTINCT _key, idxVal1 FROM MvccTestSqlIndexValue"; + + SqlFieldsQuery qry = new SqlFieldsQuery(sqlText); + + qry.setDistributedJoins(true); + + IgniteCache<Object, Object> cache0 = updateNode.cache(DEFAULT_CACHE_NAME); + + GridTestUtils.assertThrowsAnyCause(log, new Callable<Object>() { + @Override public Object call() { + return cache0.query(qry); + } + }, IgniteSQLException.class, "Duplicate key"); + + tx.rollback(); + } + } + + /** + * @throws Exception If failed. + */ + public void testQueryReducerMerge() throws Exception { + ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 2, DFLT_PARTITION_COUNT) + .setIndexedTypes(Integer.class, CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue.class); + + startGridsMultiThreaded(4); + + Random rnd = ThreadLocalRandom.current(); + + Ignite checkNode = grid(rnd.nextInt(4)); + Ignite updateNode = grid(rnd.nextInt(4)); + + IgniteCache<Integer, CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue> cache = + checkNode.cache(DEFAULT_CACHE_NAME); + + cache.putAll(F.asMap( + 1,new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(1), + 2,new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(2), + 3,new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(3))); + + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(1), cache.get(1)); + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(2), cache.get(2)); + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(3), cache.get(3)); + + try (Transaction tx = updateNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + tx.timeout(TIMEOUT); + + String sqlText = "MERGE INTO MvccTestSqlIndexValue (_key, idxVal1) " + + "SELECT DISTINCT _key * 2, idxVal1 FROM MvccTestSqlIndexValue"; + + SqlFieldsQuery qry = new SqlFieldsQuery(sqlText); + + qry.setDistributedJoins(true); + + IgniteCache<Object, Object> cache0 = updateNode.cache(DEFAULT_CACHE_NAME); + + try (FieldsQueryCursor<List<?>> cur = cache0.query(qry)) { + assertEquals(3L, cur.iterator().next().get(0)); + } + + tx.commit(); + } + + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(1), cache.get(1)); + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(1), cache.get(2)); + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(3), cache.get(3)); + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(2), cache.get(4)); + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(3), cache.get(6)); + } + + /** + * @throws Exception If failed. + */ + public void testQueryReducerMultiBatchPerNodeServer() throws Exception { + checkMultiBatchPerNode(false); + } + + /** + * @throws Exception If failed. + */ + public void testQueryReducerMultiBatchPerNodeClient() throws Exception { + checkMultiBatchPerNode(true); + } + + /** + * @throws Exception If failed. + */ + private void checkMultiBatchPerNode(boolean client) throws Exception { + ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 2, DFLT_PARTITION_COUNT) + .setIndexedTypes(Integer.class, CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue.class); + + Ignite checkNode; + Ignite updateNode; + + Random rnd = ThreadLocalRandom.current(); + + if (client) { + startGridsMultiThreaded(3); + + updateNode = grid(rnd.nextInt(3)); + + this.client = true; + + checkNode = startGrid(4); + } + else { + startGridsMultiThreaded(4); + + checkNode = grid(rnd.nextInt(4)); + updateNode = grid(rnd.nextInt(4)); + } + + IgniteCache<Integer, CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue> cache = + checkNode.cache(DEFAULT_CACHE_NAME); + + final int count = 6; + + Map<Integer, CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue> vals = new HashMap<>(count); + + for (int idx = 1; idx <= count; ++idx) + vals.put(idx, new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(idx)); + + cache.putAll(vals); + + try (Transaction tx = updateNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + tx.timeout(TIMEOUT); + + String sqlText = "INSERT INTO MvccTestSqlIndexValue (_key, idxVal1) " + + "SELECT DISTINCT _key + 6, idxVal1 + 6 FROM MvccTestSqlIndexValue"; + + SqlFieldsQuery qry = new SqlFieldsQuery(sqlText); + + qry.setDistributedJoins(true); + qry.setPageSize(1); + + IgniteCache<Object, Object> cache0 = updateNode.cache(DEFAULT_CACHE_NAME); + + try (FieldsQueryCursor<List<?>> cur = cache0.query(qry)) { + assertEquals((long)count, cur.iterator().next().get(0)); + } + + tx.commit(); + } + } + + /** + * @throws Exception If failed. + */ + public void testQueryReducerDelete() throws Exception { + ccfgs = new CacheConfiguration[] { + cacheConfiguration(cacheMode(), FULL_SYNC, 2, DFLT_PARTITION_COUNT) + .setName("int") + .setIndexedTypes(Integer.class, Integer.class), + cacheConfiguration(cacheMode(), FULL_SYNC, 2, DFLT_PARTITION_COUNT) + .setIndexedTypes(Integer.class, + CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue.class), + }; + + startGridsMultiThreaded(4); + + Random rnd = ThreadLocalRandom.current(); + + Ignite checkNode = grid(rnd.nextInt(4)); + Ignite updateNode = grid(rnd.nextInt(4)); + + IgniteCache<Integer, Integer> cache = checkNode.cache("int"); + + cache.putAll(F.asMap(1, 1, 3, 3, 5, 5)); + + final int count = 6; + + Map<Integer, CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue> vals = new HashMap<>(count); + + for (int idx = 1; idx <= count; ++idx) + vals.put(idx, new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(idx)); + + IgniteCache<Object, Object> cache0 = updateNode.cache(DEFAULT_CACHE_NAME); + + cache0.putAll(vals); + + try (Transaction tx = updateNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + tx.timeout(TIMEOUT); + + String sqlText = "DELETE FROM MvccTestSqlIndexValue t " + + "WHERE EXISTS (SELECT 1 FROM \"int\".Integer WHERE t._key = _key)"; + + SqlFieldsQuery qry = new SqlFieldsQuery(sqlText); + + try (FieldsQueryCursor<List<?>> cur = cache0.query(qry)) { + assertEquals(3L, cur.iterator().next().get(0)); + } + + tx.commit(); + } + } + + /** + * @throws Exception If failed. + */ + public void testQueryReducerUpdate() throws Exception { + ccfgs = new CacheConfiguration[] { + cacheConfiguration(cacheMode(), FULL_SYNC, 2, DFLT_PARTITION_COUNT) + .setName("int") + .setIndexedTypes(Integer.class, Integer.class), + cacheConfiguration(cacheMode(), FULL_SYNC, 2, DFLT_PARTITION_COUNT) + .setIndexedTypes(Integer.class, + CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue.class), + }; + + startGridsMultiThreaded(4); + + Random rnd = ThreadLocalRandom.current(); + + Ignite checkNode = grid(rnd.nextInt(4)); + Ignite updateNode = grid(rnd.nextInt(4)); + + IgniteCache<Integer, Integer> cache = checkNode.cache("int"); + + cache.putAll(F.asMap(1, 5, 3, 1, 5, 3)); + + final int count = 6; + + Map<Integer, CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue> vals = new HashMap<>(count); + + for (int idx = 1; idx <= count; ++idx) + vals.put(idx, new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(idx)); + + IgniteCache<Object, Object> cache0 = updateNode.cache(DEFAULT_CACHE_NAME); + + cache0.putAll(vals); + + try (Transaction tx = updateNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + tx.timeout(TIMEOUT); + + String sqlText = "UPDATE MvccTestSqlIndexValue t SET idxVal1=" + + "(SELECT _val FROM \"int\".Integer WHERE t._key = _key)" + + " WHERE EXISTS (SELECT 1 FROM \"int\".Integer WHERE t._key = _key)"; + + SqlFieldsQuery qry = new SqlFieldsQuery(sqlText); + + try (FieldsQueryCursor<List<?>> cur = cache0.query(qry)) { + assertEquals(3L, cur.iterator().next().get(0)); + } + + tx.commit(); + } + } + + /** + * @throws Exception If failed. + */ + public void testQueryReducerImplicitTxInsert() throws Exception { + ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 2, DFLT_PARTITION_COUNT) + .setIndexedTypes(Integer.class, CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue.class); + + startGridsMultiThreaded(4); + + Random rnd = ThreadLocalRandom.current(); + + Ignite checkNode = grid(rnd.nextInt(4)); + Ignite updateNode = grid(rnd.nextInt(4)); + + IgniteCache<Integer, CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue> cache = + checkNode.cache(DEFAULT_CACHE_NAME); + + cache.putAll(F.asMap( + 1,new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(1), + 2,new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(2), + 3,new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(3))); + + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(1), cache.get(1)); + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(2), cache.get(2)); + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(3), cache.get(3)); + + String sqlText = "INSERT INTO MvccTestSqlIndexValue (_key, idxVal1) " + + "SELECT DISTINCT _key + 3, idxVal1 + 3 FROM MvccTestSqlIndexValue"; + + SqlFieldsQuery qry = new SqlFieldsQuery(sqlText); + + qry.setTimeout(TX_TIMEOUT, TimeUnit.MILLISECONDS); + + qry.setDistributedJoins(true); + + IgniteCache<Object, Object> cache0 = updateNode.cache(DEFAULT_CACHE_NAME); + + try (FieldsQueryCursor<List<?>> cur = cache0.query(qry)) { + assertEquals(3L, cur.iterator().next().get(0)); + } + + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(1), cache.get(1)); + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(2), cache.get(2)); + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(3), cache.get(3)); + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(4), cache.get(4)); + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(5), cache.get(5)); + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(6), cache.get(6)); + } + + /** + * @throws Exception If failed. + */ + public void testQueryReducerRollbackInsert() throws Exception { + ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 2, DFLT_PARTITION_COUNT) + .setIndexedTypes(Integer.class, CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue.class); + + startGridsMultiThreaded(4); + + Random rnd = ThreadLocalRandom.current(); + + Ignite checkNode = grid(rnd.nextInt(4)); + Ignite updateNode = grid(rnd.nextInt(4)); + + IgniteCache<Integer, CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue> cache = + checkNode.cache(DEFAULT_CACHE_NAME); + + cache.putAll(F.asMap( + 1,new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(1), + 2,new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(2), + 3,new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(3))); + + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(1), cache.get(1)); + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(2), cache.get(2)); + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(3), cache.get(3)); + + try (Transaction tx = updateNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + tx.timeout(TIMEOUT); + + String sqlText = "INSERT INTO MvccTestSqlIndexValue (_key, idxVal1) " + + "SELECT DISTINCT _key + 3, idxVal1 + 3 FROM MvccTestSqlIndexValue"; + + SqlFieldsQuery qry = new SqlFieldsQuery(sqlText); + + qry.setDistributedJoins(true); + + IgniteCache<Object, Object> cache0 = updateNode.cache(DEFAULT_CACHE_NAME); + + try (FieldsQueryCursor<List<?>> cur = cache0.query(qry)) { + assertEquals(3L, cur.iterator().next().get(0)); + } + + tx.rollback(); + } + + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(1), sqlGet(1, cache).get(0).get(0)); + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(2), sqlGet(2, cache).get(0).get(0)); + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(3), sqlGet(3, cache).get(0).get(0)); + assertTrue(sqlGet(4, cache).isEmpty()); + assertTrue(sqlGet(5, cache).isEmpty()); + assertTrue(sqlGet(6, cache).isEmpty()); + } + + /** + * @param key Key. + * @param cache Cache. + * @return Result. + */ + private List<List> sqlGet(int key, IgniteCache cache) { + return cache.query(new SqlFieldsQuery("SELECT _val from MvccTestSqlIndexValue WHERE _key=" + key)).getAll(); + } + + /** + * @throws Exception If failed. + */ + public void testQueryReducerDeadlockInsert() throws Exception { + ccfgs = new CacheConfiguration[] { + cacheConfiguration(cacheMode(), FULL_SYNC, 2, DFLT_PARTITION_COUNT) + .setName("int") + .setIndexedTypes(Integer.class, Integer.class), + cacheConfiguration(cacheMode(), FULL_SYNC, 2, DFLT_PARTITION_COUNT) + .setIndexedTypes(Integer.class, + CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue.class), + }; + + startGridsMultiThreaded(2); + + client = true; + + startGridsMultiThreaded(2, 2); + + Ignite checkNode = grid(2); + + IgniteCache<Integer, Integer> cache = checkNode.cache("int"); + + HashMap<Integer, Integer> vals = new HashMap<>(100); + + for (int idx = 0; idx < 100; ++idx) + vals.put(idx, idx); + + cache.putAll(vals); + + final CyclicBarrier barrier = new CyclicBarrier(2); + final AtomicInteger idx = new AtomicInteger(2); + final AtomicReference<Exception> ex = new AtomicReference<>(); + + multithreaded(new Runnable() { + @Override public void run() { + int id = idx.getAndIncrement(); + + IgniteEx node = grid(id); + + try { + try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + tx.timeout(TIMEOUT); + + String sqlText = "INSERT INTO MvccTestSqlIndexValue (_key, idxVal1) " + + "SELECT DISTINCT _key, _val FROM \"int\".Integer ORDER BY _key"; + + String sqlAsc = sqlText + " ASC"; + String sqlDesc = sqlText + " DESC"; + + SqlFieldsQuery qry = new SqlFieldsQuery((id % 2) == 0 ? sqlAsc : sqlDesc); + + IgniteCache<Object, Object> cache0 = node.cache(DEFAULT_CACHE_NAME); + + cache0.query(qry).getAll(); + + barrier.await(); + + qry = new SqlFieldsQuery((id % 2) == 0 ? sqlDesc : sqlAsc); + + cache0.query(qry).getAll(); + + tx.commit(); + } + } + catch (Exception e) { + onException(ex, e); + } + } + }, 2); + + Exception ex0 = ex.get(); + + assertNotNull(ex0); + + if (!X.hasCause(ex0, IgniteTxTimeoutCheckedException.class)) + throw ex0; + } + + /** + * @throws Exception If failed. + */ + public void testQueryReducerInsertVersionConflict() throws Exception { + ccfgs = new CacheConfiguration[] { + cacheConfiguration(cacheMode(), FULL_SYNC, 2, DFLT_PARTITION_COUNT) + .setName("int") + .setIndexedTypes(Integer.class, Integer.class), + cacheConfiguration(cacheMode(), FULL_SYNC, 2, DFLT_PARTITION_COUNT) + .setIndexedTypes(Integer.class, + CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue.class), + }; + + startGridsMultiThreaded(2); + + client = true; + + final Ignite checkNode = startGrid(2); + + IgniteCache<Integer, Integer> cache = checkNode.cache("int"); + + HashMap<Integer, Integer> vals = new HashMap<>(100); + + for (int idx = 0; idx < 10; ++idx) + vals.put(idx, idx); + + cache.putAll(vals); + + awaitPartitionMapExchange(); + + IgniteCache cache0 = checkNode.cache(DEFAULT_CACHE_NAME); + + cache0.query(new SqlFieldsQuery("INSERT INTO MvccTestSqlIndexValue (_key, idxVal1) " + + "SELECT _key, _val FROM \"int\".Integer")).getAll(); + + final CyclicBarrier barrier = new CyclicBarrier(2); + final AtomicReference<Exception> ex = new AtomicReference<>(); + + runMultiThreaded(new Runnable() { + @Override public void run() { + try { + try (Transaction tx = checkNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + tx.timeout(TX_TIMEOUT); + + barrier.await(); + + SqlFieldsQuery qry = new SqlFieldsQuery("SELECT * FROM MvccTestSqlIndexValue"); + + cache0.query(qry).getAll(); + + barrier.await(); + + String sqlText = "UPDATE MvccTestSqlIndexValue t SET idxVal1=" + + "(SELECT _val FROM \"int\".Integer WHERE t._key = _key ORDER BY _key)"; + + qry = new SqlFieldsQuery(sqlText); + + cache0.query(qry).getAll(); + + tx.commit(); + } + } + catch (Exception e) { + onException(ex, e); + } + } + }, 2, "tx-thread"); + + IgniteSQLException ex0 = X.cause(ex.get(), IgniteSQLException.class); + + assertNotNull("Exception has not been thrown.", ex0); + assertEquals("Mvcc version mismatch.", ex0.getMessage()); + } + + /** + * @throws Exception If failed. + */ + public void testQueryReducerInsertValues() throws Exception { + ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 2, DFLT_PARTITION_COUNT) + .setIndexedTypes(Integer.class, CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue.class); + + startGridsMultiThreaded(4); + + Random rnd = ThreadLocalRandom.current(); + + Ignite node = grid(rnd.nextInt(4)); + + IgniteCache<Object, CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue> cache = node.cache(DEFAULT_CACHE_NAME); + + SqlFieldsQuery qry = new SqlFieldsQuery("INSERT INTO MvccTestSqlIndexValue (_key, idxVal1)" + + " values (1,?),(2,?),(3,?)"); + + qry.setArgs(1, 2, 3); + + try (FieldsQueryCursor<List<?>> cur = cache.query(qry)) { + assertEquals(3L, cur.iterator().next().get(0)); + } + + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(1), cache.get(1)); + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(2), cache.get(2)); + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(3), cache.get(3)); + + qry = new SqlFieldsQuery("INSERT INTO MvccTestSqlIndexValue (_key, idxVal1) values (4,4)"); + + try (FieldsQueryCursor<List<?>> cur = cache.query(qry)) { + assertEquals(1L, cur.iterator().next().get(0)); + } + + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(4), cache.get(4)); + } + + /** + * @throws Exception If failed. + */ + public void testQueryReducerMergeValues() throws Exception { + ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 2, DFLT_PARTITION_COUNT) + .setIndexedTypes(Integer.class, CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue.class); + + startGridsMultiThreaded(4); + + Random rnd = ThreadLocalRandom.current(); + + Ignite node = grid(rnd.nextInt(4)); + + IgniteCache<Object, CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue> cache = node.cache(DEFAULT_CACHE_NAME); + + cache.put(1, new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(1)); + cache.put(3, new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(3)); + + SqlFieldsQuery qry = new SqlFieldsQuery("MERGE INTO MvccTestSqlIndexValue (_key, idxVal1)" + + " values (1,?),(2,?),(3,?)"); + + qry.setArgs(1, 4, 6); + + try (FieldsQueryCursor<List<?>> cur = cache.query(qry)) { + assertEquals(3L, cur.iterator().next().get(0)); + } + + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(1), cache.get(1)); + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(4), cache.get(2)); + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(6), cache.get(3)); + + qry = new SqlFieldsQuery("MERGE INTO MvccTestSqlIndexValue (_key, idxVal1) values (4,4)"); + + try (FieldsQueryCursor<List<?>> cur = cache.query(qry)) { + assertEquals(1L, cur.iterator().next().get(0)); + } + + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(4), cache.get(4)); + } + + /** + * @throws Exception If failed. + */ + public void testQueryReducerFastUpdate() throws Exception { + ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 2, DFLT_PARTITION_COUNT) + .setIndexedTypes(Integer.class, Integer.class); + + startGridsMultiThreaded(4); + + Random rnd = ThreadLocalRandom.current(); + + Ignite checkNode = grid(rnd.nextInt(4)); + Ignite updateNode = grid(rnd.nextInt(4)); + + IgniteCache<Object, Object> cache = checkNode.cache(DEFAULT_CACHE_NAME); + + cache.putAll(F.asMap(1,1,2,2,3,3)); + + assertEquals(1, cache.get(1)); + assertEquals(2, cache.get(2)); + assertEquals(3, cache.get(3)); + + IgniteCache<Object, Object> cache0 = updateNode.cache(DEFAULT_CACHE_NAME); + + SqlFieldsQuery qry = new SqlFieldsQuery("UPDATE Integer SET _val = 8 WHERE _key = ?").setArgs(1); + + try (FieldsQueryCursor<List<?>> cur = cache0.query(qry)) { + assertEquals(1L, cur.iterator().next().get(0)); + } + + qry = new SqlFieldsQuery("UPDATE Integer SET _val = 9 WHERE _key = 2"); + + try (FieldsQueryCursor<List<?>> cur = cache0.query(qry)) { + assertEquals(1L, cur.iterator().next().get(0)); + } + + assertEquals(8, cache.get(1)); + assertEquals(9, cache.get(2)); + assertEquals(3, cache.get(3)); + } + + /** + * @throws Exception If failed. + */ + public void testQueryReducerFastDelete() throws Exception { + ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 2, DFLT_PARTITION_COUNT) + .setIndexedTypes(Integer.class, CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue.class); + + startGridsMultiThreaded(4); + + Random rnd = ThreadLocalRandom.current(); + + Ignite checkNode = grid(rnd.nextInt(4)); + Ignite updateNode = grid(rnd.nextInt(4)); + + IgniteCache<Object, Object> cache = checkNode.cache(DEFAULT_CACHE_NAME); + + cache.putAll(F.asMap( + 1,new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(1), + 2,new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(2), + 3,new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(3))); + + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(1), cache.get(1)); + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(2), cache.get(2)); + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(3), cache.get(3)); + + IgniteCache<Object, Object> cache0 = updateNode.cache(DEFAULT_CACHE_NAME); + + SqlFieldsQuery qry = new SqlFieldsQuery("DELETE FROM MvccTestSqlIndexValue WHERE _key = ?") + .setArgs(1); + + try (FieldsQueryCursor<List<?>> cur = cache0.query(qry)) { + assertEquals(1L, cur.iterator().next().get(0)); + } + + qry = new SqlFieldsQuery("DELETE FROM MvccTestSqlIndexValue WHERE _key = 2"); + + try (FieldsQueryCursor<List<?>> cur = cache0.query(qry)) { + assertEquals(1L, cur.iterator().next().get(0)); + } + + assertNull(cache.get(1)); + assertNull(cache.get(2)); + assertEquals(new CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue(3), cache.get(3)); + } + + /** + * @param ex Exception holder. + * @param e Exception. + */ + private void onException(AtomicReference<Exception> ex, Exception e) { + if (!ex.compareAndSet(null, e)) + ex.get().addSuppressed(e); + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccStreamingInsertTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccStreamingInsertTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccStreamingInsertTest.java new file mode 100644 index 0000000..7d2e335 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccStreamingInsertTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteJdbcDriver; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.configuration.CacheConfiguration; + +import static java.util.Arrays.asList; + +/** + * + */ +public class CacheMvccStreamingInsertTest extends CacheMvccAbstractTest { + /** */ + private IgniteCache<Object, Object> sqlNexus; + + /** */ + private Connection conn; + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + Ignite ignite = startGrid(0); + sqlNexus = ignite.getOrCreateCache(new CacheConfiguration<>("sqlNexus").setSqlSchema("PUBLIC")); + sqlNexus.query(q("" + + "create table person(" + + " id int not null primary key," + + " name varchar not null" + + ") with \"atomicity=transactional\"" + )); + + Properties props = new Properties(); + props.setProperty(IgniteJdbcDriver.PROP_STREAMING, "true"); + conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1", props); + } + + /** + * @throws Exception If failed. + */ + public void testStreamingInsertWithoutOverwrite() throws Exception { + conn.createStatement().execute("SET STREAMING 1 BATCH_SIZE 2 ALLOW_OVERWRITE 0 " + + " PER_NODE_BUFFER_SIZE 1000 FLUSH_FREQUENCY 100"); + sqlNexus.query(q("insert into person values(1, 'ivan')")); + + PreparedStatement batchStmt = conn.prepareStatement("insert into person values(?, ?)"); + batchStmt.setInt(1, 1); + batchStmt.setString(2, "foo"); + batchStmt.addBatch(); + batchStmt.setInt(1, 2); + batchStmt.setString(2, "bar"); + batchStmt.addBatch(); + TimeUnit.MILLISECONDS.sleep(500); + + List<List<?>> rows = sqlNexus.query(q("select * from person")).getAll(); + List<List<?>> exp = asList( + asList(1, "ivan"), + asList(2, "bar") + ); + assertEquals(exp, rows); + } + + /** + * @throws Exception If failed. + */ + public void testUpdateWithOverwrite() throws Exception { + conn.createStatement().execute("SET STREAMING 1 BATCH_SIZE 2 ALLOW_OVERWRITE 1 " + + " PER_NODE_BUFFER_SIZE 1000 FLUSH_FREQUENCY 100"); + sqlNexus.query(q("insert into person values(1, 'ivan')")); + + PreparedStatement batchStmt = conn.prepareStatement("insert into person values(?, ?)"); + batchStmt.setInt(1, 1); + batchStmt.setString(2, "foo"); + batchStmt.addBatch(); + batchStmt.setInt(1, 2); + batchStmt.setString(2, "bar"); + batchStmt.addBatch(); + TimeUnit.MILLISECONDS.sleep(500); + + List<List<?>> rows = sqlNexus.query(q("select * from person")).getAll(); + List<List<?>> exp = asList( + asList(1, "foo"), + asList(2, "bar") + ); + assertEquals(exp, rows); + } + + /** */ + private static SqlFieldsQuery q(String sql) { + return new SqlFieldsQuery(sql); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildSelfTest.java new file mode 100644 index 0000000..57cee61 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildSelfTest.java @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2; + +import java.io.File; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager; +import org.apache.ignite.internal.processors.cache.IgniteInternalCache; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.index.DynamicIndexAbstractSelfTest; +import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; +import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; +import org.apache.ignite.internal.processors.query.GridQueryProcessor; +import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor; +import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorImpl; +import org.apache.ignite.internal.util.lang.GridCursor; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; + +/** + * Index rebuild after node restart test. + */ +public class GridIndexRebuildSelfTest extends DynamicIndexAbstractSelfTest { + /** Data size. */ + private final static int AMOUNT = 10; + + /** Data size. */ + private final static String CACHE_NAME = "T"; + + /** Test instance to allow interaction with static context. */ + private static GridIndexRebuildSelfTest INSTANCE; + + /** Latch to signal that rebuild may start. */ + private final CountDownLatch rebuildLatch = new CountDownLatch(1); + + /** Latch to signal that concurrent put may start. */ + private final Semaphore rebuildSemaphore = new Semaphore(1, true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration commonConfiguration(int idx) throws Exception { + IgniteConfiguration cfg = super.commonConfiguration(idx); + + cfg.getDataStorageConfiguration().getDefaultDataRegionConfiguration().setPersistenceEnabled(true); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + // Just in case. + cleanPersistenceDir(); + + INSTANCE = this; + } + + /** + * Do test with MVCC enabled. + */ + public void testMvccEnabled() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-7259"); + doTest(true); + } + + /** + * Do test with MVCC disabled. + */ + public void testMvccDisabled() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-7259"); + doTest(false); + } + + /** + * Do test.<p> + * Steps are as follows: + * <ul> + * <li>Put some data;</li> + * <li>Stop the node;</li> + * <li>Remove index file;</li> + * <li>Restart the node and block index rebuild;</li> + * <li>For half of the keys do cache puts <b>before</b> corresponding key + * has been processed during index rebuild;</li> + * <li>Check that: + * <ul> + * <li>For MVCC case: some keys have all versions that existed before restart, while those + * updated concurrently have only put version (one with mark value -1) + * and latest version present before node restart;</li> + * <li>For non MVCC case: keys updated concurrently must have mark values of -1 despite that + * index rebuild for them has happened after put.</li> + * </ul> + * </li> + * </ul> + * @param mvccEnabled MVCC flag. + * @throws Exception if failed. + */ + private void doTest(boolean mvccEnabled) throws Exception { + IgniteEx srv = startServer(mvccEnabled); + + execute(srv, "CREATE TABLE T(k int primary key, v int) WITH \"cache_name=T,wrap_value=false," + + "atomicity=transactional\""); + + execute(srv, "CREATE INDEX IDX ON T(v)"); + + IgniteInternalCache cc = srv.cachex(CACHE_NAME); + + assertNotNull(cc); + + if (mvccEnabled) + lockVersion(srv); + + putData(srv, false); + + checkDataState(srv, mvccEnabled, false); + + File cacheWorkDir = ((FilePageStoreManager)cc.context().shared().pageStore()).cacheWorkDir(cc.configuration()); + + File idxPath = cacheWorkDir.toPath().resolve("index.bin").toFile(); + + stopAllGrids(); + + assertTrue(U.delete(idxPath)); + + srv = startServer(mvccEnabled); + + putData(srv, true); + + checkDataState(srv, mvccEnabled, true); + } + + /** + * Check versions presence in index tree. + * @param srv Node. + * @param mvccEnabled MVCC flag. + * @param afterRebuild Whether index rebuild has occurred. + * @throws IgniteCheckedException if failed. + */ + @SuppressWarnings({"ConstantConditions", "unchecked"}) + private void checkDataState(IgniteEx srv, boolean mvccEnabled, boolean afterRebuild) throws IgniteCheckedException { + IgniteInternalCache icache = srv.cachex(CACHE_NAME); + + IgniteCache cache = srv.cache(CACHE_NAME); + + assertNotNull(icache); + + for (IgniteCacheOffheapManager.CacheDataStore store : icache.context().offheap().cacheDataStores()) { + GridCursor<? extends CacheDataRow> cur = store.cursor(); + + while (cur.next()) { + CacheDataRow row = cur.get(); + + int key = row.key().value(icache.context().cacheObjectContext(), false); + + if (mvccEnabled) { + List<IgniteBiTuple<Object, MvccVersion>> vers = store.mvccFindAllVersions(icache.context(), row.key()); + + if (!afterRebuild || key <= AMOUNT / 2) + assertEquals(key, vers.size()); + else { + // For keys affected by concurrent put there are two versions - + // -1 (concurrent put mark) and newest restored value as long as put cleans obsolete versions. + assertEquals(2, vers.size()); + + assertEquals(-1, vers.get(0).getKey()); + assertEquals(key, vers.get(1).getKey()); + } + } + else { + if (!afterRebuild || key <= AMOUNT / 2) + assertEquals(key, cache.get(key)); + else + assertEquals(-1, cache.get(key)); + } + } + } + } + + /** + * Lock coordinator version in order to keep MVCC versions in place. + * @param node Node. + * @throws IgniteCheckedException if failed. + */ + private static void lockVersion(IgniteEx node) throws IgniteCheckedException { + node.context().coordinators().requestSnapshotAsync().get(); + } + + /** + * Put data to cache. + * @param node Node. + * @throws Exception if failed. + */ + private void putData(Ignite node, final boolean forConcurrentPut) throws Exception { + final IgniteCache<Integer, Integer> cache = node.cache(CACHE_NAME); + + assertNotNull(cache); + + for (int i = 1; i <= AMOUNT; i++) { + if (forConcurrentPut) { + // Concurrent put affects only second half of the keys. + if (i <= AMOUNT / 2) + continue; + + rebuildSemaphore.acquire(); + + cache.put(i, -1); + + rebuildLatch.countDown(); + + rebuildSemaphore.release(); + } + else { + // Data streamer is not used intentionally in order to preserve all versions. + for (int j = 1; j <= i; j++) + cache.put(i, j); + } + } + } + + /** + * Start server node. + * @param mvccEnabled MVCC flag. + * @return Started node. + * @throws Exception if failed. + */ + private IgniteEx startServer(boolean mvccEnabled) throws Exception { + // Have to do this for each starting node - see GridQueryProcessor ctor, it nulls + // idxCls static field on each call. + GridQueryProcessor.idxCls = BlockingIndexing.class; + + IgniteConfiguration cfg = serverConfiguration(0).setMvccEnabled(mvccEnabled); + + IgniteEx res = startGrid(cfg); + + res.active(true); + + return res; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + cleanPersistenceDir(); + } + + /** + * Blocking indexing processor. + */ + private static class BlockingIndexing extends IgniteH2Indexing { + /** Flag to ignore first rebuild performed on initial node start. */ + private boolean firstRbld = true; + + /** {@inheritDoc} */ + @Override public void rebuildIndexesFromHash(String cacheName) throws IgniteCheckedException { + if (!firstRbld) + U.await(INSTANCE.rebuildLatch); + else + firstRbld = false; + + int cacheId = CU.cacheId(cacheName); + + GridCacheContext cctx = ctx.cache().context().cacheContext(cacheId); + + final GridCacheQueryManager qryMgr = cctx.queries(); + + SchemaIndexCacheVisitor visitor = new SchemaIndexCacheVisitorImpl(cctx); + + visitor.visit(new TestRebuildClosure(qryMgr, cctx.mvccEnabled())); + + for (H2TableDescriptor tblDesc : tables(cacheName)) + tblDesc.table().markRebuildFromHashInProgress(false); + } + } + + /** + * Test closure. + */ + private final static class TestRebuildClosure extends RebuildIndexFromHashClosure { + /** Seen keys set to track moment when concurrent put may start. */ + private final Set<KeyCacheObject> keys = + Collections.newSetFromMap(new ConcurrentHashMap<KeyCacheObject, Boolean>()); + + /** + * @param qryMgr Query manager. + * @param mvccEnabled MVCC status flag. + */ + TestRebuildClosure(GridCacheQueryManager qryMgr, boolean mvccEnabled) { + super(qryMgr, mvccEnabled); + } + + /** {@inheritDoc} */ + @Override public synchronized void apply(CacheDataRow row) throws IgniteCheckedException { + // For half of the keys, we want to do rebuild + // after corresponding key had been put from a concurrent thread. + boolean keyFirstMet = keys.add(row.key()) && keys.size() > AMOUNT / 2; + + if (keyFirstMet) { + try { + INSTANCE.rebuildSemaphore.acquire(); + } + catch (InterruptedException e) { + throw new IgniteCheckedException(e); + } + } + + super.apply(row); + + if (keyFirstMet) + INSTANCE.rebuildSemaphore.release(); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java index 235b28b..6b76230 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java @@ -343,7 +343,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract // Fields query GridQueryFieldsResult fieldsRes = spi.queryLocalSqlFields(spi.schema("A"), "select a.a.name n1, a.a.age a1, b.a.name n2, " + - "b.a.age a2 from a.a, b.a where a.a.id = b.a.id ", Collections.emptySet(), null, false, 0, null); + "b.a.age a2 from a.a, b.a where a.a.id = b.a.id ", Collections.emptySet(), null, false, false, 0, null); String[] aliases = {"N1", "A1", "N2", "A2"}; Object[] vals = { "Valera", 19, "Kolya", 25}; @@ -401,7 +401,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract range *= 3; GridQueryFieldsResult res = spi.queryLocalSqlFields(spi.schema("A"), sql, Arrays.<Object>asList(1, - range), null, false, 0, null); + range), null, false, false, 0, null); assert res.iterator().hasNext(); http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/H2StatementCacheSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/H2StatementCacheSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/H2StatementCacheSelfTest.java new file mode 100644 index 0000000..655d039 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/H2StatementCacheSelfTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2; + +import java.sql.PreparedStatement; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class H2StatementCacheSelfTest extends GridCommonAbstractTest { + + /** + * @throws Exception If failed. + */ + public void testEviction() throws Exception { + H2StatementCache stmtCache = new H2StatementCache(1); + H2CachedStatementKey key1 = new H2CachedStatementKey("", "1"); + PreparedStatement stmt1 = stmt(); + stmtCache.put(key1, stmt1); + + assertSame(stmt1, stmtCache.get(key1)); + + stmtCache.put(new H2CachedStatementKey("mydb", "2"), stmt()); + + assertNull(stmtCache.get(key1)); + } + + /** + * @throws Exception If failed. + */ + public void testLruEvictionInStoreOrder() throws Exception { + H2StatementCache stmtCache = new H2StatementCache(2); + + H2CachedStatementKey key1 = new H2CachedStatementKey("", "1"); + H2CachedStatementKey key2 = new H2CachedStatementKey("", "2"); + stmtCache.put(key1, stmt()); + stmtCache.put(key2, stmt()); + + stmtCache.put(new H2CachedStatementKey("", "3"), stmt()); + + assertNull(stmtCache.get(key1)); + } + + /** + * @throws Exception If failed. + */ + public void testLruEvictionInAccessOrder() throws Exception { + H2StatementCache stmtCache = new H2StatementCache(2); + + H2CachedStatementKey key1 = new H2CachedStatementKey("", "1"); + H2CachedStatementKey key2 = new H2CachedStatementKey("", "2"); + stmtCache.put(key1, stmt()); + stmtCache.put(key2, stmt()); + stmtCache.get(key1); + + stmtCache.put(new H2CachedStatementKey("", "3"), stmt()); + + assertNull(stmtCache.get(key2)); + } + + /** + * + */ + private static PreparedStatement stmt() { + return new PreparedStatementExImpl(null); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/PreparedStatementExSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/PreparedStatementExSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/PreparedStatementExSelfTest.java new file mode 100644 index 0000000..22bff3b --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/PreparedStatementExSelfTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2; + +import java.sql.PreparedStatement; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class PreparedStatementExSelfTest extends GridCommonAbstractTest { + /** + * @throws Exception If failed. + */ + public void testStoringMeta() throws Exception { + PreparedStatement stmt = stmt(); + + PreparedStatementEx wrapped = stmt.unwrap(PreparedStatementEx.class); + + wrapped.putMeta(0, "0"); + + assertEquals("0", wrapped.meta(0)); + } + + /** + * @throws Exception If failed. + */ + public void testStoringMoreMetaKeepsExisting() throws Exception { + PreparedStatement stmt = stmt(); + + PreparedStatementEx wrapped = stmt.unwrap(PreparedStatementEx.class); + + wrapped.putMeta(0, "0"); + wrapped.putMeta(1, "1"); + + assertEquals("0", wrapped.meta(0)); + assertEquals("1", wrapped.meta(1)); + } + + /** + * + */ + private static PreparedStatement stmt() { + return new PreparedStatementExImpl(null); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPoolSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPoolSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPoolSelfTest.java new file mode 100644 index 0000000..b7b7a37 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPoolSelfTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2; + +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.processors.query.h2.ThreadLocalObjectPool.Reusable; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class ThreadLocalObjectPoolSelfTest extends GridCommonAbstractTest { + /** */ + private ThreadLocalObjectPool<Obj> pool = new ThreadLocalObjectPool<>(Obj::new, 1); + + /** + * @throws Exception If failed. + */ + public void testObjectIsReusedAfterRecycling() throws Exception { + Reusable<Obj> o1 = pool.borrow(); + o1.recycle(); + Reusable<Obj> o2 = pool.borrow(); + + assertSame(o1.object(), o2.object()); + assertFalse(o1.object().isClosed()); + } + + /** + * @throws Exception If failed. + */ + public void testBorrowedObjectIsNotReturnedTwice() throws Exception { + Reusable<Obj> o1 = pool.borrow(); + Reusable<Obj> o2 = pool.borrow(); + + assertNotSame(o1.object(), o2.object()); + } + + /** + * @throws Exception If failed. + */ + public void testObjectShouldBeClosedOnRecycleIfPoolIsFull() throws Exception { + Reusable<Obj> o1 = pool.borrow(); + Reusable<Obj> o2 = pool.borrow(); + o1.recycle(); + o2.recycle(); + + assertTrue(o2.object().isClosed()); + } + + /** + * @throws Exception If failed. + */ + public void testObjectShouldNotBeReturnedIfPoolIsFull() throws Exception { + Reusable<Obj> o1 = pool.borrow(); + Reusable<Obj> o2 = pool.borrow(); + + o1.recycle(); + + assertEquals(1, pool.bagSize()); + + o2.recycle(); + + assertEquals(1, pool.bagSize()); + } + + /** + * @throws Exception If failed. + */ + public void testObjectShouldReturnedToRecyclingThreadBag() throws Exception { + Reusable<Obj> o1 = pool.borrow(); + + CompletableFuture.runAsync(() -> { + o1.recycle(); + + assertEquals(1, pool.bagSize()); + }).join(); + + assertEquals(0, pool.bagSize()); + } + + /** */ + private static class Obj implements AutoCloseable { + /** */ + private boolean closed = false; + + /** {@inheritDoc} */ + @Override public void close() { + closed = true; + } + + /** + * @return {@code True} if closed. + */ + public boolean isClosed() { + return closed; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java index a362586..de77150 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java @@ -205,6 +205,7 @@ public class GridQueryParsingTest extends GridCommonAbstractTest { checkQuery("select * from Person"); checkQuery("select distinct * from Person"); checkQuery("select p.name, date from Person p"); + checkQuery("select p.name, date from Person p for update"); checkQuery("select * from Person p, sch2.Address a"); checkQuery("select * from Person, sch2.Address"); http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java new file mode 100644 index 0000000..a8087da --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.testsuites; + +import junit.framework.TestSuite; +import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccBulkLoadTest; +import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccDmlSimpleTest; +import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccIteratorWithConcurrentJdbcTransactionTest; +import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccLocalEntriesWithConcurrentJdbcTransactionTest; +import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccPartitionedBackupsTest; +import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccPartitionedSelectForUpdateQueryTest; +import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccPartitionedSqlCoordinatorFailoverTest; +import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccPartitionedSqlQueriesTest; +import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccPartitionedSqlTxQueriesTest; +import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccPartitionedSqlTxQueriesWithReducerTest; +import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccReplicatedBackupsTest; +import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccReplicatedSelectForUpdateQueryTest; +import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccReplicatedSqlCoordinatorFailoverTest; +import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccReplicatedSqlQueriesTest; +import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccReplicatedSqlTxQueriesTest; +import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccReplicatedSqlTxQueriesWithReducerTest; +import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccScanQueryWithConcurrentJdbcTransactionTest; +import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSizeWithConcurrentJdbcTransactionTest; +import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccStreamingInsertTest; + +/** + * + */ +public class IgniteCacheMvccSqlTestSuite extends TestSuite { + /** + * @return Test suite. + */ + public static TestSuite suite() { + TestSuite suite = new TestSuite("IgniteCache SQL MVCC Test Suite"); + + suite.addTestSuite(CacheMvccSizeWithConcurrentJdbcTransactionTest.class); + suite.addTestSuite(CacheMvccScanQueryWithConcurrentJdbcTransactionTest.class); + suite.addTestSuite(CacheMvccLocalEntriesWithConcurrentJdbcTransactionTest.class); + suite.addTestSuite(CacheMvccIteratorWithConcurrentJdbcTransactionTest.class); + suite.addTestSuite(CacheMvccPartitionedSqlQueriesTest.class); + suite.addTestSuite(CacheMvccReplicatedSqlQueriesTest.class); + suite.addTestSuite(CacheMvccPartitionedSqlTxQueriesTest.class); + suite.addTestSuite(CacheMvccReplicatedSqlTxQueriesTest.class); + suite.addTestSuite(CacheMvccPartitionedSqlTxQueriesWithReducerTest.class); + suite.addTestSuite(CacheMvccReplicatedSqlTxQueriesWithReducerTest.class); + suite.addTestSuite(CacheMvccPartitionedSelectForUpdateQueryTest.class); + suite.addTestSuite(CacheMvccReplicatedSelectForUpdateQueryTest.class); + suite.addTestSuite(CacheMvccPartitionedBackupsTest.class); + suite.addTestSuite(CacheMvccReplicatedBackupsTest.class); + suite.addTestSuite(CacheMvccPartitionedSqlCoordinatorFailoverTest.class); + suite.addTestSuite(CacheMvccReplicatedSqlCoordinatorFailoverTest.class); + suite.addTestSuite(CacheMvccBulkLoadTest.class); + suite.addTestSuite(CacheMvccStreamingInsertTest.class); + suite.addTestSuite(CacheMvccDmlSimpleTest.class); + + return suite; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index 2a61ce6..b0670e8 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.cache.CacheReplicatedQueryDetailMet import org.apache.ignite.internal.processors.cache.CacheReplicatedQueryMetricsDistributedSelfTest; import org.apache.ignite.internal.processors.cache.CacheReplicatedQueryMetricsLocalSelfTest; import org.apache.ignite.internal.processors.cache.CacheSqlQueryValueCopySelfTest; +import org.apache.ignite.internal.processors.cache.DdlTransactionSelfTest; import org.apache.ignite.internal.processors.cache.GridCacheCrossCacheQuerySelfTest; import org.apache.ignite.internal.processors.cache.GridCacheFullTextQuerySelfTest; import org.apache.ignite.internal.processors.cache.GridCacheLazyQueryPartitionsReleaseTest; @@ -133,6 +134,8 @@ import org.apache.ignite.internal.processors.cache.index.H2RowCacheSelfTest; import org.apache.ignite.internal.processors.cache.index.LongIndexNameTest; import org.apache.ignite.internal.processors.cache.index.OptimizedMarshallerIndexNameTest; import org.apache.ignite.internal.processors.cache.index.SchemaExchangeSelfTest; +import org.apache.ignite.internal.processors.cache.index.SqlTransactionsComandsWithMvccDisabledSelfTest; +import org.apache.ignite.internal.processors.cache.index.SqlTransactionsSelfTest; import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalAtomicQuerySelfTest; import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalFieldsQuerySelfTest; import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalQueryCancelOrTimeoutSelfTest; @@ -167,9 +170,13 @@ import org.apache.ignite.internal.processors.query.SqlPushDownFunctionTest; import org.apache.ignite.internal.processors.query.SqlSchemaSelfTest; import org.apache.ignite.internal.processors.query.h2.GridH2IndexingInMemSelfTest; import org.apache.ignite.internal.processors.query.h2.GridH2IndexingOffheapSelfTest; +import org.apache.ignite.internal.processors.query.h2.GridIndexRebuildSelfTest; +import org.apache.ignite.internal.processors.query.h2.H2StatementCacheSelfTest; import org.apache.ignite.internal.processors.query.h2.H2ResultSetIteratorNullifyOnEndSelfTest; import org.apache.ignite.internal.processors.query.h2.IgniteSqlBigIntegerKeyTest; import org.apache.ignite.internal.processors.query.h2.IgniteSqlQueryMinMaxTest; +import org.apache.ignite.internal.processors.query.h2.ThreadLocalObjectPoolSelfTest; +import org.apache.ignite.internal.processors.query.h2.PreparedStatementExSelfTest; import org.apache.ignite.internal.processors.query.h2.sql.BaseH2CompareQueryTest; import org.apache.ignite.internal.processors.query.h2.sql.GridQueryParsingTest; import org.apache.ignite.internal.processors.query.h2.sql.H2CompareBigQueryDistributedJoinsTest; @@ -184,6 +191,7 @@ import org.apache.ignite.internal.sql.SqlParserBulkLoadSelfTest; import org.apache.ignite.internal.sql.SqlParserCreateIndexSelfTest; import org.apache.ignite.internal.sql.SqlParserDropIndexSelfTest; import org.apache.ignite.internal.sql.SqlParserSetStreamingSelfTest; +import org.apache.ignite.internal.sql.SqlParserTransactionalKeywordsSelfTest; import org.apache.ignite.internal.sql.SqlParserUserSelfTest; import org.apache.ignite.spi.communication.tcp.GridOrderedMessageCancelSelfTest; import org.apache.ignite.sqltests.PartitionedSqlTest; @@ -206,6 +214,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(SqlParserCreateIndexSelfTest.class); suite.addTestSuite(SqlParserDropIndexSelfTest.class); + suite.addTestSuite(SqlParserTransactionalKeywordsSelfTest.class); suite.addTestSuite(SqlParserBulkLoadSelfTest.class); suite.addTestSuite(SqlParserSetStreamingSelfTest.class); @@ -354,6 +363,8 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { //suite.addTestSuite(H2DynamicIndexingComplexServerTransactionalPartitionedNoBackupsTest.class); suite.addTestSuite(H2DynamicIndexingComplexServerTransactionalReplicatedTest.class); + suite.addTestSuite(DdlTransactionSelfTest.class); + // Fields queries. suite.addTestSuite(SqlFieldsQuerySelfTest.class); suite.addTestSuite(IgniteCacheLocalFieldsQuerySelfTest.class); @@ -434,6 +445,11 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(OptimizedMarshallerIndexNameTest.class); suite.addTestSuite(SqlSystemViewsSelfTest.class); + suite.addTestSuite(GridIndexRebuildSelfTest.class); + + suite.addTestSuite(SqlTransactionsSelfTest.class); + suite.addTestSuite(SqlTransactionsComandsWithMvccDisabledSelfTest.class); + suite.addTestSuite(IgniteSqlDefaultValueTest.class); suite.addTestSuite(IgniteDecimalSelfTest.class); suite.addTestSuite(IgniteSQLColumnConstraintsTest.class); @@ -451,6 +467,10 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(SqlParserUserSelfTest.class); suite.addTestSuite(SqlUserCommandSelfTest.class); + suite.addTestSuite(ThreadLocalObjectPoolSelfTest.class); + suite.addTestSuite(H2StatementCacheSelfTest.class); + suite.addTestSuite(PreparedStatementExSelfTest.class); + return suite; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/resources/org/apache/ignite/internal/processors/cache/mvcc/mvcc_person.csv ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/resources/org/apache/ignite/internal/processors/cache/mvcc/mvcc_person.csv b/modules/indexing/src/test/resources/org/apache/ignite/internal/processors/cache/mvcc/mvcc_person.csv new file mode 100644 index 0000000..ef7a087 --- /dev/null +++ b/modules/indexing/src/test/resources/org/apache/ignite/internal/processors/cache/mvcc/mvcc_person.csv @@ -0,0 +1,2 @@ +1,John +2,Jack http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/resources/org/apache/ignite/internal/processors/cache/mvcc/mvcc_person_broken.csv ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/resources/org/apache/ignite/internal/processors/cache/mvcc/mvcc_person_broken.csv b/modules/indexing/src/test/resources/org/apache/ignite/internal/processors/cache/mvcc/mvcc_person_broken.csv new file mode 100644 index 0000000..b5c2b3f --- /dev/null +++ b/modules/indexing/src/test/resources/org/apache/ignite/internal/processors/cache/mvcc/mvcc_person_broken.csv @@ -0,0 +1,2 @@ +1,John +2 http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java ---------------------------------------------------------------------- diff --git a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java index 8dcdd57..88c86b2 100644 --- a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java +++ b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java @@ -167,6 +167,7 @@ public class CacheJtaManager extends CacheJtaManagerAdapter { tCfg.getDefaultTxIsolation(), tCfg.getDefaultTxTimeout(), /*store enabled*/true, + /*sql*/false, /*tx size*/0, null ); http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/platforms/cpp/odbc-test/Makefile.am ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc-test/Makefile.am b/modules/platforms/cpp/odbc-test/Makefile.am index f412403..14dae7c 100644 --- a/modules/platforms/cpp/odbc-test/Makefile.am +++ b/modules/platforms/cpp/odbc-test/Makefile.am @@ -85,6 +85,7 @@ ignite_odbc_tests_SOURCES = \ src/errors_test.cpp \ src/odbc_test_suite.cpp \ src/types_test.cpp \ + src/transaction_test.cpp \ src/authentication_test.cpp \ ../odbc/src/log.cpp \ ../odbc/src/cursor.cpp \ @@ -101,7 +102,8 @@ ignite_odbc_tests_SOURCES = \ ../odbc/src/column.cpp \ ../odbc/src/common_types.cpp \ ../odbc/src/utility.cpp \ - ../odbc/src/result_page.cpp + ../odbc/src/result_page.cpp \ + ../odbc/src/nested_tx_mode.cpp run-check: check ./ignite-odbc-tests -p http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/platforms/cpp/odbc-test/config/queries-transaction-32.xml ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc-test/config/queries-transaction-32.xml b/modules/platforms/cpp/odbc-test/config/queries-transaction-32.xml new file mode 100644 index 0000000..94ddd3d --- /dev/null +++ b/modules/platforms/cpp/odbc-test/config/queries-transaction-32.xml @@ -0,0 +1,50 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns:util="http://www.springframework.org/schema/util" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans + http://www.springframework.org/schema/beans/spring-beans.xsd"> + + <import resource="queries-default.xml"/> + + <bean parent="queries.cfg"> + <property name="mvccEnabled" value="true"/> + + <property name="memoryConfiguration"> + <bean class="org.apache.ignite.configuration.MemoryConfiguration"> + <property name="systemCacheInitialSize" value="#{10 * 1024 * 1024}"/> + <property name="systemCacheMaxSize" value="#{40 * 1024 * 1024}"/> + <property name="defaultMemoryPolicyName" value="dfltPlc"/> + + <property name="memoryPolicies"> + <list> + <bean class="org.apache.ignite.configuration.MemoryPolicyConfiguration"> + <property name="name" value="dfltPlc"/> + <property name="maxSize" value="#{100 * 1024 * 1024}"/> + <property name="initialSize" value="#{10 * 1024 * 1024}"/> + </bean> + </list> + </property> + </bean> + </property> + </bean> +</beans> http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/platforms/cpp/odbc-test/config/queries-transaction.xml ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc-test/config/queries-transaction.xml b/modules/platforms/cpp/odbc-test/config/queries-transaction.xml new file mode 100644 index 0000000..7d74fc9 --- /dev/null +++ b/modules/platforms/cpp/odbc-test/config/queries-transaction.xml @@ -0,0 +1,32 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns:util="http://www.springframework.org/schema/util" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans + http://www.springframework.org/schema/beans/spring-beans.xsd"> + + <import resource="queries-default.xml"/> + + <bean parent="queries.cfg"> + <property name="mvccEnabled" value="true"/> + </bean> +</beans> http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/platforms/cpp/odbc-test/project/vs/odbc-test.vcxproj ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc-test/project/vs/odbc-test.vcxproj b/modules/platforms/cpp/odbc-test/project/vs/odbc-test.vcxproj index 87b6559..3410ec5 100644 --- a/modules/platforms/cpp/odbc-test/project/vs/odbc-test.vcxproj +++ b/modules/platforms/cpp/odbc-test/project/vs/odbc-test.vcxproj @@ -164,6 +164,7 @@ <ClCompile Include="..\..\..\odbc\src\diagnostic\diagnostic_record.cpp" /> <ClCompile Include="..\..\..\odbc\src\diagnostic\diagnostic_record_storage.cpp" /> <ClCompile Include="..\..\..\odbc\src\log.cpp" /> + <ClCompile Include="..\..\..\odbc\src\nested_tx_mode.cpp" /> <ClCompile Include="..\..\..\odbc\src\protocol_version.cpp" /> <ClCompile Include="..\..\..\odbc\src\result_page.cpp" /> <ClCompile Include="..\..\..\odbc\src\row.cpp" /> @@ -200,6 +201,7 @@ <ClCompile Include="..\..\src\teamcity\teamcity_boost.cpp" /> <ClCompile Include="..\..\src\teamcity\teamcity_messages.cpp" /> <ClCompile Include="..\..\src\test_utils.cpp" /> + <ClCompile Include="..\..\src\transaction_test.cpp" /> <ClCompile Include="..\..\src\types_test.cpp" /> <ClCompile Include="..\..\src\utility_test.cpp" /> </ItemGroup> @@ -233,6 +235,8 @@ <None Include="..\..\config\queries-ssl.xml" /> <None Include="..\..\config\queries-test-32.xml" /> <None Include="..\..\config\queries-test.xml" /> + <None Include="..\..\config\queries-transaction-32.xml" /> + <None Include="..\..\config\queries-transaction.xml" /> </ItemGroup> <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" /> <ImportGroup Label="ExtensionTargets"> http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/platforms/cpp/odbc-test/project/vs/odbc-test.vcxproj.filters ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc-test/project/vs/odbc-test.vcxproj.filters b/modules/platforms/cpp/odbc-test/project/vs/odbc-test.vcxproj.filters index 3d2fcc6..3065df0 100644 --- a/modules/platforms/cpp/odbc-test/project/vs/odbc-test.vcxproj.filters +++ b/modules/platforms/cpp/odbc-test/project/vs/odbc-test.vcxproj.filters @@ -145,6 +145,9 @@ <ClCompile Include="..\..\src\queries_ssl_test.cpp"> <Filter>Code</Filter> </ClCompile> + <ClCompile Include="..\..\src\transaction_test.cpp"> + <Filter>Code</Filter> + </ClCompile> <ClCompile Include="..\..\..\odbc\src\config\connection_string_parser.cpp"> <Filter>Externals</Filter> </ClCompile> @@ -166,6 +169,9 @@ <ClCompile Include="..\..\src\authentication_test.cpp"> <Filter>Code</Filter> </ClCompile> + <ClCompile Include="..\..\..\odbc\src\nested_tx_mode.cpp"> + <Filter>Externals</Filter> + </ClCompile> </ItemGroup> <ItemGroup> <ClInclude Include="..\..\include\test_type.h"> @@ -209,5 +215,11 @@ <None Include="..\..\config\queries-auth-32.xml"> <Filter>Configs</Filter> </None> + <None Include="..\..\config\queries-transaction.xml"> + <Filter>Configs</Filter> + </None> + <None Include="..\..\config\queries-transaction-32.xml"> + <Filter>Configs</Filter> + </None> </ItemGroup> </Project> \ No newline at end of file