http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java new file mode 100644 index 0000000..9158412 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.usergrid.persistence.qakka.serialization.sharding.impl; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition; +import org.apache.usergrid.persistence.core.datastax.TableDefinition; +import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl; +import org.apache.usergrid.persistence.qakka.QakkaFig; +import org.apache.usergrid.persistence.qakka.core.CassandraClient; +import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException; +import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException; +import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard; +import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.MessageFormat; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + + +@Singleton +public class ShardCounterSerializationImpl implements ShardCounterSerialization { + private static final Logger logger = LoggerFactory.getLogger( ShardCounterSerializationImpl.class ); + + private final CassandraClient cassandraClient; + + final static String TABLE_COUNTERS = "shard_counters"; + final static String COLUMN_QUEUE_NAME = "queue_name"; + final static String COLUMN_SHARD_ID = "shard_id"; + final static String COLUMN_COUNTER_VALUE = "counter_value"; + final static String COLUMN_SHARD_TYPE = "shard_type"; + + static final String CQL = + "CREATE TABLE IF NOT EXISTS shard_counters ( " + + "counter_value counter, " + + "queue_name 
varchar, " + + "shard_type varchar, " + + "shard_id bigint, " + + "PRIMARY KEY (queue_name, shard_type, shard_id) " + + "); "; + + final long maxInMemoryIncrement; + + class InMemoryCount { + long baseCount; + final AtomicLong increment = new AtomicLong( 0L ); + InMemoryCount( long baseCount ) { + this.baseCount = baseCount; + } + public long value() { + return baseCount + increment.get(); + } + public AtomicLong getIncrement() { + return increment; + } + void setBaseCount( long baseCount ) { + this.baseCount = baseCount; + } + } + + private Map<String, InMemoryCount> inMemoryCounters = new ConcurrentHashMap<>(200); + + + @Inject + public ShardCounterSerializationImpl(QakkaFig qakkaFig, CassandraClient cassandraClient ) { + this.maxInMemoryIncrement = qakkaFig.getMaxInMemoryShardCounter(); + this.cassandraClient = cassandraClient; + } + + + @Override + public void incrementCounter(String queueName, Shard.Type type, long shardId, long increment ) { + + String key = queueName + type + shardId; + synchronized ( inMemoryCounters ) { + + if ( inMemoryCounters.get( key ) == null ) { + + Long value = retrieveCounterFromStorage( queueName, type, shardId ); + + if ( value == null ) { + incrementCounterInStorage( queueName, type, shardId, 0L ); + inMemoryCounters.put( key, new InMemoryCount( 0L )); + } else { + inMemoryCounters.put( key, new InMemoryCount( value )); + } + inMemoryCounters.get( key ).getIncrement().addAndGet( increment ); + return; + } + } + + InMemoryCount inMemoryCount = inMemoryCounters.get( key ); + + synchronized ( inMemoryCount ) { + long totalIncrement = inMemoryCount.getIncrement().addAndGet( increment ); + + if (totalIncrement > maxInMemoryIncrement) { + incrementCounterInStorage( queueName, type, shardId, totalIncrement ); + inMemoryCount.setBaseCount( retrieveCounterFromStorage( queueName, type, shardId ) ); + inMemoryCount.getIncrement().set( 0L ); + } + } + + } + + + @Override + public long getCounterValue( String queueName, Shard.Type type, long 
shardId ) { + + String key = queueName + type + shardId; + + synchronized ( inMemoryCounters ) { + + if ( inMemoryCounters.get( key ) == null ) { + + Long value = retrieveCounterFromStorage( queueName, type, shardId ); + + if ( value == null ) { + throw new NotFoundException( + MessageFormat.format( "No counter found for queue {0} type {1} shardId {2}", + queueName, type, shardId )); + } else { + inMemoryCounters.put( key, new InMemoryCount( value )); + } + } + } + + return inMemoryCounters.get( key ).value(); + } + + void incrementCounterInStorage( String queueName, Shard.Type type, long shardId, long increment ) { + + Statement update = QueryBuilder.update( TABLE_COUNTERS ) + .where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ) ) + .and( QueryBuilder.eq( COLUMN_SHARD_TYPE, type.toString() ) ) + .and( QueryBuilder.eq( COLUMN_SHARD_ID, shardId ) ) + .with( QueryBuilder.incr( COLUMN_COUNTER_VALUE, increment ) ); + cassandraClient.getSession().execute( update ); + } + + + Long retrieveCounterFromStorage( String queueName, Shard.Type type, long shardId ) { + + Statement query = QueryBuilder.select().from( TABLE_COUNTERS ) + .where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ) ) + .and( QueryBuilder.eq( COLUMN_SHARD_TYPE, type.toString()) ) + .and( QueryBuilder.eq( COLUMN_SHARD_ID, shardId ) ); + + ResultSet resultSet = cassandraClient.getSession().execute( query ); + List<Row> all = resultSet.all(); + + if ( all.size() > 1 ) { + throw new QakkaRuntimeException( + "Multiple rows for counter " + queueName + " type " + type + " shardId " + shardId ); + } + if ( all.isEmpty() ) { + return null; + } + return all.get(0).getLong( COLUMN_COUNTER_VALUE ); + } + + @Override + public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() { + return Collections.EMPTY_LIST; + } + + @Override + public Collection<TableDefinition> getTables() { + return Collections.singletonList( new TableDefinitionStringImpl( "shard_counters", CQL ) ); + } +}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java new file mode 100644 index 0000000..7b9fd8e --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.persistence.qakka.serialization.sharding.impl; + +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.querybuilder.Assignment; +import com.datastax.driver.core.querybuilder.Clause; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.google.common.collect.Lists; +import com.google.inject.Inject; +import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition; +import org.apache.usergrid.persistence.core.datastax.TableDefinition; +import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl; +import org.apache.usergrid.persistence.qakka.core.CassandraClient; +import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard; +import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardSerialization; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Collections; +import java.util.UUID; + + +public class ShardSerializationImpl implements ShardSerialization { + + private static final Logger logger = LoggerFactory.getLogger( ShardSerializationImpl.class ); + + private final CassandraClient cassandraClient; + + public final static String COLUMN_QUEUE_NAME = "queue_name"; + public final static String COLUMN_REGION = "region"; + public final static String COLUMN_SHARD_ID = "shard_id"; + public final static String COLUMN_ACTIVE = "active"; + public final static String COLUMN_POINTER = "pointer"; + + + public final static String TABLE_SHARDS_MESSAGES_AVAILABLE = "shards_messages_available"; + + public final static String TABLE_SHARDS_MESSAGES_INFLIGHT = "shards_messages_inflight"; + + + static final String SHARDS_MESSAGES_AVAILABLE = + "CREATE TABLE IF NOT EXISTS shards_messages_available ( " + + "queue_name text, " + + "region text, " + + "shard_id bigint, " + + "active int, " + + "pointer timeuuid, " + + "PRIMARY KEY ((queue_name, 
region), active, shard_id) " + + ") WITH CLUSTERING ORDER BY (active DESC, shard_id ASC); "; + + static final String SHARDS_MESSAGES_AVAILABLE_INFLIGHT = + "CREATE TABLE IF NOT EXISTS shards_messages_inflight ( " + + "queue_name text, " + + "region text, " + + "shard_id bigint, " + + "active int, " + + "pointer timeuuid, " + + "PRIMARY KEY ((queue_name, region), active, shard_id) " + + ") WITH CLUSTERING ORDER BY (active DESC, shard_id ASC); "; + + + @Inject + public ShardSerializationImpl( CassandraClient cassandraClient ) { + this.cassandraClient = cassandraClient; + } + + public void createShard(final Shard shard){ + + Statement insert = QueryBuilder.insertInto(getTableName(shard.getType())) + .value(COLUMN_QUEUE_NAME, shard.getQueueName()) + .value(COLUMN_REGION, shard.getRegion()) + .value(COLUMN_SHARD_ID, shard.getShardId()) + .value(COLUMN_ACTIVE, 1) + .value(COLUMN_POINTER, shard.getPointer()); + + cassandraClient.getSession().execute(insert); + + } + + public Shard loadShard(final Shard shard){ + + Clause queueNameClause = QueryBuilder.eq(COLUMN_QUEUE_NAME, shard.getQueueName()); + Clause regionClause = QueryBuilder.eq(COLUMN_REGION, shard.getRegion()); + Clause activeClause = QueryBuilder.eq(COLUMN_ACTIVE, 1); + Clause shardIdClause = QueryBuilder.eq(COLUMN_SHARD_ID, shard.getShardId()); + + + + Statement select = QueryBuilder.select().from(getTableName(shard.getType())) + .where(queueNameClause) + .and(regionClause) + .and(activeClause) + .and(shardIdClause); + + Row row = cassandraClient.getSession().execute(select).one(); + + if (row == null){ + return null; + } + + final String queueName = row.getString(COLUMN_QUEUE_NAME); + final String region = row.getString(COLUMN_REGION); + final long shardId = row.getLong(COLUMN_SHARD_ID); + final UUID pointer = row.getUUID(COLUMN_POINTER); + + return new Shard(queueName, region, shard.getType(), shardId, pointer); + + + + } + + + public void deleteShard(final Shard shard){ + + Clause queueNameClause = 
QueryBuilder.eq(COLUMN_QUEUE_NAME, shard.getQueueName()); + Clause regionClause = QueryBuilder.eq(COLUMN_REGION, shard.getRegion()); + Clause activeClause = QueryBuilder.eq(COLUMN_ACTIVE, 1); + Clause shardIdClause = QueryBuilder.eq(COLUMN_SHARD_ID, shard.getShardId()); + + + + Statement delete = QueryBuilder.delete().from(getTableName(shard.getType())) + .where(queueNameClause) + .and(regionClause) + .and(activeClause) + .and(shardIdClause); + + cassandraClient.getSession().execute(delete); + + } + + public void updateShardPointer(final Shard shard){ + + Assignment assignment = QueryBuilder.set(COLUMN_POINTER, shard.getPointer()); + + Clause queueNameClause = QueryBuilder.eq(COLUMN_QUEUE_NAME, shard.getQueueName()); + Clause regionClause = QueryBuilder.eq(COLUMN_REGION, shard.getRegion()); + Clause activeClause = QueryBuilder.eq(COLUMN_ACTIVE, 1); + Clause shardIdClause = QueryBuilder.eq(COLUMN_SHARD_ID, shard.getShardId()); + + Statement update = QueryBuilder.update(getTableName(shard.getType())) + .with(assignment) + .where(queueNameClause) + .and(regionClause) + .and(activeClause) + .and(shardIdClause); + + cassandraClient.getSession().execute(update); + + } + + public static String getTableName(Shard.Type shardType){ + + String table; + if( shardType.equals(Shard.Type.DEFAULT)) { + table = TABLE_SHARDS_MESSAGES_AVAILABLE; + }else if (shardType.equals(Shard.Type.INFLIGHT)) { + table = TABLE_SHARDS_MESSAGES_INFLIGHT; + }else{ + throw new IllegalArgumentException("Unknown ShardType"); + } + + return table; + + } + + @Override + public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() { + return Collections.EMPTY_LIST; + } + + @Override + public Collection<TableDefinition> getTables() { + return Lists.newArrayList( + new TableDefinitionStringImpl( TABLE_SHARDS_MESSAGES_AVAILABLE, SHARDS_MESSAGES_AVAILABLE ), + new TableDefinitionStringImpl( TABLE_SHARDS_MESSAGES_INFLIGHT, SHARDS_MESSAGES_AVAILABLE_INFLIGHT ) + ); + } + +} 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardStrategyImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardStrategyImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardStrategyImpl.java new file mode 100644 index 0000000..cfd9a60 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardStrategyImpl.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.persistence.qakka.serialization.sharding.impl; + +import com.google.inject.Inject; +import org.apache.usergrid.persistence.qakka.core.CassandraClient; +import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException; +import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard; +import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator; +import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardStrategy; + +import java.text.MessageFormat; +import java.util.Optional; +import java.util.UUID; + + +public class ShardStrategyImpl implements ShardStrategy { + + final CassandraClient cassandraClient; + + @Inject + public ShardStrategyImpl(CassandraClient cassandraClient) { + this.cassandraClient = cassandraClient; + } + + @Override + public Shard selectShard(String queueName, String region, Shard.Type shardType, UUID pointer) { + + // use shard iterator to walk through shards until shard can be found + + ShardIterator shardIterator = new ShardIterator( + cassandraClient, queueName, region, shardType, Optional.empty() ); + + if ( !shardIterator.hasNext() ) { + String msg = MessageFormat.format( + "No shards found for queue {0} region {1} type {2}", queueName, region, shardType ); + throw new NotFoundException( msg ); + } + + // walk through shards from oldest to newest + + Shard prev = shardIterator.next(); + while ( shardIterator.hasNext() ) { + Shard next = shardIterator.next(); + + // if item is older than the next shard, the use prev shard + if ( pointer.timestamp() < next.getPointer().timestamp() ) { + return prev; + } + prev = next; + } + return prev; + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLog.java ---------------------------------------------------------------------- diff --git 
/**
 * Mutable value holder for one transfer log entry: the queue, the source and destination
 * regions, the message id and the recorded transfer time.
 */
public class TransferLog {
    String queueName;
    String sourceRegion;
    String destRegion;
    UUID messageId;
    long transfer_time;

    /**
     * @param queueName     name of the queue
     * @param sourceRegion  source region
     * @param destRegion    destination region
     * @param messageId     id of the message
     * @param transfer_time transfer time value stored with the entry
     */
    public TransferLog(
            String queueName,
            String sourceRegion,
            String destRegion,
            UUID messageId,
            long transfer_time) {
        this.queueName = queueName;
        this.sourceRegion = sourceRegion;
        this.destRegion = destRegion;
        this.messageId = messageId;
        this.transfer_time = transfer_time;
    }

    public String getQueueName() {
        return queueName;
    }

    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }

    public String getSourceRegion() {
        return sourceRegion;
    }

    public void setSourceRegion(String sourceRegion) {
        this.sourceRegion = sourceRegion;
    }

    public String getDestRegion() {
        return destRegion;
    }

    public void setDestRegion(String destRegion) {
        this.destRegion = destRegion;
    }

    public UUID getMessageId() {
        return messageId;
    }

    public void setMessageId(UUID messageId) {
        this.messageId = messageId;
    }

    public long getTransfer_time() {
        return transfer_time;
    }

    public void setTransfer_time(long transfer_time) {
        this.transfer_time = transfer_time;
    }

    /** @return a readable rendering of all fields, intended for logs and debugging. */
    @Override
    public String toString() {
        return "TransferLog{"
            + "queueName=" + queueName
            + ", sourceRegion=" + sourceRegion
            + ", destRegion=" + destRegion
            + ", messageId=" + messageId
            + ", transfer_time=" + transfer_time
            + '}';
    }
}
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLogSerialization.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.serialization.transferlog; + +import com.datastax.driver.core.PagingState; +import org.apache.usergrid.persistence.core.migration.schema.Migration; +import org.apache.usergrid.persistence.qakka.exceptions.QakkaException; +import org.apache.usergrid.persistence.qakka.serialization.Result; + +import java.util.UUID; + + +public interface TransferLogSerialization extends Migration { + + /** + * Record transfer log record. + * + * @param queueName Name of queue. + * @param source Source region. + * @param dest Destination region. + * @param messageId UUID of message in message_data table. + */ + void recordTransferLog( + String queueName, String source, String dest, UUID messageId); + + /** + * Remove transfer log record. + * + * @param queueName Name of queue. + * @param source Source region. + * @param dest Destination region. + * @param messageId UUID of message in message_data table. 
+ * @throws QakkaException If transfer log message was not found or could not be removed. + */ + void removeTransferLog( + String queueName, String source, String dest, UUID messageId) throws QakkaException; + + /** + * Get all transfer logs (for testing purposes) + * + * @param pagingState Paging state (or null if none) + * @param fetchSize Number of rows to be fetched per page (or -1 for default) + */ + Result<TransferLog> getAllTransferLogs(PagingState pagingState, int fetchSize); +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java new file mode 100644 index 0000000..f9fb0dc --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.serialization.transferlog.impl; + +import com.datastax.driver.core.PagingState; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.google.inject.Inject; +import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition; +import org.apache.usergrid.persistence.core.datastax.TableDefinition; +import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl; +import org.apache.usergrid.persistence.qakka.core.CassandraClient; +import org.apache.usergrid.persistence.qakka.exceptions.QakkaException; +import org.apache.usergrid.persistence.qakka.serialization.Result; +import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLog; +import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + + +public class TransferLogSerializationImpl implements TransferLogSerialization { + + private static final Logger logger = LoggerFactory.getLogger( TransferLogSerializationImpl.class ); + + private final CassandraClient cassandraClient; + + public final static String TABLE_TRANSFER_LOG = "transfer_log"; + + public final static String COLUMN_QUEUE_NAME = "queue_name"; + public final static String COLUMN_SOURCE_REGION = "source_region"; + public final static String COLUMN_DEST_REGION = 
"dest_region"; + public final static String COLUMN_MESSAGE_ID = "message_id"; + public final static String COLUMN_TRANSFER_TIME = "transfer_time"; + + static final String CQL = + "CREATE TABLE IF NOT EXISTS transfer_log ( " + + "queue_name text, " + + "source_region text, " + + "dest_region text, " + + "message_id timeuuid, " + + "transfer_time bigint, " + + "PRIMARY KEY ((queue_name, dest_region, message_id)) " + + "); "; + + + @Inject + public TransferLogSerializationImpl( CassandraClient cassandraClient ) { + this.cassandraClient = cassandraClient; + } + + + @Override + public void recordTransferLog( + String queueName, String source, String dest, UUID messageId) { + + Statement insert = QueryBuilder.insertInto(TABLE_TRANSFER_LOG) + .value(COLUMN_QUEUE_NAME, queueName ) + .value(COLUMN_SOURCE_REGION, source ) + .value(COLUMN_DEST_REGION, dest ) + .value(COLUMN_MESSAGE_ID, messageId ) + .value(COLUMN_TRANSFER_TIME, System.currentTimeMillis() ); + cassandraClient.getSession().execute(insert); + } + + + @Override + public void removeTransferLog( + String queueName, String source, String dest, UUID messageId ) throws QakkaException { + + Statement query = QueryBuilder.select().all().from(TABLE_TRANSFER_LOG) + .where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName )) + .and( QueryBuilder.eq( COLUMN_DEST_REGION, dest )) + .and( QueryBuilder.eq( COLUMN_MESSAGE_ID, messageId )); + ResultSet rs = cassandraClient.getSession().execute( query ); + + if ( rs.getAvailableWithoutFetching() == 0 ) { + StringBuilder sb = new StringBuilder(); + sb.append( "Transfer log entry not found for queueName=" ).append( queueName ); + sb.append( " source=" ).append( source ); + sb.append( " dest=" ).append( dest ); + sb.append( " messageId=" ).append( messageId ); + throw new QakkaException( sb.toString() ); + } + + Statement deleteQuery = QueryBuilder.delete().from(TABLE_TRANSFER_LOG) + .where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName )) + .and( QueryBuilder.eq( COLUMN_DEST_REGION, 
dest )) + .and( QueryBuilder.eq( COLUMN_MESSAGE_ID, messageId )); + cassandraClient.getSession().execute( deleteQuery ); + } + + + @Override + public Result<TransferLog> getAllTransferLogs(PagingState pagingState, int fetchSize ) { + + Statement query = QueryBuilder.select().all().from(TABLE_TRANSFER_LOG); + + query.setFetchSize( fetchSize ); + if ( pagingState != null ) { + query.setPagingState( pagingState ); + } + + ResultSet rs = cassandraClient.getSession().execute( query ); + final PagingState newPagingState = rs.getExecutionInfo().getPagingState(); + + final List<TransferLog> transferLogs = new ArrayList<>(); + int numReturned = rs.getAvailableWithoutFetching(); + for ( int i=0; i<numReturned; i++ ) { + Row row = rs.one(); + TransferLog tlog = new TransferLog( + row.getString( COLUMN_QUEUE_NAME ), + row.getString( COLUMN_SOURCE_REGION ), + row.getString( COLUMN_DEST_REGION ), + row.getUUID( COLUMN_MESSAGE_ID ), + row.getLong( COLUMN_TRANSFER_TIME )); + transferLogs.add( tlog ); + } + + return new Result<TransferLog>() { + + @Override + public PagingState getPagingState() { + return newPagingState; + } + + @Override + public List<TransferLog> getEntities() { + return transferLogs; + } + }; + } + + @Override + public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() { + return Collections.EMPTY_LIST; + } + + @Override + public Collection<TableDefinition> getTables() { + return Collections.singletonList( new TableDefinitionStringImpl( TABLE_TRANSFER_LOG, CQL ) ); + } + + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java index 6d62da0..7bd0fa7 
100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java @@ -18,18 +18,18 @@ package org.apache.usergrid.persistence.queue.guice; +import com.google.inject.AbstractModule; +import com.google.inject.assistedinject.FactoryModuleBuilder; +import org.apache.usergrid.persistence.qakka.QakkaModule; +import org.apache.usergrid.persistence.queue.LegacyQueueFig; +import org.apache.usergrid.persistence.queue.LegacyQueueManager; +import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory; import org.apache.usergrid.persistence.queue.LegacyQueueManagerInternalFactory; +import org.apache.usergrid.persistence.queue.impl.QakkaQueueManager; import org.apache.usergrid.persistence.queue.impl.QueueManagerFactoryImpl; import org.apache.usergrid.persistence.queue.impl.SNSQueueManagerImpl; import org.safehaus.guicyfig.GuicyFigModule; -import org.apache.usergrid.persistence.queue.LegacyQueueFig; -import org.apache.usergrid.persistence.queue.LegacyQueueManager; -import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory; - -import com.google.inject.AbstractModule; -import com.google.inject.assistedinject.FactoryModuleBuilder; - /** * Simple module for wiring our collection api @@ -44,11 +44,11 @@ public class QueueModule extends AbstractModule { install(new GuicyFigModule(LegacyQueueFig.class)); + install( new QakkaModule() ); + bind(LegacyQueueManagerFactory.class).to(QueueManagerFactoryImpl.class); - install(new FactoryModuleBuilder().implement(LegacyQueueManager.class, SNSQueueManagerImpl.class) + install( new FactoryModuleBuilder().implement(LegacyQueueManager.class, QakkaQueueManager.class) .build(LegacyQueueManagerInternalFactory.class)); } - - } 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java new file mode 100644 index 0000000..c407a78 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java @@ -0,0 +1,178 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. The ASF licenses this file to You
 * under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License. For additional information regarding
 * copyright in this work, please see the NOTICE file in the top level
 * directory of this distribution.
 */
package org.apache.usergrid.persistence.queue.impl;

import com.datastax.driver.core.DataType;
import com.datastax.driver.core.ProtocolVersion;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import org.apache.usergrid.persistence.qakka.QakkaFig;
import org.apache.usergrid.persistence.qakka.core.*;
import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
import org.apache.usergrid.persistence.queue.LegacyQueueFig;
import org.apache.usergrid.persistence.queue.LegacyQueueManager;
import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
import org.apache.usergrid.persistence.queue.LegacyQueueScope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;


/**
 * {@link LegacyQueueManager} implementation that delegates to the Qakka
 * {@link QueueManager} and {@link QueueMessageManager}.
 *
 * <p>Message bodies are written with Java object serialization and read back the same way.
 * The queue used is the one named by the injected {@link LegacyQueueScope}.</p>
 */
public class QakkaQueueManager implements LegacyQueueManager {
    private static final Logger logger = LoggerFactory.getLogger( QakkaQueueManager.class );

    private final LegacyQueueScope scope;
    private final LegacyQueueFig fig;
    private final QueueManager queueManager;
    private final QueueMessageManager queueMessageManager;
    private final QakkaFig qakkaFig;
    private final Regions regions;


    /**
     * Stores the collaborators and creates the queue named by {@code scope} when
     * {@code queueManager} reports no configuration for it.
     */
    @Inject
    public QakkaQueueManager(
            @Assisted LegacyQueueScope scope,
            LegacyQueueFig fig,
            QueueManager queueManager,
            QueueMessageManager queueMessageManager,
            QakkaFig qakkaFig,
            Regions regions
    ) {

        this.scope = scope;
        this.fig = fig;
        this.queueManager = queueManager;
        this.qakkaFig = qakkaFig;
        this.queueMessageManager = queueMessageManager;
        this.regions = regions;

        if ( queueManager.getQueueConfig(scope.getName()) == null ) {

            // TODO: read defaults from config
            //queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ));

            Queue queue = new Queue( scope.getName() );
            queueManager.createQueue( queue );
        }
    }


    /**
     * Serializes {@code body} with Java object serialization and sends it to this scope's queue
     * with content type {@code application/octet-stream}, no delay and no expiration.
     *
     * @param body the message payload
     * @throws IOException if the payload cannot be serialized
     */
    @Override
    public <T extends Serializable> void sendMessage(T body) throws IOException {

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // try-with-resources closes (and therefore flushes) the stream even if writeObject fails
        try ( ObjectOutputStream oos = new ObjectOutputStream( bos ) ) {
            oos.writeObject( body );
        }
        ByteBuffer byteBuffer = ByteBuffer.wrap( bos.toByteArray() );

        queueMessageManager.sendMessages(
            scope.getName(),
            regions.getRegions( scope.getRegionImplementation().name() ),
            null, // delay millis
            null, // expiration seconds
            "application/octet-stream",
            DataType.serializeValue( byteBuffer, ProtocolVersion.NEWEST_SUPPORTED ));
    }


    /** Identical to {@link #sendMessage(Serializable)}; this implementation has no separate topic path. */
    @Override
    public <T extends Serializable> void sendMessageToTopic(T body) throws IOException {
        sendMessage( body );
    }


    /**
     * Fetches up to {@code limit} messages from this scope's queue and deserializes each body.
     *
     * @param limit maximum number of messages to request
     * @param klass not used by this implementation
     * @return the fetched messages; each has the Qakka queue-message id as its message id and
     *         {@code null} handle and type
     * @throws QakkaRuntimeException if a message body cannot be loaded or deserialized
     */
    @Override
    public List<LegacyQueueMessage> getMessages(int limit, Class klass) {

        List<LegacyQueueMessage> messages = new ArrayList<>();
        List<QueueMessage> qakkaMessages = queueMessageManager.getNextMessages( scope.getName(), limit );

        for ( QueueMessage qakkaMessage : qakkaMessages ) {

            Object body;
            try {
                ByteBuffer messageData = queueMessageManager.getMessageData( qakkaMessage.getMessageId() );
                body = deserializeBody( messageData );

            } catch (Throwable t) {
                // Throwable is kept on purpose so callers keep seeing QakkaRuntimeException for
                // every failure raised while loading or reading a body.
                throw new QakkaRuntimeException( "Error de-serializing object", t );
            }

            LegacyQueueMessage legacyQueueMessage = new LegacyQueueMessage(
                qakkaMessage.getQueueMessageId().toString(),
                null, // handle
                body,
                null); // type

            messages.add( legacyQueueMessage );
        }

        return messages;
    }


    /**
     * Decodes the stored blob and reads one Java-serialized object from it.
     *
     * <p>Only the bytes between the buffer's position and limit are read. {@code ByteBuffer.array()}
     * is avoided because it exposes the whole backing array regardless of position, limit and
     * array offset, and is unavailable for read-only or direct buffers.</p>
     *
     * <p>NOTE(review): this is Java native deserialization; it is only safe when everything
     * written to the queue is trusted.</p>
     */
    private static Object deserializeBody( ByteBuffer messageData ) throws IOException, ClassNotFoundException {

        ByteBuffer blob = (ByteBuffer)DataType.blob().deserialize(
            messageData, ProtocolVersion.NEWEST_SUPPORTED );

        byte[] bytes = new byte[ blob.remaining() ];
        blob.duplicate().get( bytes ); // duplicate() leaves the caller's position untouched

        try ( ObjectInputStream ois = new ObjectInputStream( new ByteArrayInputStream( bytes ) ) ) {
            return ois.readObject();
        }
    }


    /** Queue depth is not implemented here; always returns 0. */
    @Override
    public long getQueueDepth() {
        return 0;
    }


    /**
     * Acknowledges one message on this scope's queue.
     *
     * @param queueMessage message whose id is the string form of a Qakka queue-message UUID
     */
    @Override
    public void commitMessage(LegacyQueueMessage queueMessage) {

        UUID queueMessageId = UUID.fromString( queueMessage.getMessageId() );
        queueMessageManager.ackMessage( scope.getName(), queueMessageId );
    }


    /** Acknowledges each message in turn via {@link #commitMessage(LegacyQueueMessage)}. */
    @Override
    public void commitMessages(List<LegacyQueueMessage> queueMessages) {

        for ( LegacyQueueMessage message : queueMessages ) {
            commitMessage( message );
        }
    }


    /**
     * Sends each element of {@code bodies} as its own message.
     *
     * @param bodies payloads; every element is cast to {@link Serializable}
     * @throws IOException if a payload cannot be serialized
     */
    @Override
    public void sendMessages( List bodies ) throws IOException {

        for ( Object body : bodies ) {
            sendMessage( (Serializable)body );
        }

    }


    /** Deletes the queue named by this manager's scope. */
    @Override
    public void deleteQueue() {
        queueManager.deleteQueue( scope.getName() );
    }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java index 2d51903..c1bdc72 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java @@ -67,7 +67,9 @@ public class QueueManagerFactoryImpl implements LegacyQueueManagerFactory { }); @Inject - public QueueManagerFactoryImpl(final LegacyQueueFig queueFig, final LegacyQueueManagerInternalFactory queuemanagerInternalFactory){ + public QueueManagerFactoryImpl( + final LegacyQueueFig queueFig, final LegacyQueueManagerInternalFactory queuemanagerInternalFactory) { + this.queueFig = queueFig; this.queuemanagerInternalFactory = queuemanagerInternalFactory; this.defaultManager = new HashMap<>(10); http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/webapp/WEB-INF/web.xml ---------------------------------------------------------------------- diff --git
a/stack/corepersistence/queue/src/main/webapp/WEB-INF/web.xml b/stack/corepersistence/queue/src/main/webapp/WEB-INF/web.xml new file mode 100644 index 0000000..cc94f07 --- /dev/null +++ b/stack/corepersistence/queue/src/main/webapp/WEB-INF/web.xml @@ -0,0 +1,51 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<web-app xmlns="http://java.sun.com/xml/ns/javaee" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd" + version="3.0"> + + <display-name>qakka</display-name> + + <listener> + <listener-class>org.apache.usergrid.persistence.qakka.api.impl.StartupListener</listener-class> + </listener> + + <filter> + <filter-name>qakka</filter-name> + <filter-class>org.glassfish.jersey.servlet.ServletContainer</filter-class> + + <init-param> + <param-name>javax.ws.rs.Application</param-name> + <param-value>org.apache.usergrid.persistence.qakka.api.impl.JerseyResourceConfig</param-value> + </init-param> + + <init-param> + <param-name>jersey.config.server.tracing.type</param-name> + <!-- allowed values are OFF, ON_DEMAND, ALL --> + <param-value>OFF</param-value> + </init-param> + + </filter> + + <filter-mapping> + <filter-name>qakka</filter-name> + <url-pattern>/api/*</url-pattern> + </filter-mapping> + +</web-app> http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java new file mode 100644 index 0000000..8f5284c --- /dev/null +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka; + +import com.google.inject.Guice; +import com.google.inject.Injector; +import org.apache.usergrid.persistence.core.migration.schema.MigrationException; +import org.apache.usergrid.persistence.core.migration.schema.MigrationManager; +import org.apache.usergrid.persistence.queue.guice.QueueModule; +import org.junit.BeforeClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicInteger; + + +public class AbstractTest { + private static final Logger logger = LoggerFactory.getLogger( AbstractTest.class ); + + static AtomicInteger nextPort = new AtomicInteger(3551); + + protected static Injector sharedInjector; + + static { new KeyspaceDropper(); } + + + public AbstractTest() { + if ( getInjector() == null ) { + setInjector( Guice.createInjector( new QueueModule() ) ); + MigrationManager migrationManager = getInjector().getInstance( MigrationManager.class ); + try { + migrationManager.migrate(); + } catch (MigrationException e) { + logger.error("Error in migration", e); + } + } + } + + protected Injector getInjector() { + return sharedInjector; + } + + protected static void setInjector(Injector injector) { + AbstractTest.sharedInjector = injector; + } + + + protected int getNextAkkaPort() { + int ret = nextPort.getAndIncrement(); + logger.info("Returning port {} for this {}", ret, 
this.hashCode()); + return ret; + } + + + @BeforeClass + public static void startCassandra() throws Exception { + //EmbeddedCassandraServerHelper.startEmbeddedCassandra("/cassandra.yaml"); + } + + + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/KeyspaceDropper.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/KeyspaceDropper.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/KeyspaceDropper.java new file mode 100644 index 0000000..aa4dfd1 --- /dev/null +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/KeyspaceDropper.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.usergrid.persistence.qakka; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.QueryOptions; +import com.datastax.driver.core.Session; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Properties; + + +/** + * Created by Dave Johnson (snoopd...@apache.org) on 9/9/16. + */ +public class KeyspaceDropper { + + private static final Logger logger = LoggerFactory.getLogger( AbstractTest.class ); + + static { dropTestKeyspace(); } + + + public static void dropTestKeyspace() { + + String propsFileName = "qakka.properties"; + + Properties props = new Properties(); + try { + props.load( App.class.getResourceAsStream( "/" + propsFileName ) ); + } catch (IOException e) { + throw new RuntimeException( "Unable to load " + propsFileName + " file!" ); + } + + String keyspace = (String)props.get("cassandra.keyspace.application"); + String hosts[] = props.getProperty( "cassandra.hosts", "127.0.0.1" ).split(","); + int port = Integer.parseInt( props.getProperty( "cassandra.port", "9042" )); + + Cluster.Builder builder = Cluster.builder(); + for ( String host : hosts ) { + builder = builder.addContactPoint( host ).withPort( port ); + } + + final QueryOptions queryOptions = new QueryOptions().setConsistencyLevel( ConsistencyLevel.LOCAL_QUORUM ); + builder.withQueryOptions( queryOptions ); + Cluster cluster = builder.build(); + + Session session = cluster.connect(); + logger.info("Dropping test keyspace: {}", keyspace); + session.execute( "DROP KEYSPACE IF EXISTS " + keyspace ); + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/AbstractRestTest.java ---------------------------------------------------------------------- diff --git 
a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/AbstractRestTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/AbstractRestTest.java new file mode 100644 index 0000000..a7d6215 --- /dev/null +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/AbstractRestTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.persistence.qakka.api; + + +import org.apache.usergrid.persistence.qakka.KeyspaceDropper; +import org.apache.usergrid.persistence.qakka.api.impl.StartupListener; +import org.apache.usergrid.persistence.qakka.api.impl.JerseyResourceConfig; +import org.glassfish.jersey.test.DeploymentContext; +import org.glassfish.jersey.test.JerseyTest; +import org.glassfish.jersey.test.ServletDeploymentContext; +import org.junit.BeforeClass; + +import javax.ws.rs.core.Application; + + +abstract public class AbstractRestTest extends JerseyTest { + + static Application app; + + static DeploymentContext context = null; + + static { new KeyspaceDropper(); } + + + @BeforeClass + public static void startCassandra() throws Exception { + //EmbeddedCassandraServerHelper.startEmbeddedCassandra("/cassandra.yaml"); + } + + @Override + protected Application configure() { + if ( app == null ) { + app = new JerseyResourceConfig(); + } + return app; + } + + @Override + protected DeploymentContext configureDeployment() { + if ( context == null ) { + context = ServletDeploymentContext.builder( configure() ) .addListener( StartupListener.class ).build(); + } + return context; + } +} + http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/PerformanceTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/PerformanceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/PerformanceTest.java new file mode 100644 index 0000000..b99be69 --- /dev/null +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/PerformanceTest.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.api; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.usergrid.persistence.qakka.core.QueueMessage; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.net.URISyntaxException; +import java.util.*; + + +public class PerformanceTest { + private static final Logger logger = LoggerFactory.getLogger( PerformanceTest.class ); + + + @Test + @Ignore("needs exernal Tomcat an Cassandra") + public void testSendAndGetMessagePerformance() throws URISyntaxException, JsonProcessingException { + + Client client = ClientBuilder.newClient(); + + WebTarget target = client.target("http://macsnoopdave2013:8080/api/"); + + // create a queue + + String queueName = "pt_queue_" + RandomStringUtils.randomAlphanumeric( 10 ); + Map<String, Object> queueMap = new HashMap<String, Object>() {{ put("name", 
queueName); }}; + target.path("queues").request().post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE)); + + // send some messages + int numMessages = 20000; + + { + ObjectMapper mapper = new ObjectMapper(); + List<Long> times = new ArrayList<>( numMessages ); + int errorCount = 0; + int counter = 0; + + for (int i = 0; i < numMessages; i++) { + + final int number = i; + Map<String, Object> messageMap = new HashMap<String, Object>() {{ + put( "message", "this is message #" + number ); + put( "valid", true ); + }}; + String body = mapper.writeValueAsString( messageMap ); + + long startTime = System.currentTimeMillis(); + Response post = target.path( "queues" ).path( queueName ).path( "messages" ) + .request().post( Entity.entity( body, MediaType.APPLICATION_OCTET_STREAM_TYPE ) ); + long stopTime = System.currentTimeMillis(); + times.add( stopTime - startTime ); + + if ( post.getStatus() != 200 ) { + errorCount++; + } + + if ( ++counter % 500 == 0 ) { + logger.debug("Sent {} messages with error count {}", counter, errorCount); + } + + try { Thread.sleep(5); } catch ( Exception intentionallyIgnored ) {}; + } + + Long total = times.stream().mapToLong( time -> time ).sum(); + Long max = times.stream().max( Comparator.comparing( time -> time ) ).get(); + Long min = times.stream().min( Comparator.comparing( time -> time ) ).get(); + Double average = times.stream().mapToLong( time -> time ).average().getAsDouble(); + + logger.debug( "\n>>>>>>> Total send time {}ms, min {}ms, max {}ms, average {}ms errors {}\n\n", + total, min, max, average, errorCount ); + } + + // get all messages, checking for dups + + { + Set<UUID> messageIds = new HashSet<>(); + List<Long> times = new ArrayList<>( numMessages ); + int errorCount = 0; + int counter = 0; + + for (int j = 0; j < numMessages; j++) { + + long startTime = System.currentTimeMillis(); + Response response = target.path( "queues" ).path( queueName ).path( "messages" ).request().get(); + long stopTime = 
System.currentTimeMillis(); + times.add( stopTime - startTime ); + + if ( ++counter % 500 == 0 ) { + logger.debug("Got {} messages with error count {}", counter, errorCount); + } + + if ( response .getStatus() != 200 ) { + errorCount++; + continue; + } + + ApiResponse apiResponse = response.readEntity( ApiResponse.class ); + QueueMessage queueMessage = apiResponse.getQueueMessages().iterator().next(); + + if (messageIds.contains( queueMessage.getQueueMessageId() )) { + Assert.fail( "Message fetched twice: " + queueMessage.getQueueMessageId() ); + } else { + messageIds.add( queueMessage.getQueueMessageId() ); + } + } + Assert.assertEquals( numMessages, messageIds.size() ); + + Long total = times.stream().mapToLong( time -> time ).sum(); + Long max = times.stream().max( Comparator.comparing( time -> time ) ).get(); + Long min = times.stream().min( Comparator.comparing( time -> time ) ).get(); + Double average = times.stream().mapToLong( time -> time ).average().getAsDouble(); + + logger.debug( "\n>>>>>>> Total get time {}ms, min {}ms, max {}ms, average {}ms errors {}\n\n", + total, min, max, average, errorCount ); + } + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java new file mode 100644 index 0000000..fcb4212 --- /dev/null +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.api; + + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.io.ByteStreams; +import com.google.inject.Injector; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.usergrid.persistence.qakka.QakkaFig; +import org.apache.usergrid.persistence.qakka.api.impl.StartupListener; +import org.apache.usergrid.persistence.qakka.core.QueueMessage; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.client.Entity; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.io.InputStream; +import java.net.URISyntaxException; +import java.util.*; + +import static org.junit.Assert.fail; + + +public class QueueResourceTest extends AbstractRestTest { + private static final Logger logger = LoggerFactory.getLogger( QueueResourceTest.class ); + + static private final TypeReference<Map<String,Object>> jsonMapTypeRef + = new TypeReference<Map<String,Object>>() {}; + + @Test + public void testCreateQueue() throws URISyntaxException { + + // create a queue + 
+ String queueName = "qrt_queue_" + RandomStringUtils.randomAlphanumeric( 10 ); + Map<String, Object> queueMap = new HashMap<String, Object>() {{ + put("name", queueName); + }}; + Response response = target("queues").request() + .post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE)); + + Assert.assertEquals( 201, response.getStatus() ); + URIStrategy uriStrategy = StartupListener.INJECTOR.getInstance( URIStrategy.class ); + Assert.assertEquals( uriStrategy.queueURI( queueName ).toString(), response.getHeaderString( "location" ) ); + + // get queue by name + + response = target("queues").path( queueName ).path( "config" ).request().get(); + Assert.assertEquals( 200, response.getStatus() ); + ApiResponse apiResponse = response.readEntity( ApiResponse.class ); + Assert.assertNotNull( apiResponse.getQueues() ); + Assert.assertFalse( apiResponse.getQueues().isEmpty() ); + Assert.assertEquals( 1, apiResponse.getQueues().size() ); + Assert.assertEquals( queueName, apiResponse.getQueues().iterator().next().getName() ); + } + + + @Test + public void testDeleteQueue() throws URISyntaxException { + + // create a queue + + String queueName = "qrt_queue_" + RandomStringUtils.randomAlphanumeric( 10 ); + Map<String, Object> queueMap = new HashMap<String, Object>() {{ put("name", queueName); }}; + Response response = target("queues").request() + .post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE)); + + Assert.assertEquals( 201, response.getStatus() ); + URIStrategy uriStrategy = StartupListener.INJECTOR.getInstance( URIStrategy.class ); + Assert.assertEquals( uriStrategy.queueURI( queueName ).toString(), response.getHeaderString( "location" ) ); + + // delete queue without confirm = true, should fail with bad request + + response = target("queues").path( queueName ).request().delete(); + Assert.assertEquals( 400, response.getStatus() ); + + // delete queue with confirm = true + + response = target("queues").path( queueName ).queryParam( "confirm", true 
).request().delete(); + Assert.assertEquals( 200, response.getStatus() ); + + // cannot get queue by name + + response = target("queues").path( queueName ).path( "config" ).request().get(); + Assert.assertEquals( 404, response.getStatus() ); + } + + + @Test + public void testSendMessageToBadQueue() throws URISyntaxException, JsonProcessingException, InterruptedException { + + String queueName = "bogus_queue_is_bogus"; + Map<String, Object> messageMap = new HashMap<String, Object>() {{ put("dummy_prop", "dummy_value"); }}; + ObjectMapper mapper = new ObjectMapper(); + String body = mapper.writeValueAsString( messageMap ); + + Response response = target("queues").path( queueName ).path( "messages" ) + .request().post( Entity.entity( body, MediaType.APPLICATION_OCTET_STREAM_TYPE )); + + Assert.assertEquals( 404, response.getStatus() ); + } + + + @Test + public void testSendJsonMessagesAsJson() throws URISyntaxException, IOException, InterruptedException { + sendJsonMessages( true ); + } + + + @Test + public void testSendMessagesJsonAsOctetStream() throws URISyntaxException, IOException, InterruptedException { + sendJsonMessages( false ); + } + + + /** + * Send 100 JSON payload messages to queue. 
+ * @param asJson True to send with content-type header 'application/json' + * False to send with content-type header 'application/octet stream' + */ + private void sendJsonMessages( boolean asJson ) throws URISyntaxException, IOException, InterruptedException { + + // create a queue + + String queueName = "qrt_queue_" + RandomStringUtils.randomAlphanumeric( 10 ); + Map<String, Object> queueMap = new HashMap<String, Object>() {{ + put( "name", queueName ); + }}; + target( "queues" ).request().post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE ) ); + + // send some messages + + ObjectMapper mapper = new ObjectMapper(); + + int numMessages = 100; + for (int i = 0; i < numMessages; i++) { + + final int number = i; + Map<String, Object> messageMap = new HashMap<String, Object>() {{ + put( "message", "this is message #" + number ); + put( "valid", true ); + }}; + String body = mapper.writeValueAsString( messageMap ); + + Response response; + if ( asJson ) { + response = target( "queues" ).path( queueName ).path( "messages" ) + .request().post( Entity.entity( body, MediaType.APPLICATION_JSON ) ); + } else { + response = target( "queues" ).path( queueName ).path( "messages" ) + .queryParam( "contentType", MediaType.APPLICATION_JSON ) + .request().post( Entity.entity( body, MediaType.APPLICATION_OCTET_STREAM ) ); + } + + Assert.assertEquals( 200, response.getStatus() ); + } + + // get all messages, checking for dups + + checkJsonMessages( queueName, numMessages ); + } + + + private Set<UUID> checkJsonMessages( String queueName, int numMessages ) throws IOException { + + ObjectMapper mapper = new ObjectMapper(); + + Set<UUID> messageIds = new HashSet<>(); + for ( int j=0; j<numMessages; j++ ) { + + int retries = 0; + int maxRetries = 10; + ApiResponse apiResponse = null; + while ( retries++ < maxRetries ) { + Response response = target( "queues" ).path( queueName ).path( "messages" ).request().get(); + apiResponse = response.readEntity( ApiResponse.class ); + if 
( !apiResponse.getQueueMessages().isEmpty() ) { + break; + } + try { Thread.sleep(500); } catch (Exception ignored) {} + } + + Assert.assertNotNull( apiResponse ); + Assert.assertNotNull( apiResponse.getQueueMessages() ); + Assert.assertEquals( 1, apiResponse.getQueueMessages().size() ); + + QueueMessage queueMessage = apiResponse.getQueueMessages().iterator().next(); + Map<String, Object> payload = mapper.readValue( queueMessage.getData(), jsonMapTypeRef ); + + Assert.assertEquals( queueName, queueMessage.getQueueName() ); + Assert.assertNull( queueMessage.getHref() ); + Assert.assertEquals( true, payload.get("valid") ); + + if (messageIds.contains( queueMessage.getQueueMessageId() )) { + Assert.fail("Message fetched twice: " + queueMessage.getQueueMessageId() ); + } else { + messageIds.add( queueMessage.getQueueMessageId() ); + } + } + Assert.assertEquals( numMessages, messageIds.size() ); + + return messageIds; + } + + + @Test + public void testSendBinaryMessages() throws URISyntaxException, IOException, InterruptedException { + + // create a queue + + String queueName = "qrt_queue_" + RandomStringUtils.randomAlphanumeric( 10 ); + Map<String, Object> queueMap = new HashMap<String, Object>() {{ + put( "name", queueName ); + }}; + target( "queues" ).request().post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE ) ); + + // send messages each with image/jpg payload + + InputStream is = getClass().getResourceAsStream("/qakka-duck.jpg"); + byte[] bytes = ByteStreams.toByteArray( is ); + + int numMessages = 100; + for (int i = 0; i < numMessages; i++) { + + Response response = target( "queues" ).path( queueName ).path( "messages" ) + .queryParam( "contentType", "image/jpg" ) + .request() + .post( Entity.entity( bytes, MediaType.APPLICATION_OCTET_STREAM )); + + Assert.assertEquals( 200, response.getStatus() ); + } + + // get all messages, checking for dups + + checkBinaryMessages( queueName, numMessages ); + } + + + private Set<UUID> checkBinaryMessages( 
String queueName, int numMessages ) throws IOException { + + Set<UUID> messageIds = new HashSet<>(); + for ( int j=0; j<numMessages; j++ ) { + + Response response = target( "queues" ).path( queueName ).path( "messages" ).request().get(); + + ApiResponse apiResponse = response.readEntity( ApiResponse.class ); + Assert.assertNotNull( apiResponse.getQueueMessages() ); + Assert.assertFalse( apiResponse.getQueueMessages().isEmpty() ); + Assert.assertEquals( 1, apiResponse.getQueueMessages().size() ); + + QueueMessage queueMessage = apiResponse.getQueueMessages().iterator().next(); + + // no data in a binary message + Assert.assertNull( queueMessage.getData() ); + + // data can be found at HREF provided + Assert.assertNotNull( queueMessage.getHref() ); + + Response binaryResponse = target("queues") + .path( queueName ).path("data").path( queueMessage.getQueueMessageId().toString() ) + .request().accept( "image/jpg" ).get(); + + Assert.assertEquals( 200, binaryResponse.getStatus() ); + InputStream is = binaryResponse.readEntity( InputStream.class ); + + byte[] imageBytes = ByteStreams.toByteArray( is ); + Assert.assertEquals( 11188, imageBytes.length); + + if (messageIds.contains( queueMessage.getQueueMessageId() )) { + fail("Message fetched twice: " + queueMessage.getQueueMessageId() ); + } else { + messageIds.add( queueMessage.getQueueMessageId() ); + } + } + Assert.assertEquals( numMessages, messageIds.size() ); + + return messageIds; + } + + + @Test + public void testSendMessageAckAndTimeout() throws URISyntaxException, IOException, InterruptedException { + + // create a queue + + String queueName = "qrt_queue_" + RandomStringUtils.randomAlphanumeric( 10 ); + Map<String, Object> queueMap = new HashMap<String, Object>() {{ put("name", queueName); }}; + target("queues").request().post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE)); + + // send some messages + + ObjectMapper mapper = new ObjectMapper(); + + int numMessages = 100; + for ( int i=0; 
i<numMessages; i++ ) { + + final int number = i; + Map<String, Object> messageMap = new HashMap<String, Object>() {{ + put("message", "this is message #" + number); + put("valid", true ); + }}; + String body = mapper.writeValueAsString( messageMap ); + + Response response = target("queues").path( queueName ).path( "messages" ) + .request().post( Entity.entity( body, MediaType.APPLICATION_JSON )); + + Assert.assertEquals( 200, response.getStatus() ); + } + + // get all messages, checking for dups + + Set<UUID> messageIds = checkJsonMessages( queueName, numMessages ); + + // there should be no more messages available + + Response response = target( "queues" ).path( queueName ).path( "messages" ).request().get(); + ApiResponse apiResponse = response.readEntity( ApiResponse.class ); + Assert.assertNotNull( apiResponse.getQueueMessages() ); + Assert.assertTrue( apiResponse.getQueueMessages().isEmpty() ); + + // ack half of the messages + + int count = 0; + Set<UUID> ackedIds = new HashSet<>(); + for ( UUID queueMessageId : messageIds ) { + response = target( "queues" ) + .path( queueName ).path( "messages" ).path( queueMessageId.toString() ).request().delete(); + Assert.assertEquals( 200, response.getStatus() ); + ackedIds.add( queueMessageId ); + if ( ++count >= numMessages/2 ) { + break; + } + } + messageIds.removeAll( ackedIds ); + + // wait for remaining of the messages to timeout + + QakkaFig qakkaFig = StartupListener.INJECTOR.getInstance( QakkaFig.class ); + Thread.sleep( 2*qakkaFig.getQueueTimeoutSeconds() * 1000 ); + + // now, the remaining messages cannot be acked because they timed out + + for ( UUID queueMessageId : messageIds ) { + response = target( "queues" ) + .path( queueName ).path( "messages" ).path( queueMessageId.toString() ).request().delete(); + Assert.assertEquals( 400, response.getStatus() ); + } + + // and, those same messages should be available again in the queue + + checkJsonMessages( queueName, numMessages/2 ); + } + + + @Test + public void 
testConvertDelayParameter() { + + Injector injector = StartupListener.INJECTOR; + QueueResource queueResource = injector.getInstance( QueueResource.class ); + + Assert.assertEquals( 0L, queueResource.convertDelayParameter( "" ).longValue() ); + Assert.assertEquals( 0L, queueResource.convertDelayParameter( "0" ).longValue() ); + Assert.assertEquals( 0L, queueResource.convertDelayParameter( "NONE" ).longValue() ); + Assert.assertEquals( 5L, queueResource.convertDelayParameter( "5" ).longValue() ); + + try { + queueResource.convertDelayParameter( "bogus value" ); + fail("Expected exception on bad value"); + } catch ( IllegalArgumentException expected ) { + // pass + } + } + + @Test + public void testConvertExpirationParameter() { + + Injector injector = StartupListener.INJECTOR; + QueueResource queueResource = injector.getInstance( QueueResource.class ); + + Assert.assertNull( queueResource.convertExpirationParameter( "" ) ); + Assert.assertNull( queueResource.convertExpirationParameter( "NEVER" ) ); + + Assert.assertEquals( 5L, queueResource.convertExpirationParameter( "5" ).longValue() ); + + try { + queueResource.convertExpirationParameter( "bogus value" ); + fail("Expected exception on bad value"); + } catch ( IllegalArgumentException expected ) { + // pass + } + } + +} + http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java new file mode 100644 index 0000000..42423fa --- /dev/null +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.usergrid.persistence.qakka.common;

import com.datastax.driver.core.Session;
import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
import org.apache.usergrid.persistence.qakka.AbstractTest;
import org.apache.usergrid.persistence.qakka.core.CassandraClient;
import org.junit.Assert;
import org.junit.Test;


/**
 * Smoke test checking that a {@link CassandraClient} can be obtained from the injector and
 * that it hands out a usable {@link Session}.
 *
 * Created by russo on 6/8/16.
 */
public class CassandraClientTest extends AbstractTest {

    /**
     * Looks up the {@link CassandraClientImpl} binding, asks it for a session and makes one
     * call on that session.
     */
    @Test
    public void getClient() {

        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
        Assert.assertNotNull( "Injector returned no CassandraClient", cassandraClient );

        Session session = cassandraClient.getSession();
        Assert.assertNotNull( "CassandraClient returned no session", session );

        // Exercise the session; the returned keyspace name is deliberately not asserted because
        // nothing visible here establishes whether the session is bound to a keyspace.
        session.getLoggedKeyspace();
    }
}