http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java new file mode 100644 index 0000000..572c897 --- /dev/null +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.persistence.qakka.serialization.sharding; + +import org.apache.commons.lang.RandomStringUtils; +import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl; +import org.apache.usergrid.persistence.qakka.serialization.sharding.impl.ShardSerializationImpl; +import org.apache.usergrid.persistence.qakka.AbstractTest; +import org.apache.usergrid.persistence.qakka.core.CassandraClient; +import org.apache.usergrid.persistence.qakka.core.QakkaUtils; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +import static org.junit.Assert.assertTrue; + + +/** + * Created by russo on 6/9/16. + */ +public class ShardIteratorTest extends AbstractTest { + + private static final Logger logger = LoggerFactory.getLogger( ShardIteratorTest.class ); + + @Test + public void getActiveShards(){ + + CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); + ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraClient ); + + Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null); + Shard shard2 = new Shard("test", "region1", Shard.Type.DEFAULT, 200L, null); + + shardSerialization.createShard(shard1); + shardSerialization.createShard(shard2); + + Iterator<Shard> shardIterator = new ShardIterator( + cassandraClient, "test", "region1", Shard.Type.DEFAULT, Optional.empty()); + + List<Shard> shards = new ArrayList<>(1); + + + shardIterator.forEachRemaining(shard -> { + + logger.info("Shard ID: {}", shard.getShardId()); + shards.add(shard); + + }); + + assertTrue(shards.size() == 2 && shards.get(0).equals(shard1) && shards.get(1).equals(shard2)); + + + } + + @Test + public void seekActiveShards(){ + + CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); + ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraClient ); + + Shard shard1 = 
new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null); + Shard shard2 = new Shard("test", "region1", Shard.Type.DEFAULT, 200L, null); + Shard shard3 = new Shard("test", "region1", Shard.Type.DEFAULT, 300L, null); + + shardSerialization.createShard(shard1); + shardSerialization.createShard(shard2); + shardSerialization.createShard(shard3); + + + Iterator<Shard> shardIterator = new ShardIterator( + cassandraClient, "test", "region1", Shard.Type.DEFAULT, Optional.of(200L)); + + List<Shard> shards = new ArrayList<>(1); + + shardIterator.forEachRemaining(shard -> { + + logger.info("Shard ID: {}", shard.getShardId()); + shards.add(shard); + + }); + + assertTrue(shards.size() == 1 && shards.get(0).equals(shard3)); + } + + + @Test + public void shardIteratorOrdering() throws Exception { + + CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); + ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraClient ); + + int numShards = 10; + String region = "default"; + String queueName = "sit_queue_" + RandomStringUtils.randomAlphanumeric(20); + + for ( long i=0; i<numShards; i++) { + UUID messageId = QakkaUtils.getTimeUuid(); + Shard shard = new Shard( queueName, region, Shard.Type.DEFAULT, i+1, messageId ); + shardSerialization.createShard( shard ); + try { Thread.sleep(10); } catch (Exception intentionallyIgnored) {} + } + + Iterator<Shard> shardIterator = new ShardIterator( + cassandraClient, queueName, region, Shard.Type.DEFAULT, Optional.empty()); + + int count = 0; + Long prevTimestamp = null; + while ( shardIterator.hasNext() ) { + Shard shard = shardIterator.next(); + if ( prevTimestamp != null ) { + Assert.assertTrue( prevTimestamp < shard.getPointer().timestamp() ); + } + prevTimestamp = shard.getPointer().timestamp(); + count++; + } + + Assert.assertEquals( numShards, count ); + } +}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java new file mode 100644 index 0000000..e1a541b --- /dev/null +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.persistence.qakka.serialization.sharding; + +import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl; +import org.apache.usergrid.persistence.qakka.AbstractTest; +import org.apache.usergrid.persistence.qakka.core.CassandraClient; +import org.apache.usergrid.persistence.qakka.core.QakkaUtils; +import org.apache.usergrid.persistence.qakka.serialization.sharding.impl.ShardSerializationImpl; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.UUID; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + + +/** + * Created by russo on 6/8/16. + */ +public class ShardSerializationTest extends AbstractTest { + + private static final Logger logger = LoggerFactory.getLogger( ShardSerializationTest.class ); + + + @Test + public void writeNewShard(){ + + CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); + ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraClient ); + + Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null); + shardSerialization.createShard(shard1); + } + + @Test + public void deleteShard(){ + + CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); + ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraClient ); + + Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null); + + shardSerialization.createShard(shard1); + shardSerialization.deleteShard(shard1); + assertNull(shardSerialization.loadShard(shard1)); + + + + } + + @Test + public void loadNullShard(){ + + CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); + ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraClient ); + + Shard shard1 = new Shard("junk", "region1", Shard.Type.DEFAULT, 100L, null); + + 
assertNull(shardSerialization.loadShard(shard1)); + + + + } + + @Test + public void updatePointer(){ + + CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); + ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraClient ); + + Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null); + shardSerialization.createShard(shard1); + + final UUID pointer = QakkaUtils.getTimeUuid(); + + shard1.setPointer(pointer); + shardSerialization.updateShardPointer(shard1); + + Shard returnedShard = shardSerialization.loadShard(shard1); + + assertEquals(pointer, returnedShard.getPointer()); + + + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardStrategyTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardStrategyTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardStrategyTest.java new file mode 100644 index 0000000..ea73abc --- /dev/null +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardStrategyTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.serialization.sharding; + +import org.apache.commons.lang.RandomStringUtils; +import org.apache.usergrid.persistence.qakka.AbstractTest; +import org.apache.usergrid.persistence.qakka.core.CassandraClient; +import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl; +import org.apache.usergrid.persistence.qakka.core.QakkaUtils; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.UUID; + + +public class ShardStrategyTest extends AbstractTest { + + private static final Logger logger = LoggerFactory.getLogger( ShardStrategyTest.class ); + + + @Test + public void testBasicOperation() { + + CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); + cassandraClient.getSession(); + + + ShardSerialization shardSer = getInjector().getInstance( ShardSerialization.class ); + ShardStrategy shardStrategy = getInjector().getInstance( ShardStrategy.class ); + + UUID messageIdToLocate = null; + long selectedShardId = 4L; + + int numShards = 10; + String region = "default"; + String queueName = "sst_queue_" + RandomStringUtils.randomAlphanumeric(20); + + for ( long i=0; i<numShards; i++) { + shardSer.createShard( new Shard( queueName, region, Shard.Type.DEFAULT, i, QakkaUtils.getTimeUuid())); + try { Thread.sleep(10); } catch (Exception intentionallyIgnored) {} + if ( i == selectedShardId ) { + messageIdToLocate = QakkaUtils.getTimeUuid(); + } + try { Thread.sleep(10); } catch (Exception 
intentionallyIgnored) {} + } + + Shard selectedShard = shardStrategy.selectShard( queueName, region, Shard.Type.DEFAULT, messageIdToLocate ); + + Assert.assertEquals( selectedShardId, selectedShard.getShardId() ); + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardTest.java new file mode 100644 index 0000000..fba135a --- /dev/null +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.persistence.qakka.serialization.sharding; + +import org.apache.usergrid.persistence.qakka.AbstractTest; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Created by russo on 6/9/16. + */ +public class ShardTest extends AbstractTest { + + + @Test + public void testEquals(){ + + Shard shard1 = new Shard("test", "region", Shard.Type.DEFAULT, 100L, null); + Shard shard2 = new Shard("test", "region", Shard.Type.DEFAULT, 100L, null); + + assertEquals(shard1, shard2); + + } + + @Test + public void testHashCode(){ + + Shard shard1 = new Shard("test", "region", Shard.Type.DEFAULT, 10000000000L, null); + Shard shard2 = new Shard("test", "region", Shard.Type.DEFAULT, 10000000000L, null); + + assertEquals(shard1.hashCode(), shard2.hashCode()); + + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLogSerializationTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLogSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLogSerializationTest.java new file mode 100644 index 0000000..20b72b0 --- /dev/null +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLogSerializationTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.serialization.transferlog; + +import com.datastax.driver.core.PagingState; +import org.apache.cassandra.utils.UUIDGen; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.usergrid.persistence.qakka.AbstractTest; +import org.apache.usergrid.persistence.qakka.core.CassandraClient; +import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl; +import org.apache.usergrid.persistence.qakka.exceptions.QakkaException; +import org.apache.usergrid.persistence.qakka.serialization.Result; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + + +public class TransferLogSerializationTest extends AbstractTest { + + @Test + public void recordTransferLog() throws Exception { + + TransferLogSerialization logSerialization = getInjector().getInstance( TransferLogSerialization.class ); + + CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); + cassandraClient.getSession(); + + String queueName = "tlst_queue_" + RandomStringUtils.randomAlphanumeric( 15 ); + String source = RandomStringUtils.randomAlphanumeric( 15 ); + String dest = RandomStringUtils.randomAlphanumeric( 15 ); + + int numLogs = 100; + + for ( int i=0; i<numLogs; i++ ) { + logSerialization.recordTransferLog( 
queueName, source, dest, UUIDGen.getTimeUUID()); + } + + int count = 0; + int fetchCount = 0; + PagingState pagingState = null; + while ( true ) { + + Result<TransferLog> all = logSerialization.getAllTransferLogs( pagingState, 10 ); + + // we only want entities for our queue + List<TransferLog> logs = all.getEntities().stream() + .filter( log -> log.getQueueName().equals( queueName ) ).collect( Collectors.toList() ); + + count += logs.size(); + fetchCount++; + if ( all.getPagingState() == null ) { + break; + } + pagingState = all.getPagingState(); + } + + Assert.assertEquals( numLogs, count ); + } + + @Test + public void removeTransferLog() throws Exception { + + TransferLogSerialization logSerialization = getInjector().getInstance( TransferLogSerialization.class ); + + CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); + cassandraClient.getSession(); + + String queueName = "tlst_queue_" + RandomStringUtils.randomAlphanumeric( 15 ); + String source = RandomStringUtils.randomAlphanumeric( 15 ); + String dest = RandomStringUtils.randomAlphanumeric( 15 ); + + UUID messageId = UUIDGen.getTimeUUID(); + logSerialization.recordTransferLog( queueName, source, dest, messageId ); + + List<TransferLog> allLogs = getTransferLogs( logSerialization ); + + // we only want entities for our queue + List<TransferLog> logs = allLogs.stream() + .filter( log -> log.getQueueName().equals( queueName ) ).collect( Collectors.toList() ); + Assert.assertEquals( 1, logs.size()); + + logSerialization.removeTransferLog( queueName, source, dest, messageId ); + + List<TransferLog> all = getTransferLogs( logSerialization ); + logs = all.stream() + .filter( log -> log.getQueueName().equals( queueName ) ).collect( Collectors.toList() ); + Assert.assertEquals( 0, logs.size()); + + try { + logSerialization.removeTransferLog( queueName, source, dest, messageId ); + Assert.fail("Removing non-existent log should throw exception"); + + } catch ( QakkaException 
expected ) { + // success! + } + } + + private List<TransferLog> getTransferLogs(TransferLogSerialization logSerialization) { + PagingState pagingState = null; + List<TransferLog> allLogs = new ArrayList<>(); + while ( true ) { + Result<TransferLog> result = logSerialization.getAllTransferLogs( pagingState, 100 ); + allLogs.addAll( result.getEntities() ); + if ( result.getPagingState() == null ) { + break; + } + pagingState = result.getPagingState(); + } + return allLogs; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java index 69655e5..4b6e9d3 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java @@ -20,72 +20,67 @@ package org.apache.usergrid.persistence.queue; +import com.google.inject.Guice; +import com.google.inject.Injector; +import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; +import org.apache.usergrid.persistence.qakka.AbstractTest; +import org.apache.usergrid.persistence.qakka.App; +import org.apache.usergrid.persistence.qakka.QakkaModule; +import org.apache.usergrid.persistence.qakka.core.CassandraClient; +import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl; +import org.apache.usergrid.persistence.queue.guice.QueueModule; +import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl; +import org.junit.Ignore; +import org.junit.Test; + import java.util.ArrayList; import 
java.util.HashMap; import java.util.List; import java.util.Map; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; - -import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule; -import org.apache.usergrid.persistence.core.test.ITRunner; -import org.apache.usergrid.persistence.core.test.UseModules; -import org.apache.usergrid.persistence.queue.guice.TestQueueModule; -import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl; - -import com.google.inject.Inject; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assume.assumeTrue; - -@RunWith( ITRunner.class ) -@UseModules( { TestQueueModule.class } ) -public class LegacyQueueManagerTest { - @Inject - protected LegacyQueueFig queueFig; - @Inject - protected LegacyQueueManagerFactory qmf; +public class LegacyQueueManagerTest extends AbstractTest { - /** - * Mark tests as ignored if no AWS creds are present - */ - @Rule - public NoAWSCredsRule awsCredsRule = new NoAWSCredsRule(); + public static long queueSeed = System.currentTimeMillis(); + // give each test its own injector + @Override + protected Injector getInjector() { + return Guice.createInjector( new QueueModule() ); + } - protected LegacyQueueScope scope; - private LegacyQueueManager qm; - public static long queueSeed = System.currentTimeMillis(); + @Test + public void send() throws Exception{ + Injector myInjector = getInjector(); - @Before - public void mockApp() { + CassandraClient cassandraClient = myInjector.getInstance( CassandraClientImpl.class ); + cassandraClient.getSession(); - this.scope = new LegacyQueueScopeImpl( "testQueue"+queueSeed++, LegacyQueueScope.RegionImplementation.LOCAL); - qm = qmf.getQueueManager(scope); - } + ActorSystemFig actorSystemFig = myInjector.getInstance( ActorSystemFig.class ); + String region = actorSystemFig.getRegionLocal(); - @org.junit.After - public void cleanup(){ - 
qm.deleteQueue(); - } + App app = myInjector.getInstance( App.class ); + app.start( "localhost", getNextAkkaPort(), region ); + final LegacyQueueScopeImpl scope = + new LegacyQueueScopeImpl( "testQueue" + queueSeed++, LegacyQueueScope.RegionImplementation.LOCAL ); + LegacyQueueManagerFactory qmf = myInjector.getInstance( LegacyQueueManagerFactory.class ); + LegacyQueueManager qm = qmf.getQueueManager(scope); - @Test - public void send() throws Exception{ String value = "bodytest"; qm.sendMessage(value); + + Thread.sleep(5000); + List<LegacyQueueMessage> messageList = qm.getMessages(1, String.class); assertTrue(messageList.size() >= 1); for(LegacyQueueMessage message : messageList){ - assertTrue(message.getBody().equals(value)); + assertEquals( value, message.getBody() ); qm.commitMessage(message); } @@ -96,12 +91,32 @@ public class LegacyQueueManagerTest { @Test public void sendMore() throws Exception{ + + Injector myInjector = getInjector(); + + CassandraClient cassandraClient = myInjector.getInstance( CassandraClientImpl.class ); + cassandraClient.getSession(); + + ActorSystemFig actorSystemFig = myInjector.getInstance( ActorSystemFig.class ); + String region = actorSystemFig.getRegionLocal(); + + App app = myInjector.getInstance( App.class ); + app.start( "localhost", getNextAkkaPort(), region ); + + final LegacyQueueScopeImpl scope = + new LegacyQueueScopeImpl( "testQueue" + queueSeed++, LegacyQueueScope.RegionImplementation.LOCAL ); + LegacyQueueManagerFactory qmf = myInjector.getInstance( LegacyQueueManagerFactory.class ); + LegacyQueueManager qm = qmf.getQueueManager(scope); + HashMap<String,String> values = new HashMap<>(); values.put("test","Test"); List<Map<String,String>> bodies = new ArrayList<>(); bodies.add(values); qm.sendMessages(bodies); + + Thread.sleep(5000); + List<LegacyQueueMessage> messageList = qm.getMessages(1, values.getClass()); assertTrue(messageList.size() >= 1); for(LegacyQueueMessage message : messageList){ @@ -115,7 +130,25 @@ public 
class LegacyQueueManagerTest { } @Test + @Ignore("Not implemented yet") public void queueSize() throws Exception{ + + Injector myInjector = getInjector(); + + CassandraClient cassandraClient = myInjector.getInstance( CassandraClientImpl.class ); + cassandraClient.getSession(); + + ActorSystemFig actorSystemFig = myInjector.getInstance( ActorSystemFig.class ); + String region = actorSystemFig.getRegionLocal(); + + App app = myInjector.getInstance( App.class ); + app.start( "localhost", getNextAkkaPort(), region ); + + final LegacyQueueScopeImpl scope = + new LegacyQueueScopeImpl( "testQueue" + queueSeed++, LegacyQueueScope.RegionImplementation.LOCAL ); + LegacyQueueManagerFactory qmf = myInjector.getInstance( LegacyQueueManagerFactory.class ); + LegacyQueueManager qm = qmf.getQueueManager(scope); + HashMap<String,String> values = new HashMap<>(); values.put("test", "Test"); http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/guice/TestQueueModule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/guice/TestQueueModule.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/guice/TestQueueModule.java index 8390672..70e3543 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/guice/TestQueueModule.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/guice/TestQueueModule.java @@ -18,7 +18,6 @@ package org.apache.usergrid.persistence.queue.guice; -import org.apache.usergrid.persistence.core.guice.CommonModule; import org.apache.usergrid.persistence.core.guice.TestModule; @@ -26,7 +25,6 @@ public class TestQueueModule extends TestModule { @Override protected void configure() { - install( new CommonModule()); install( new QueueModule() ); } } 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/resources/cassandra.yaml ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/resources/cassandra.yaml b/stack/corepersistence/queue/src/test/resources/cassandra.yaml new file mode 100644 index 0000000..e97bf00 --- /dev/null +++ b/stack/corepersistence/queue/src/test/resources/cassandra.yaml @@ -0,0 +1,53 @@ + +cluster_name: 'Embedded Test Cluster' + +# ports +storage_port: 7075 +listen_address: localhost +rpc_address: localhost +rpc_port: 9160 +native_transport_port: 9042 + +# data files +data_file_directories: + - target/embeddedCassandra/data +commitlog_directory: target/embeddedCassandra/commitlog +saved_caches_directory: target/embeddedCassandra/saved_caches + +# native transport! +start_native_transport: true + +# other stuff +start_rpc: true +initial_token: +auto_bootstrap: false +hinted_handoff_enabled: true +authenticator: org.apache.cassandra.auth.AllowAllAuthenticator +partitioner: org.apache.cassandra.dht.RandomPartitioner +seed_provider: + - class_name: org.apache.cassandra.locator.SimpleSeedProvider + parameters: + - seeds: "127.0.0.1" +commitlog_sync: periodic +commitlog_sync_period_in_ms: 10000 +disk_access_mode: auto +concurrent_reads: 2 +concurrent_writes: 4 +rpc_keepalive: true +thrift_framed_transport_size_in_mb: 15 +thrift_max_message_length_in_mb: 16 +snapshot_before_compaction: false +column_index_size_in_kb: 64 +endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch +dynamic_snitch: true +dynamic_snitch_update_interval_in_ms: 100 +dynamic_snitch_reset_interval_in_ms: 600000 +dynamic_snitch_badness_threshold: 0.0 +request_scheduler: org.apache.cassandra.scheduler.NoScheduler +server_encryption_options: + internode_encryption: none + keystore: conf/.keystore + keystore_password: cassandra + truststore: conf/.truststore + truststore_password: cassandra +index_interval: 128 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/resources/log4j.properties b/stack/corepersistence/queue/src/test/resources/log4j.properties new file mode 100644 index 0000000..3c679f5 --- /dev/null +++ b/stack/corepersistence/queue/src/test/resources/log4j.properties @@ -0,0 +1,29 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +log4j.rootLogger=ERROR,stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d %p (%t) %c{1} - %m%n + +log4j.logger.org.apache.usergrid.persistence.actorsystem=DEBUG +log4j.logger.org.apache.usergrid.persistence.actorsystem=DEBUG +log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG + +log4j.logger.org.apache.cassandra=WARN +log4j.logger.org.glassfish=WARN http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/resources/qakka-duck.jpg ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/resources/qakka-duck.jpg b/stack/corepersistence/queue/src/test/resources/qakka-duck.jpg new file mode 100644 index 0000000..8a0e0a2 Binary files /dev/null and b/stack/corepersistence/queue/src/test/resources/qakka-duck.jpg differ http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/resources/qakka.properties ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties new file mode 100644 index 0000000..c3b613c --- /dev/null +++ b/stack/corepersistence/queue/src/test/resources/qakka.properties @@ -0,0 +1,50 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +# Properties for JUnit tests + +usergrid.cluster_name=Test Cluster + +usergrid.cluster.hostname=localhost + +# Comma-separated list of regions to be considered +usergrid.cluster.region.list=us-east + +# The region of this local instance of Usergrid +usergrid.cluster.region.local=us-east + +# Comma-separated list of cluster seeds, each with format {region}:{hostname} +usergrid.cluster.seeds=us-east:localhost + +# Port used for cluster communications. +usergrid.cluster.port=2551 + +queue.writer.num.actors=100 + +# set shard size and times low for testing purposes +queue.shard.max.size=500 +queue.shard.allocation.check.frequency.millis=100 +queue.shard.allocation.advance.time.millis=200 + +queue.max.inmemory.shard.counter = 100 + +cassandra.hosts=localhost + +cassandra.keyspace.application=qakka_test + +cassandra.keyspace-drop-and-create=true http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/pom.xml ---------------------------------------------------------------------- diff --git a/stack/pom.xml b/stack/pom.xml index 5186a13..d739bb4 100644 --- a/stack/pom.xml +++ b/stack/pom.xml @@ -100,9 +100,9 @@ <amber-version>0.22-incubating</amber-version> <astyanax.version>3.9.0</astyanax.version> <aws.version>1.10.20</aws.version> - <cassandra-version>1.2.18</cassandra-version> + <cassandra-version>2.1.14</cassandra-version> <guava.version>18.0</guava.version> - <guice.version>4.0-beta5</guice.version> + <guice.version>4.0</guice.version> <hector-om-version>3.0-03</hector-om-version> <hector-version>1.1-4</hector-version> 
<hector-test-version>1.1-4</hector-test-version> @@ -110,7 +110,7 @@ <jackson-version>1.9.9</jackson-version> <jackson-2-version>2.3.3</jackson-2-version> <jclouds.version>1.9.0</jclouds.version> - <jersey-version>2.21</jersey-version> + <jersey-version>2.23.1</jersey-version> <junit-version>4.12</junit-version> <log4j-version>1.2.16</log4j-version> <org.springframework.version>3.2.13.RELEASE</org.springframework.version> http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java index 3d4911d..8356f16 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java @@ -16,24 +16,26 @@ */ package org.apache.usergrid.services.notifications.apns; -import com.relayrides.pushy.apns.*; -import com.relayrides.pushy.apns.util.*; - +import com.fasterxml.jackson.databind.ObjectMapper; +import com.relayrides.pushy.apns.PushManager; +import com.relayrides.pushy.apns.PushManagerConfiguration; +import com.relayrides.pushy.apns.util.SimpleApnsPushNotification; +import org.apache.usergrid.persistence.EntityManager; import org.apache.usergrid.persistence.entities.Notification; import org.apache.usergrid.persistence.entities.Notifier; -import org.mortbay.util.ajax.JSON; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; -import java.util.concurrent.*; - -import org.apache.usergrid.persistence.EntityManager; import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException; import org.apache.usergrid.services.ServicePayload; import 
org.apache.usergrid.services.notifications.ConnectionException; import org.apache.usergrid.services.notifications.ProviderAdapter; import org.apache.usergrid.services.notifications.TaskTracker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.*; /** * Adapter for Apple push notifications @@ -47,6 +49,8 @@ public class APNsAdapter implements ProviderAdapter { private static final String TEST_TOKEN = "ff026b5a4d2761ef13843e8bcab9fc83b47f1dfbd1d977d225ab296153ce06d6"; private static final String TEST_PAYLOAD = "{}"; + private static ObjectMapper objectMapper = new ObjectMapper(); + static { validEnvironments.add("development"); validEnvironments.add("production"); @@ -155,7 +159,7 @@ public class APNsAdapter implements ProviderAdapter { payload = "{\"aps\":{\"alert\":\"" + payload + "\"}}"; } } else { - payload = JSON.toString(objPayload); + payload = objectMapper.writeValueAsString( objPayload ); } if (payload.length() > 2048) { throw new IllegalArgumentException( http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java index 7929ad4..6b619b7 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java @@ -16,11 +16,12 @@ */ package org.apache.usergrid.services.notifications.gcm; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.android.gcm.server.*; 
import org.apache.usergrid.persistence.entities.Notification; import org.apache.usergrid.persistence.entities.Notifier; import org.apache.usergrid.services.notifications.InactiveDeviceManager; -import org.mortbay.util.ajax.JSON; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,6 +45,8 @@ public class GCMAdapter implements ProviderAdapter { private final Notifier notifier; private EntityManager entityManager; + private static ObjectMapper objectMapper = new ObjectMapper(); + private ConcurrentHashMap<Long,Batch> batches; private static final String ttlKey = "time_to_live"; @@ -147,9 +150,9 @@ public class GCMAdapter implements ProviderAdapter { throw new IllegalArgumentException( "GCM Payload must be either a Map or a String"); } - if (JSON.toString(mapPayload).length() > 4096) { - throw new IllegalArgumentException( - "GCM payloads must be 4096 characters or less"); + String payloadString = objectMapper.writeValueAsString( mapPayload ); + if ( payloadString.length() > 4096) { + throw new IllegalArgumentException( "GCM payloads must be 4096 characters or less"); } return mapPayload; }