IGNITE-9659 Fixed testNonCollocatedRetryMessage flaky test - Fixes #5005.

Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6f287428 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6f287428 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6f287428 Branch: refs/heads/ignite-9720 Commit: 6f2874285a9eb1132d589223a5aa2c8fa8891fb8 Parents: f156631 Author: NSAmelchev <nsamelc...@gmail.com> Authored: Mon Oct 22 13:29:44 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Mon Oct 22 13:29:44 2018 +0300 ---------------------------------------------------------------------- .../NonCollocatedRetryMessageSelfTest.java | 119 ++++++++++--------- 1 file changed, 62 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6f287428/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NonCollocatedRetryMessageSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NonCollocatedRetryMessageSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NonCollocatedRetryMessageSelfTest.java index c602225..e8f6624 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NonCollocatedRetryMessageSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NonCollocatedRetryMessageSelfTest.java @@ -22,19 +22,18 @@ import javax.cache.Cache; import javax.cache.CacheException; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; -import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.query.SqlQuery; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import 
org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.managers.communication.GridIoMessage; -import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_RETRY_TIMEOUT; @@ -44,23 +43,31 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_RETRY_TIMEOUT; */ public class NonCollocatedRetryMessageSelfTest extends GridCommonAbstractTest { /** */ - private static final int NODES_COUNT = 3; + private static final int NODES_COUNT = 2; /** */ private static final String ORG = "org"; /** */ + private static final int TEST_SQL_RETRY_TIMEOUT = 500; + + /** */ + private String sqlRetryTimeoutBackup; + + /** */ private IgniteCache<String, JoinSqlTestHelper.Person> personCache; /** */ public void testNonCollocatedRetryMessage() { - SqlQuery<String, JoinSqlTestHelper.Person> qry = new SqlQuery<String, JoinSqlTestHelper.Person>(JoinSqlTestHelper.Person.class, JoinSqlTestHelper.JOIN_SQL).setArgs("Organization #0"); + SqlQuery<String, JoinSqlTestHelper.Person> qry = new SqlQuery<String, JoinSqlTestHelper.Person>( + JoinSqlTestHelper.Person.class, JoinSqlTestHelper.JOIN_SQL).setArgs("Organization #0"); qry.setDistributedJoins(true); try { - List<Cache.Entry<String,JoinSqlTestHelper.Person>> prsns = personCache.query(qry).getAll(); - fail("No CacheException emitted. 
Collection size="+prsns.size()); + List<Cache.Entry<String, JoinSqlTestHelper.Person>> prsns = personCache.query(qry).getAll(); + + fail("No CacheException emitted. Collection size=" + prsns.size()); } catch (CacheException e) { assertTrue(e.getMessage(), e.getMessage().contains("Failed to execute non-collocated query")); @@ -68,66 +75,35 @@ public class NonCollocatedRetryMessageSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setCommunicationSpi(new TcpCommunicationSpi(){ - volatile long reqId = -1; - /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) { - assert msg != null; - - if ( GridIoMessage.class.isAssignableFrom(msg.getClass())){ - GridIoMessage gridMsg = (GridIoMessage)msg; - - if ( GridH2QueryRequest.class.isAssignableFrom( gridMsg.message().getClass() ) ){ - GridH2QueryRequest req = (GridH2QueryRequest) (gridMsg.message()); - - if (reqId < 0) { - reqId = req.requestId(); - - String shutName = getTestIgniteInstanceName(1); - - stopGrid(shutName, true, false); - } - else if( reqId != req.requestId() ){ - try { - U.sleep(IgniteSystemProperties.getLong(IGNITE_SQL_RETRY_TIMEOUT, GridReduceQueryExecutor.DFLT_RETRY_TIMEOUT)); - } - catch (IgniteInterruptedCheckedException e) { - // no-op - } - } - } - } - super.sendMessage(node, msg, ackC); - } - }); + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - cfg.setDiscoverySpi(new TcpDiscoverySpi(){ - public long getNodesJoined() { - return stats.joinedNodesCount(); - } - }); + cfg.setCommunicationSpi(new TestTcpCommunication()); return cfg; } /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { - 
System.setProperty(IGNITE_SQL_RETRY_TIMEOUT, "5000"); + sqlRetryTimeoutBackup = System.getProperty(IGNITE_SQL_RETRY_TIMEOUT); + + System.setProperty(IGNITE_SQL_RETRY_TIMEOUT, String.valueOf(TEST_SQL_RETRY_TIMEOUT)); startGridsMultiThreaded(NODES_COUNT, false); - personCache = ignite(0).getOrCreateCache(new CacheConfiguration<String, JoinSqlTestHelper.Person>("pers") - .setBackups(1) - .setIndexedTypes(String.class, JoinSqlTestHelper.Person.class) - ); + CacheConfiguration<String, JoinSqlTestHelper.Person> ccfg1 = new CacheConfiguration<>("pers"); + + ccfg1.setBackups(1); + ccfg1.setIndexedTypes(String.class, JoinSqlTestHelper.Person.class); - final IgniteCache<String, JoinSqlTestHelper.Organization> orgCache = ignite(0).getOrCreateCache(new CacheConfiguration<String, JoinSqlTestHelper.Organization>(ORG) - .setBackups(1) - .setIndexedTypes(String.class, JoinSqlTestHelper.Organization.class) - ); + personCache = ignite(0).getOrCreateCache(ccfg1); + + CacheConfiguration<String, JoinSqlTestHelper.Organization> ccfg2 = new CacheConfiguration<>(ORG); + + ccfg2.setBackups(1); + ccfg2.setIndexedTypes(String.class, JoinSqlTestHelper.Organization.class); + + IgniteCache<String, JoinSqlTestHelper.Organization> orgCache = ignite(0).getOrCreateCache(ccfg2); awaitPartitionMapExchange(); @@ -137,10 +113,39 @@ public class NonCollocatedRetryMessageSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { + @Override protected void afterTest() { + if (sqlRetryTimeoutBackup != null) + System.setProperty(IGNITE_SQL_RETRY_TIMEOUT, sqlRetryTimeoutBackup); + stopAllGrids(); } + /** + * TcpCommunicationSpi with additional features needed for tests. 
+ */ + private class TestTcpCommunication extends TcpCommunicationSpi { + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) { + assert msg != null; + + if (igniteInstanceName.equals(getTestIgniteInstanceName(1)) && + GridIoMessage.class.isAssignableFrom(msg.getClass())) { + GridIoMessage gridMsg = (GridIoMessage)msg; + + if (GridH2IndexRangeRequest.class.isAssignableFrom(gridMsg.message().getClass())) { + try { + U.sleep(TEST_SQL_RETRY_TIMEOUT); + } + catch (IgniteInterruptedCheckedException e) { + fail("Test was interrupted."); + } + throw new IgniteSpiException("Test exception."); + } + } + + super.sendMessage(node, msg, ackC); + } + } }