Adding concurrent tests for shard and message counters.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f56e1b0d Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f56e1b0d Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f56e1b0d Branch: refs/heads/usergrid-1318-queue Commit: f56e1b0d18cc8bb224dcede58d8171df8f0c0858 Parents: 2afaa92 Author: Dave Johnson <snoopd...@apache.org> Authored: Wed Oct 12 09:38:05 2016 -0400 Committer: Dave Johnson <snoopd...@apache.org> Committed: Wed Oct 12 09:38:05 2016 -0400 ---------------------------------------------------------------------- .../impl/MessageCounterSerializationTest.java | 50 +++++++++++++++++++ .../sharding/ShardCounterSerializationTest.java | 51 ++++++++++++++++++++ 2 files changed, 101 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/f56e1b0d/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java index a4ea0f1..1f18db7 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java @@ -28,6 +28,11 @@ import org.apache.usergrid.persistence.qakka.serialization.queuemessages.Message import org.junit.Assert; import org.junit.Test; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + import static org.junit.Assert.fail; @@ -87,4 +92,49 @@ public class MessageCounterSerializationTest extends AbstractTest { Assert.assertEquals( 70, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) ); } + + @Test + public void testConcurrentOperation() { + + // create multiple threads, each will increment and decrement counter by same number + + Injector injector = getInjector(); + MessageCounterSerialization mcs = injector.getInstance( MessageCounterSerialization.class ); + String queueName = "mtco_queue_" + RandomStringUtils.randomAlphanumeric( 10 ); + + int poolSize = 20; + int numThreads = 20; + int numCounts = 3000; + ExecutorService execService = Executors.newFixedThreadPool( poolSize ); + + for (int i = 0; i < numThreads; i++) { + + execService.submit( () -> { + + for ( int j = 0; j < numCounts; j++ ) { + mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 1 ); + } + + for ( int k = 0; k < numCounts; k++ ) { + mcs.decrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 1 ); + } + }); + } + + execService.shutdown(); + + try { + while (!execService.awaitTermination( 3, TimeUnit.SECONDS )) { + System.out.println( "Waiting... " + + mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) ); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + + // at end counter should be zero + + Assert.assertEquals( 0, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) ); + } + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/f56e1b0d/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java index 8dc16bb..bc80943 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java @@ -19,14 +19,21 @@ package org.apache.usergrid.persistence.qakka.serialization.sharding; +import com.google.inject.Injector; import org.apache.commons.lang.RandomStringUtils; import org.apache.usergrid.persistence.qakka.core.CassandraClient; import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl; import org.apache.usergrid.persistence.qakka.AbstractTest; import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization; import org.junit.Assert; import org.junit.Test; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + import static org.junit.Assert.fail; @@ -58,4 +65,48 @@ public class ShardCounterSerializationTest extends AbstractTest { Assert.assertEquals( 210, scs.getCounterValue( queueName, Shard.Type.DEFAULT, shardId ) ); } + + @Test + public void testConcurrentOperation() { + + // create multiple threads, each will increment counter by some number + + Injector injector = getInjector(); + ShardCounterSerialization scs = injector.getInstance( ShardCounterSerialization.class ); + String queueName = "stco_queue_" + RandomStringUtils.randomAlphanumeric( 10 ); + long shardId = 100L; + + int poolSize = 20; + int numThreads = 20; + int numCounts = 3000; + ExecutorService execService = Executors.newFixedThreadPool( poolSize ); + + for (int i = 0; i < numThreads; i++) { + + execService.submit( () -> { + + for ( int j = 0; j < numCounts; j++ ) { + scs.incrementCounter( queueName, Shard.Type.DEFAULT, shardId, 1 ); + } + + }); + } + + execService.shutdown(); + + try { + while (!execService.awaitTermination( 3, TimeUnit.SECONDS )) { + System.out.println( "Waiting... " + + scs.getCounterValue( queueName, Shard.Type.DEFAULT, shardId ) ); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + + // test that counter is correct value + + Assert.assertEquals( numThreads * numCounts, + scs.getCounterValue( queueName, Shard.Type.DEFAULT, shardId ) ); + } + }