Fixes to get all tests passing again.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/99dbfc2d Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/99dbfc2d Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/99dbfc2d Branch: refs/heads/usergrid-1318-queue Commit: 99dbfc2d17e330583560d386b6ecb5cce93fa3e5 Parents: 447b60d Author: Dave Johnson <snoopd...@apache.org> Authored: Fri Sep 16 09:39:51 2016 -0400 Committer: Dave Johnson <snoopd...@apache.org> Committed: Fri Sep 16 09:39:51 2016 -0400 ---------------------------------------------------------------------- .../usergrid/persistence/core/CassandraFig.java | 12 ++++++++++ .../core/datastax/impl/DataStaxClusterImpl.java | 2 +- .../persistence/qakka/AbstractTest.java | 2 +- .../qakka/core/QueueMessageManagerTest.java | 2 -- .../distributed/QueueActorServiceTest.java | 1 + .../distributed/actors/QueueTimeouterTest.java | 1 - .../distributed/actors/ShardAllocatorTest.java | 1 - ...tiShardDatabaseQueueMessageIteratorTest.java | 23 +++++++++++--------- .../sharding/ShardIteratorTest.java | 18 +++++++++------ .../queue/src/test/resources/log4j.properties | 3 +++ .../queue/src/test/resources/qakka.properties | 12 +++------- 11 files changed, 45 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/99dbfc2d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java index 90f4ae8..b599a20 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java @@ -35,9 +35,13 @@ public interface CassandraFig extends GuicyFig { String READ_CL = "cassandra.readcl"; String READ_CL_CONSISTENT = "cassandra.readcl.consistent"; String WRITE_CL = "cassandra.writecl"; + String STRATEGY = "cassandra.strategy"; String STRATEGY_OPTIONS = "cassandra.strategy.options"; + String STRATEGY_LOCAL = "cassandra.strategy.local"; + String STRATEGY_OPTIONS_LOCAL = "cassandra.strategy.options.local"; + // main application cassandra properties String ASTYANAX_READ_CONSISTENT_CL = "usergrid.consistent.read.cl"; String ASTYANAX_READ_CL = "usergrid.read.cl"; @@ -157,6 +161,14 @@ public interface CassandraFig extends GuicyFig { @Key( STRATEGY_OPTIONS ) String getStrategyOptions(); + @Default("SimpleStrategy") + @Key( STRATEGY_LOCAL ) + String getStrategyLocal(); + + @Default("replication_factor:1") + @Key( STRATEGY_OPTIONS_LOCAL ) + String getStrategyOptionsLocal(); + /** * Return the history of all shard values which are immutable. For instance, if shard values * are initially set to 20 (the default) then increased to 40, the property should contain the string of http://git-wip-us.apache.org/repos/asf/usergrid/blob/99dbfc2d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java index fe9803d..c8ddf3e 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java @@ -148,7 +148,7 @@ public class DataStaxClusterImpl implements DataStaxCluster { final String createQueueMessageKeyspace = String.format( "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = %s", CQLUtils.quote(cassandraFig.getApplicationLocalKeyspace()), - CQLUtils.getFormattedReplication(cassandraFig.getStrategy(), cassandraFig.getStrategyOptions()) + CQLUtils.getFormattedReplication(cassandraFig.getStrategyLocal(), cassandraFig.getStrategyOptionsLocal()) ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/99dbfc2d/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java index 6f1c744..c90db2e 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java @@ -40,7 +40,7 @@ public class AbstractTest { protected static Injector sharedInjector; - AtomicBoolean migrated = new AtomicBoolean( false ); + static AtomicBoolean migrated = new AtomicBoolean( false ); static { new KeyspaceDropper(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/99dbfc2d/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java index 630c953..124cb86 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java @@ -134,7 +134,6 @@ public class QueueMessageManagerTest extends AbstractTest { @Test - @Ignore public void testQueueMessageTimeouts() throws Exception { Injector injector = getInjector(); @@ -223,7 +222,6 @@ public class QueueMessageManagerTest extends AbstractTest { @Test - @Ignore public void testGetWithMissingData() throws InterruptedException { Injector injector = getInjector(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/99dbfc2d/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java index a46c186..0883650 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java @@ -156,6 +156,7 @@ public class QueueActorServiceTest extends AbstractTest { int count = 0; while ( retries++ < maxRetries ) { Thread.sleep( 1000 ); + distributedQueueService.refresh(); if (inMemoryQueue.size( queueName ) == 100) { count = 100; break; http://git-wip-us.apache.org/repos/asf/usergrid/blob/99dbfc2d/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java index 54f9d42..e3541a4 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java @@ -54,7 +54,6 @@ public class QueueTimeouterTest extends AbstractTest { @Test - @Ignore public void testBasicOperation() throws Exception { CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/99dbfc2d/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java index ae62c89..7fd664f 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java @@ -163,7 +163,6 @@ public class ShardAllocatorTest extends AbstractTest { @Test - @Ignore public void testBasicOperationWithMessages() throws InterruptedException { Injector injector = getInjector(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/99dbfc2d/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java index 2d8da6d..5fa3434 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java @@ -19,6 +19,7 @@ package org.apache.usergrid.persistence.qakka.serialization; +import org.apache.commons.lang.RandomStringUtils; import org.apache.usergrid.persistence.core.CassandraFig; import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl; import org.apache.usergrid.persistence.qakka.AbstractTest; @@ -56,10 +57,12 @@ public class MultiShardDatabaseQueueMessageIteratorTest extends AbstractTest { QueueMessageSerialization queueMessageSerialization = getInjector().getInstance( QueueMessageSerialization.class ); - Shard shard1 = new Shard("test", "region", Shard.Type.DEFAULT, 1L, null); - Shard shard2 = new Shard("test", "region", Shard.Type.DEFAULT, 2L, null); - Shard shard3 = new Shard("test", "region", Shard.Type.DEFAULT, 3L, null); - Shard shard4 = new Shard("test", "region", Shard.Type.DEFAULT, 4L, null); + String queueName = "queue_msit_" + RandomStringUtils.randomAlphanumeric( 10 ); + + Shard shard1 = new Shard(queueName, "region", Shard.Type.DEFAULT, 1L, null); + Shard shard2 = new Shard(queueName, "region", Shard.Type.DEFAULT, 2L, null); + Shard shard3 = new Shard(queueName, "region", Shard.Type.DEFAULT, 3L, null); + Shard shard4 = new Shard(queueName, "region", Shard.Type.DEFAULT, 4L, null); shardSerialization.createShard(shard1); shardSerialization.createShard(shard2); @@ -72,7 +75,7 @@ public class MultiShardDatabaseQueueMessageIteratorTest extends AbstractTest { for(int i=0; i < numMessagesPerShard; i++){ queueMessageSerialization.writeMessage( new DatabaseQueueMessage(QakkaUtils.getTimeUuid(), - DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard1.getShardId(), + DatabaseQueueMessage.Type.DEFAULT, queueName, "region", shard1.getShardId(), System.currentTimeMillis(), null, null)); Thread.sleep(3); } @@ -80,7 +83,7 @@ public class MultiShardDatabaseQueueMessageIteratorTest extends AbstractTest { for(int i=0; i < numMessagesPerShard; i++){ queueMessageSerialization.writeMessage( new DatabaseQueueMessage(QakkaUtils.getTimeUuid(), - DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard2.getShardId(), + DatabaseQueueMessage.Type.DEFAULT, queueName, "region", shard2.getShardId(), System.currentTimeMillis(), null, null)); Thread.sleep(3); } @@ -88,7 +91,7 @@ public class MultiShardDatabaseQueueMessageIteratorTest extends AbstractTest { for(int i=0; i < numMessagesPerShard; i++){ queueMessageSerialization.writeMessage( new DatabaseQueueMessage(QakkaUtils.getTimeUuid(), - DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard3.getShardId(), + DatabaseQueueMessage.Type.DEFAULT, queueName, "region", shard3.getShardId(), System.currentTimeMillis(), null, null)); Thread.sleep(3); } @@ -96,16 +99,16 @@ public class MultiShardDatabaseQueueMessageIteratorTest extends AbstractTest { for(int i=0; i < numMessagesPerShard; i++){ queueMessageSerialization.writeMessage( new DatabaseQueueMessage(QakkaUtils.getTimeUuid(), - DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard4.getShardId(), + DatabaseQueueMessage.Type.DEFAULT, queueName, "region", shard4.getShardId(), System.currentTimeMillis(), null, null)); Thread.sleep(3); } ShardIterator shardIterator = new ShardIterator( - cassandraClient, "test", "region", Shard.Type.DEFAULT, Optional.empty()); + cassandraClient, queueName, "region", Shard.Type.DEFAULT, Optional.empty()); MultiShardMessageIterator iterator = new MultiShardMessageIterator( - cassandraClient, "test", "region", DatabaseQueueMessage.Type.DEFAULT, shardIterator, null); + cassandraClient, queueName, "region", DatabaseQueueMessage.Type.DEFAULT, shardIterator, null); final AtomicInteger[] counts = { new AtomicInteger(0), new AtomicInteger(0), new AtomicInteger(0), new AtomicInteger(0) }; http://git-wip-us.apache.org/repos/asf/usergrid/blob/99dbfc2d/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java index fb0a46e..0d593aa 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java @@ -50,14 +50,16 @@ public class ShardIteratorTest extends AbstractTest { CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class ); ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient ); - Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null); - Shard shard2 = new Shard("test", "region1", Shard.Type.DEFAULT, 200L, null); + String queueName = "queue_sit_" + RandomStringUtils.randomAlphanumeric( 10 ); + + Shard shard1 = new Shard(queueName, "region1", Shard.Type.DEFAULT, 100L, null); + Shard shard2 = new Shard(queueName, "region1", Shard.Type.DEFAULT, 200L, null); shardSerialization.createShard(shard1); shardSerialization.createShard(shard2); Iterator<Shard> shardIterator = new ShardIterator( - cassandraClient, "test", "region1", Shard.Type.DEFAULT, Optional.empty()); + cassandraClient, queueName, "region1", Shard.Type.DEFAULT, Optional.empty()); List<Shard> shards = new ArrayList<>(1); @@ -81,9 +83,11 @@ public class ShardIteratorTest extends AbstractTest { CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class ); ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient ); - Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null); - Shard shard2 = new Shard("test", "region1", Shard.Type.DEFAULT, 200L, null); - Shard shard3 = new Shard("test", "region1", Shard.Type.DEFAULT, 300L, null); + String queueName = "queue_sit_" + RandomStringUtils.randomAlphanumeric( 10 ); + + Shard shard1 = new Shard(queueName, "region1", Shard.Type.DEFAULT, 100L, null); + Shard shard2 = new Shard(queueName, "region1", Shard.Type.DEFAULT, 200L, null); + Shard shard3 = new Shard(queueName, "region1", Shard.Type.DEFAULT, 300L, null); shardSerialization.createShard(shard1); shardSerialization.createShard(shard2); @@ -91,7 +95,7 @@ public class ShardIteratorTest extends AbstractTest { Iterator<Shard> shardIterator = new ShardIterator( - cassandraClient, "test", "region1", Shard.Type.DEFAULT, Optional.of(200L)); + cassandraClient, queueName, "region1", Shard.Type.DEFAULT, Optional.of(200L)); List<Shard> shards = new ArrayList<>(1); http://git-wip-us.apache.org/repos/asf/usergrid/blob/99dbfc2d/stack/corepersistence/queue/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/resources/log4j.properties b/stack/corepersistence/queue/src/test/resources/log4j.properties index 3c679f5..9e14f29 100644 --- a/stack/corepersistence/queue/src/test/resources/log4j.properties +++ b/stack/corepersistence/queue/src/test/resources/log4j.properties @@ -27,3 +27,6 @@ log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG log4j.logger.org.apache.cassandra=WARN log4j.logger.org.glassfish=WARN + +log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG + http://git-wip-us.apache.org/repos/asf/usergrid/blob/99dbfc2d/stack/corepersistence/queue/src/test/resources/qakka.properties ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties index 9140637..dc7ef48 100644 --- a/stack/corepersistence/queue/src/test/resources/qakka.properties +++ b/stack/corepersistence/queue/src/test/resources/qakka.properties @@ -34,21 +34,15 @@ usergrid.cluster.seeds=us-east:localhost # Port used for cluster communications. usergrid.cluster.port=2551 -queue.sender.num.actors=20 -queue.writer.num.actors=20 -queue.num.actors=20 +queue.writer.num.actors=100 # set shard size and times low for testing purposes queue.shard.max.size=500 -queue.shard.allocation.check.frequency.millis=1000 -queue.shard.allocation.advance.time.millis=2000 -queue.refresh.millis=1000 +queue.shard.allocation.check.frequency.millis=100 +queue.shard.allocation.advance.time.millis=200 queue.max.inmemory.shard.counter = 100 -cassandra.connections=10 -#cassandra.timeout=20000 - cassandra.hosts=localhost cassandra.keyspace.application=qakka_test_application