http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java new file mode 100644 index 0000000..fcd2161 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/

package org.apache.usergrid.persistence.qakka.distributed.actors;

import akka.actor.UntypedActor;
import com.codahale.metrics.Timer;
import com.google.inject.Injector;
import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
import org.apache.usergrid.persistence.qakka.App;
import org.apache.usergrid.persistence.qakka.MetricsService;
import org.apache.usergrid.persistence.qakka.QakkaFig;
import org.apache.usergrid.persistence.qakka.core.CassandraClient;
import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
import org.apache.usergrid.persistence.qakka.distributed.messages.QueueTimeoutRequest;
import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
import org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Optional;
import java.util.UUID;


/**
 * Actor that handles {@link QueueTimeoutRequest}s for a single queue: it walks the queue's
 * INFLIGHT messages in the local region and, for every message that has been in flight longer
 * than {@code QakkaFig.getQueueTimeoutSeconds()}, writes it back as a DEFAULT-type message
 * with a new queue message id and deletes the INFLIGHT row.
 */
public class QueueTimeouter extends UntypedActor {
    private static final Logger logger = LoggerFactory.getLogger( QueueTimeouter.class );

    private final String queueName;

    private final QueueMessageSerialization messageSerialization;
    private final MetricsService            metricsService;
    private final ActorSystemFig            actorSystemFig;
    private final QakkaFig                  qakkaFig;
    private final CassandraClient           cassandraClient;


    /**
     * @param queueName name of the only queue this actor will accept timeout requests for
     */
    public QueueTimeouter(String queueName ) {
        this.queueName = queueName;

        Injector injector = App.INJECTOR;

        messageSerialization = injector.getInstance( QueueMessageSerialization.class );
        actorSystemFig       = injector.getInstance( ActorSystemFig.class );
        qakkaFig             = injector.getInstance( QakkaFig.class );
        metricsService       = injector.getInstance( MetricsService.class );
        cassandraClient      = injector.getInstance( CassandraClientImpl.class );
    }


    /**
     * Processes a {@link QueueTimeoutRequest}; any other message is passed to {@code unhandled}.
     *
     * @throws QakkaRuntimeException if the request names a queue other than this actor's queue
     */
    @Override
    public void onReceive(Object message) {

        if ( !(message instanceof QueueTimeoutRequest) ) {
            unhandled( message );
            return;
        }

        Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.TIMEOUT_TIME).time();

        try {

            QueueTimeoutRequest request = (QueueTimeoutRequest) message;

            if (!request.getQueueName().equals( queueName )) {
                throw new QakkaRuntimeException(
                        "QueueTimeouter for " + queueName + ": Incorrect queueName " + request.getQueueName() );
            }

            int count = 0;
            String region = actorSystemFig.getRegionLocal();

            // 1000L forces the multiplication into long arithmetic so a large configured
            // timeout cannot overflow; the value does not change during the scan, so it is
            // computed once.
            final long timeoutMillis = qakkaFig.getQueueTimeoutSeconds() * 1000L;

            ShardIterator shardIterator = new ShardIterator(
                    cassandraClient, queueName, region, Shard.Type.INFLIGHT, Optional.empty());

            MultiShardMessageIterator multiShardIteratorInflight = new MultiShardMessageIterator(
                    cassandraClient, queueName, region, DatabaseQueueMessage.Type.INFLIGHT, shardIterator, null);

            while ( multiShardIteratorInflight.hasNext() ) {

                DatabaseQueueMessage queueMessage = multiShardIteratorInflight.next();

                long currentTime = System.currentTimeMillis();

                if ((currentTime - queueMessage.getInflightAt()) > timeoutMillis) {

                    // put message back in messages_available table as new queue message with new UUID

                    UUID newQueueMessageId = QakkaUtils.getTimeUuid();

                    DatabaseQueueMessage newMessage = new DatabaseQueueMessage(
                            queueMessage.getMessageId(),
                            DatabaseQueueMessage.Type.DEFAULT,
                            queueMessage.getQueueName(),
                            queueMessage.getRegion(),
                            null,
                            queueMessage.getQueuedAt(),
                            queueMessage.getInflightAt(),
                            newQueueMessageId );

                    messageSerialization.writeMessage( newMessage );

                    // remove message from inflight table (same local region the scan used)

                    messageSerialization.deleteMessage(
                            queueName,
                            region,
                            null,
                            DatabaseQueueMessage.Type.INFLIGHT,
                            queueMessage.getQueueMessageId() );

                    count++;
                }
            }

            if (count > 0) {
                logger.debug( "Timed out {} messages for queue {}", count, queueName );
            }

        } finally {
            // always stop the metrics timer, including when the request is rejected
            timer.close();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java new file mode 100644 index 0000000..6c91eb0 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.persistence.qakka.distributed.actors; + +import akka.actor.UntypedActor; +import com.codahale.metrics.Timer; +import com.google.inject.Injector; +import org.apache.usergrid.persistence.qakka.App; +import org.apache.usergrid.persistence.qakka.MetricsService; +import org.apache.usergrid.persistence.qakka.core.QakkaUtils; +import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest; +import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteResponse; +import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog; +import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization; +import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.UUID; + + +public class QueueWriter extends UntypedActor { + private static final Logger logger = LoggerFactory.getLogger( QueueWriter.class ); + + public enum WriteStatus { SUCCESS_XFERLOG_DELETED, SUCCESS_XFERLOG_NOTDELETED, ERROR }; + + private final QueueMessageSerialization messageSerialization; + private final TransferLogSerialization transferLogSerialization; + private final AuditLogSerialization auditLogSerialization; + private final MetricsService metricsService; + + + public QueueWriter() { + + Injector injector = App.INJECTOR; + + messageSerialization = injector.getInstance( QueueMessageSerialization.class ); + transferLogSerialization = injector.getInstance( TransferLogSerialization.class ); + auditLogSerialization = injector.getInstance( AuditLogSerialization.class ); + metricsService = injector.getInstance( MetricsService.class ); + } + + @Override + public void onReceive(Object message) { + + if 
(message instanceof QueueWriteRequest) { + + Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.SEND_TIME_WRITE ).time(); + + try { + QueueWriteRequest qa = (QueueWriteRequest) message; + + UUID queueMessageId = QakkaUtils.getTimeUuid(); + + // TODO: implement deliveryTime and expirationTime + + DatabaseQueueMessage dbqm = null; + long currentTime = System.currentTimeMillis(); + + try { + dbqm = new DatabaseQueueMessage( + qa.getMessageId(), + DatabaseQueueMessage.Type.DEFAULT, + qa.getQueueName(), + qa.getDestRegion(), + null, + currentTime, + currentTime, + queueMessageId ); + + messageSerialization.writeMessage( dbqm ); + + //logger.debug("Wrote queue message id {} to queue name {}", + // dbqm.getQueueMessageId(), dbqm.getQueueName()); + + } catch (Throwable t) { + logger.debug("Error creating database queue message", t); + + auditLogSerialization.recordAuditLog( + AuditLog.Action.SEND, + AuditLog.Status.ERROR, + qa.getQueueName(), + qa.getDestRegion(), + qa.getMessageId(), + dbqm.getMessageId() ); + + getSender().tell( new QueueWriteResponse( + QueueWriter.WriteStatus.ERROR ), getSender() ); + + return; + } + + auditLogSerialization.recordAuditLog( + AuditLog.Action.SEND, + AuditLog.Status.SUCCESS, + qa.getQueueName(), + qa.getDestRegion(), + qa.getMessageId(), + dbqm.getQueueMessageId() ); + + try { + transferLogSerialization.removeTransferLog( + qa.getQueueName(), + qa.getSourceRegion(), + qa.getDestRegion(), + qa.getMessageId() ); + + getSender().tell( new QueueWriteResponse( + QueueWriter.WriteStatus.SUCCESS_XFERLOG_DELETED ), getSender() ); + + } catch (Throwable e) { + logger.error("Error deleting transferlog", e); + logger.debug( "Unable to delete transfer log for {} {} {} {}", + qa.getQueueName(), + qa.getSourceRegion(), + qa.getDestRegion(), + qa.getMessageId() ); + + getSender().tell( new QueueWriteResponse( + QueueWriter.WriteStatus.SUCCESS_XFERLOG_NOTDELETED ), getSender() ); + } + + } finally { + timer.close(); + } + + } 
else { + unhandled( message ); + } + + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java new file mode 100644 index 0000000..9cf06d9 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.persistence.qakka.distributed.actors; + +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.actor.UntypedActor; +import akka.routing.FromConfig; +import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest; + + +/** + * Route messages to QueueWriters + */ +public class QueueWriterRouter extends UntypedActor { + + private final ActorRef router; + + + public QueueWriterRouter() { + + router = getContext().actorOf( + FromConfig.getInstance().props(Props.create(QueueWriter.class )), "router"); + } + + @Override + public void onReceive(Object message) { + + if ( message instanceof QueueWriteRequest) { + router.tell( message, getSender() ); + + } else { + unhandled(message); + } + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java new file mode 100644 index 0000000..46e4906 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.usergrid.persistence.qakka.distributed.actors;


import akka.actor.UntypedActor;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.utils.UUIDs;
import com.google.inject.Injector;
import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
import org.apache.usergrid.persistence.qakka.App;
import org.apache.usergrid.persistence.qakka.MetricsService;
import org.apache.usergrid.persistence.qakka.QakkaFig;
import org.apache.usergrid.persistence.qakka.core.CassandraClient;
import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
import org.apache.usergrid.persistence.qakka.distributed.messages.ShardCheckRequest;
import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization;
import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardSerialization;
import org.apache.usergrid.persistence.qakka.serialization.sharding.impl.ShardCounterSerializationImpl;
import org.apache.usergrid.persistence.qakka.serialization.sharding.impl.ShardSerializationImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Optional;
import java.util.UUID;


/**
 * Actor that handles {@link ShardCheckRequest}s for a single queue: for both the DEFAULT and
 * INFLIGHT shard types it looks at the last shard returned by a {@link ShardIterator} and,
 * when that shard's counter exceeds 90% of {@code QakkaFig.getMaxShardSize()}, creates the
 * next shard.
 */
public class ShardAllocator extends UntypedActor {
    private static final Logger logger = LoggerFactory.getLogger( ShardAllocator.class );

    private final String queueName;

    private final QakkaFig                  qakkaFig;
    private final ActorSystemFig            actorSystemFig;
    private final ShardSerialization        shardSerialization;
    private final ShardCounterSerialization shardCounterSerialization;
    private final MetricsService            metricsService;
    private final CassandraClient           cassandraClient;


    /**
     * @param queueName name of the only queue this actor will accept shard checks for
     */
    public ShardAllocator( String queueName ) {
        this.queueName = queueName;

        Injector injector = App.INJECTOR;

        this.qakkaFig                  = injector.getInstance( QakkaFig.class );
        this.shardCounterSerialization = injector.getInstance( ShardCounterSerializationImpl.class );
        this.shardSerialization        = injector.getInstance( ShardSerializationImpl.class );
        this.actorSystemFig            = injector.getInstance( ActorSystemFig.class );
        this.metricsService            = injector.getInstance( MetricsService.class );
        this.cassandraClient           = injector.getInstance( CassandraClientImpl.class );

        logger.debug( "Created shard allocator for queue {}", queueName );
    }


    /**
     * Processes a {@link ShardCheckRequest}; any other message is passed to {@code unhandled}.
     *
     * @throws QakkaRuntimeException if the request names a queue other than this actor's queue
     */
    @Override
    public void onReceive( Object message ) throws Exception {

        if ( !(message instanceof ShardCheckRequest) ) {
            unhandled( message );
            return;
        }

        ShardCheckRequest request = (ShardCheckRequest) message;

        if (!request.getQueueName().equals( queueName )) {
            throw new QakkaRuntimeException(
                    "ShardAllocator for " + queueName + ": Incorrect queueName " + request.getQueueName() );
        }

        // check both types of shard
        checkLatestShard( Shard.Type.DEFAULT );
        checkLatestShard( Shard.Type.INFLIGHT );
    }

    /**
     * Allocates a follow-on shard of the given type when the newest existing shard is nearly
     * full. Any failure is logged and swallowed so a failed check never propagates out of
     * the actor.
     */
    private void checkLatestShard( Shard.Type type ) {

        Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.ALLOCATE_TIME).time();

        try {

            String region = actorSystemFig.getRegionLocal();

            // find newest shard: the last one the iterator yields

            ShardIterator shardIterator = new ShardIterator(
                    cassandraClient, queueName, region, type, Optional.empty() );

            Shard shard = null;
            while (shardIterator.hasNext()) {
                shard = shardIterator.next();
            }

            if (shard == null) {
                logger.warn( "No shard found for {}, {}, {}", queueName, region, type );
                return;
            }

            // if its count is greater than 90% of max shard size, then allocate a new shard

            long counterValue = 0;
            try {
                counterValue = shardCounterSerialization.getCounterValue( queueName, type, shard.getShardId() );
            } catch ( NotFoundException ignored ) {
                // deliberately ignored: a shard with no counter yet is treated as empty (count 0)
            }

            if (counterValue > (0.9 * qakkaFig.getMaxShardSize())) {

                // Create UUID from a UNIX timestamp via DataStax utility
                // https://docs.datastax.com/en/drivers/java/2.0/com/datastax/driver/core/utils/UUIDs.html
                UUID futureUUID = UUIDs.startOf(
                        System.currentTimeMillis() + qakkaFig.getShardAllocationAdvanceTimeMillis());

                Shard newShard = new Shard( queueName, region, type, shard.getShardId() + 1, futureUUID );
                shardSerialization.createShard( newShard );
                shardCounterSerialization.incrementCounter( queueName, type, newShard.getShardId(), 0 );

                // report the id of the shard that was just created, not the one that filled up
                logger.info("{} Created new shard for queue {} shardId {} timestamp {} counterValue {}",
                        this.hashCode(), queueName, newShard.getShardId(), futureUUID.timestamp(), counterValue );
            }

        } catch ( Throwable t ) {
            logger.error("Error while checking shard allocations", t);

        } finally {
            timer.close();
        }

    }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
new file mode 100644
index 0000000..9551c61
--- /dev/null
+++
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -0,0 +1,296 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.usergrid.persistence.qakka.distributed.impl;

import akka.actor.ActorRef;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
import org.apache.usergrid.persistence.qakka.QakkaFig;
import org.apache.usergrid.persistence.qakka.core.QueueManager;
import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
import org.apache.usergrid.persistence.qakka.distributed.messages.*;
import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Future;

import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;


/**
 * {@link DistributedQueueService} implementation that talks to the queue actors through the
 * client actor supplied by {@link ActorSystemManager}. Fire-and-forget operations use
 * {@code tell}; request/response operations use {@code ask} and block for the reply,
 * retrying a bounded number of times before giving up with a {@link QakkaRuntimeException}.
 */
@Singleton
public class DistributedQueueServiceImpl implements DistributedQueueService {

    private static final Logger logger = LoggerFactory.getLogger( DistributedQueueServiceImpl.class );

    private final ActorSystemManager actorSystemManager;
    private final QueueManager queueManager;
    private final QakkaFig qakkaFig;


    /**
     * Stores the collaborators and registers the three router producers with the
     * actor system manager.
     */
    @Inject
    public DistributedQueueServiceImpl(
            ActorSystemManager actorSystemManager,
            QueueManager queueManager,
            QakkaFig qakkaFig,
            QueueActorRouterProducer queueActorRouterProducer,
            QueueWriterRouterProducer queueWriterRouterProducer,
            QueueSenderRouterProducer queueSenderRouterProducer ) {

        this.actorSystemManager = actorSystemManager;
        this.queueManager = queueManager;
        this.qakkaFig = qakkaFig;

        actorSystemManager.registerRouterProducer( queueActorRouterProducer );
        actorSystemManager.registerRouterProducer( queueWriterRouterProducer );
        actorSystemManager.registerRouterProducer( queueSenderRouterProducer );
    }


    /** Sends a {@link QueueInitRequest} for every queue known to the queue manager. */
    @Override
    public void init() {
        for ( String queueName : queueManager.getListOfQueues() ) {
            initQueue( queueName );
        }
    }


    /** Sends a {@link QueueInitRequest} for the named queue without waiting for a reply. */
    @Override
    public void initQueue(String queueName) {
        QueueInitRequest request = new QueueInitRequest( queueName );
        ActorRef clientActor = actorSystemManager.getClientActor();
        clientActor.tell( request, null );
    }


    /** Sends a {@link QueueRefreshRequest} for every queue known to the queue manager. */
    @Override
    public void refresh() {
        for ( String queueName : queueManager.getListOfQueues() ) {
            refreshQueue( queueName );
        }
    }


    /** Sends a {@link QueueRefreshRequest} for the named queue without waiting for a reply. */
    @Override
    public void refreshQueue(String queueName) {
        QueueRefreshRequest request = new QueueRefreshRequest( queueName );
        ActorRef clientActor = actorSystemManager.getClientActor();
        clientActor.tell( request, null );
    }


    /** Sends a {@link QueueTimeoutRequest} for every queue known to the queue manager. */
    @Override
    public void processTimeouts() {

        for ( String queueName : queueManager.getListOfQueues() ) {

            QueueTimeoutRequest request = new QueueTimeoutRequest( queueName );

            ActorRef clientActor = actorSystemManager.getClientActor();
            clientActor.tell( request, null );
        }
    }


    /**
     * Asks the client actor to send a message to a region and waits for the reply, making up
     * to {@code QakkaFig.getMaxSendRetries()} attempts.
     *
     * @return the first non-ERROR send status received
     * @throws QakkaRuntimeException if the queue does not exist or no attempt succeeded
     */
    @Override
    public DistributedQueueService.Status sendMessageToRegion(
            String queueName, String sourceRegion, String destRegion, UUID messageId,
            Long deliveryTime, Long expirationTime ) {

        requireQueueExists( queueName );

        final int maxRetries = qakkaFig.getMaxSendRetries();

        QueueSendRequest request = new QueueSendRequest(
                queueName, sourceRegion, destRegion, messageId, deliveryTime, expirationTime );

        for ( int attempt = 1; attempt <= maxRetries; attempt++ ) {
            try {
                Timeout t = new Timeout( qakkaFig.getSendTimeoutSeconds(), TimeUnit.SECONDS );

                // send to current region via local clientActor
                ActorRef clientActor = actorSystemManager.getClientActor();
                Future<Object> fut = Patterns.ask( clientActor, request, t );

                // wait for response...
                final Object response = Await.result( fut, t.duration() );

                if ( response instanceof QueueSendResponse) {
                    QueueSendResponse qarm = (QueueSendResponse)response;

                    if ( !DistributedQueueService.Status.ERROR.equals( qarm.getSendStatus() )) {

                        if ( attempt > 1 ) {
                            logger.debug("SUCCESS after {} retries", attempt );
                        }
                        return qarm.getSendStatus();
                    }

                    logger.debug("ERROR STATUS sending to queue, retrying {}", attempt );

                } else if ( response != null ) {
                    // a reply arrived but it is not a QueueSendResponse
                    logger.debug("ERROR RESPONSE sending to queue, retrying {}", attempt );

                } else {
                    logger.debug("TIMEOUT sending to queue, retrying {}", attempt );
                }

            } catch ( Exception e ) {
                logger.debug("ERROR sending to queue, retrying " + attempt, e );
            }
        }

        // report the number of attempts actually made
        throw new QakkaRuntimeException( "Error sending to queue after " + maxRetries );
    }


    /**
     * Asks the client actor for the next messages of a queue and waits for the reply, making
     * up to {@code QakkaFig.getMaxGetRetries()} attempts.
     *
     * @return the messages carried by the first {@link QueueGetResponse} received; note that
     *         they are returned whether or not the response reports success
     * @throws QakkaRuntimeException if the queue does not exist or no attempt produced a
     *         {@link QueueGetResponse}
     */
    @Override
    public Collection<DatabaseQueueMessage> getNextMessages( String queueName, int count ) {

        requireQueueExists( queueName );

        final int maxRetries = qakkaFig.getMaxGetRetries();

        QueueGetRequest request = new QueueGetRequest( queueName, count );

        for ( int attempt = 1; attempt <= maxRetries; attempt++ ) {
            try {
                Timeout t = new Timeout( qakkaFig.getGetTimeoutSeconds(), TimeUnit.SECONDS );

                // ask ClientActor and wait (up to timeout) for response

                Future<Object> fut = Patterns.ask( actorSystemManager.getClientActor(), request, t );
                final QakkaMessage response = (QakkaMessage)Await.result( fut, t.duration() );

                if ( response instanceof QueueGetResponse) {
                    QueueGetResponse qprm = (QueueGetResponse)response;
                    if ( qprm.isSuccess() && attempt > 1 ) {
                        logger.debug( "getNextMessage SUCCESS after {} retries", attempt );
                    }
                    return qprm.getQueueMessages();

                } else if ( response != null ) {
                    logger.debug("ERROR RESPONSE popping queue, retrying {}", attempt );

                } else {
                    logger.debug("TIMEOUT popping to queue, retrying {}", attempt );
                }

            } catch ( Exception e ) {
                logger.debug("ERROR popping to queue, retrying " + attempt, e );
            }
        }

        // report the number of attempts actually made
        throw new QakkaRuntimeException(
                "Error getting from queue " + queueName + " after " + maxRetries );
    }


    /**
     * Acknowledges a message.
     *
     * @return {@code BAD_REQUEST} if the queue does not exist, otherwise the status reported
     *         by the queue actors
     */
    @Override
    public Status ackMessage(String queueName, UUID queueMessageId ) {

        List<String> queueNames = queueManager.getListOfQueues();
        if ( !queueNames.contains( queueName ) ) {
            return Status.BAD_REQUEST;
        }

        QueueAckRequest message = new QueueAckRequest( queueName, queueMessageId );
        return sendMessageToLocalQueueActors( message );
    }


    /**
     * @throws QakkaRuntimeException if the queue does not exist or no reply was obtained
     */
    @Override
    public Status requeueMessage(String queueName, UUID messageId) {

        requireQueueExists( queueName );

        // NOTE(review): this sends a QueueAckRequest, exactly as ackMessage does - confirm
        // whether a dedicated requeue request was intended here.
        QueueAckRequest message = new QueueAckRequest( queueName, messageId );
        return sendMessageToLocalQueueActors( message );
    }


    /**
     * Not implemented yet.
     *
     * @throws QakkaRuntimeException if the queue does not exist
     * @throws UnsupportedOperationException always, when the queue exists
     */
    @Override
    public Status clearMessages(String queueName) {

        requireQueueExists( queueName );

        // TODO: implement clear queue
        throw new UnsupportedOperationException();
    }


    /** Throws {@link QakkaRuntimeException} unless the queue manager lists the named queue. */
    private void requireQueueExists( String queueName ) {
        List<String> queueNames = queueManager.getListOfQueues();
        if ( !queueNames.contains( queueName ) ) {
            throw new QakkaRuntimeException( "Queue name: " + queueName + " does not exist");
        }
    }


    /**
     * Asks the client actor with the given message and waits up to one second per attempt
     * for a {@link QueueAckResponse}, making at most five attempts.
     *
     * @throws QakkaRuntimeException if no attempt produced a {@link QueueAckResponse}
     */
    private Status sendMessageToLocalQueueActors( QakkaMessage message ) {

        final int maxRetries = 5;

        for ( int attempt = 1; attempt <= maxRetries; attempt++ ) {
            try {
                Timeout t = new Timeout( 1, TimeUnit.SECONDS );

                // ask ClientActor and wait (up to timeout) for response

                Future<Object> fut = Patterns.ask( actorSystemManager.getClientActor(), message, t );
                final QakkaMessage response = (QakkaMessage)Await.result( fut, t.duration() );

                if ( response instanceof QueueAckResponse) {
                    QueueAckResponse qprm = (QueueAckResponse)response;
                    return qprm.getStatus();

                } else if ( response != null ) {
                    logger.debug("ERROR RESPONSE sending message, retrying {}", attempt );

                } else {
                    logger.debug("TIMEOUT sending message, retrying {}", attempt );
                }

            } catch ( Exception e ) {
                logger.debug("ERROR sending message, retrying " + attempt, e );
            }
        }

        throw new QakkaRuntimeException(
                "Error sending message " + message + " after " + maxRetries );
    }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java
new file mode 100644
index 0000000..8c6adda
--- /dev/null
+++
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.distributed.impl; + +import akka.actor.ActorSystem; +import akka.actor.PoisonPill; +import akka.actor.Props; +import akka.cluster.singleton.ClusterSingletonManager; +import akka.cluster.singleton.ClusterSingletonManagerSettings; +import akka.cluster.singleton.ClusterSingletonProxy; +import akka.cluster.singleton.ClusterSingletonProxySettings; +import com.google.inject.Inject; +import com.google.inject.Injector; +import org.apache.usergrid.persistence.actorsystem.ActorSystemManager; +import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer; +import org.apache.usergrid.persistence.actorsystem.RouterProducer; +import org.apache.usergrid.persistence.qakka.QakkaFig; +import org.apache.usergrid.persistence.qakka.distributed.actors.QueueActorRouter; +import org.apache.usergrid.persistence.qakka.distributed.messages.*; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + + +public class 
QueueActorRouterProducer implements RouterProducer { + + static Injector injector; + ActorSystemManager actorSystemManager; + QakkaFig qakkaFig; + + + @Inject + public QueueActorRouterProducer( + Injector injector, + ActorSystemManager actorSystemManager, + QakkaFig qakkaFig) { + + this.injector = injector; + this.actorSystemManager = actorSystemManager; + this.qakkaFig = qakkaFig; + } + + + @Override + public String getRouterPath() { + return "/user/queueActorRouterProxy"; + } + + + @Override + public void produceRouter(ActorSystem system, String role) { + + ClusterSingletonManagerSettings settings = + ClusterSingletonManagerSettings.create( system ).withRole( "io" ); + + system.actorOf( ClusterSingletonManager.props( + Props.create( GuiceActorProducer.class, injector, QueueActorRouter.class ), + PoisonPill.getInstance(), settings ), "queueActorRouter" ); + + ClusterSingletonProxySettings proxySettings = + ClusterSingletonProxySettings.create( system ).withRole( role ); + + system.actorOf( + ClusterSingletonProxy.props( "/user/queueActorRouter", proxySettings ), "queueActorRouterProxy" ); + } + + + @Override + public void addConfiguration(Map<String, Object> configMap) { + + int numInstancesPerNode = qakkaFig.getNumQueueActors(); + + Map<String, Object> akka = (Map<String, Object>) configMap.get( "akka" ); + final Map<String, Object> deploymentMap; + + if ( akka.get( "actor" ) == null ) { + deploymentMap = new HashMap<>(); + akka.put( "actor", new HashMap<String, Object>() {{ + put( "deployment", deploymentMap ); + }} ); + + } else if (((Map) akka.get( "actor" )).get( "deployment" ) == null) { + deploymentMap = new HashMap<>(); + ((Map) akka.get( "actor" )).put( "deployment", deploymentMap ); + + } else { + deploymentMap = (Map<String, Object>) ((Map) akka.get( "actor" )).get( "deployment" ); + } + + deploymentMap.put( "/queueActorRouter/singleton/router", new HashMap<String, Object>() {{ + put( "router", "consistent-hashing-pool" ); + put( "cluster", new 
HashMap<String, Object>() {{ + put( "enabled", "on" ); + put( "allow-local-routees", "on" ); + put( "use-role", "io" ); + put( "max-nr-of-instances-per-node", numInstancesPerNode ); + put( "failure-detector", new HashMap<String, Object>() {{ + put( "threshold", "10" ); + put( "acceptable-heartbeat-pause", "3 s" ); + put( "heartbeat-interval", "1 s" ); + put( "heartbeat-request", new HashMap<String, Object>() {{ + put( "expected-response-after", "3 s" ); + }} ); + }} ); + }} ); + }} ); + + } + + + @Override + public Collection<Class> getMessageTypes() { + return new ArrayList() {{ + add( QueueGetRequest.class ); + add( QueueAckRequest.class ); + add( QueueInitRequest.class ); + add( QueueRefreshRequest.class ); + add( QueueTimeoutRequest.class ); + add( ShardCheckRequest.class ); + }}; + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueSenderRouterProducer.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueSenderRouterProducer.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueSenderRouterProducer.java new file mode 100644 index 0000000..885a559 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueSenderRouterProducer.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.distributed.impl; + +import akka.actor.ActorSystem; +import akka.actor.PoisonPill; +import akka.actor.Props; +import akka.cluster.singleton.ClusterSingletonManager; +import akka.cluster.singleton.ClusterSingletonManagerSettings; +import akka.cluster.singleton.ClusterSingletonProxy; +import akka.cluster.singleton.ClusterSingletonProxySettings; +import com.google.inject.Inject; +import com.google.inject.Injector; +import org.apache.usergrid.persistence.actorsystem.ActorSystemManager; +import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer; +import org.apache.usergrid.persistence.actorsystem.RouterProducer; +import org.apache.usergrid.persistence.qakka.QakkaFig; +import org.apache.usergrid.persistence.qakka.distributed.actors.QueueSenderRouter; +import org.apache.usergrid.persistence.qakka.distributed.messages.QueueSendRequest; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + + +public class QueueSenderRouterProducer implements RouterProducer { + + static Injector injector; + ActorSystemManager actorSystemManager; + QakkaFig qakkaFig; + + + @Inject + public QueueSenderRouterProducer( + Injector injector, + ActorSystemManager actorSystemManager, + QakkaFig qakkaFig) { + + this.injector = injector; + this.actorSystemManager = actorSystemManager; + this.qakkaFig = qakkaFig; + } + + + @Override + public String getRouterPath() { + return "/user/queueSenderRouterProxy"; + } + + + @Override + public void 
produceRouter(ActorSystem system, String role) { + + ClusterSingletonManagerSettings settings = + ClusterSingletonManagerSettings.create( system ).withRole( "io" ); + + system.actorOf( ClusterSingletonManager.props( + Props.create( GuiceActorProducer.class, injector, QueueSenderRouter.class ), + PoisonPill.getInstance(), settings ), "queueSenderRouter" ); + + ClusterSingletonProxySettings proxySettings = + ClusterSingletonProxySettings.create( system ).withRole( role ); + + system.actorOf( + ClusterSingletonProxy.props( "/user/queueSenderRouter", proxySettings ), "queueSenderRouterProxy" ); + } + + + @Override + public void addConfiguration(Map<String, Object> configMap) { + + int numInstancesPerNode = qakkaFig.getNumQueueSenderActors(); + + Map<String, Object> akka = (Map<String, Object>) configMap.get( "akka" ); + final Map<String, Object> deploymentMap; + + if ( akka.get( "actor" ) == null ) { + deploymentMap = new HashMap<>(); + akka.put( "actor", new HashMap<String, Object>() {{ + put( "deployment", deploymentMap ); + }} ); + + } else if (((Map) akka.get( "actor" )).get( "deployment" ) == null) { + deploymentMap = new HashMap<>(); + ((Map) akka.get( "actor" )).put( "deployment", deploymentMap ); + + } else { + deploymentMap = (Map<String, Object>) ((Map) akka.get( "actor" )).get( "deployment" ); + } + + deploymentMap.put( "/queueSenderRouter/singleton/router", new HashMap<String, Object>() {{ + put( "router", "round-robin-pool" ); + put( "cluster", new HashMap<String, Object>() {{ + put( "enabled", "on" ); + put( "allow-local-routees", "on" ); + put( "use-role", "io" ); + put( "max-nr-of-instances-per-node", numInstancesPerNode ); + put( "failure-detector", new HashMap<String, Object>() {{ + put( "threshold", "10" ); + put( "acceptable-heartbeat-pause", "3 s" ); + put( "heartbeat-interval", "1 s" ); + put( "heartbeat-request", new HashMap<String, Object>() {{ + put( "expected-response-after", "3 s" ); + }} ); + }} ); + }} ); + }} ); + + } + + @Override + 
public Collection<Class> getMessageTypes() { + return Collections.singletonList( QueueSendRequest.class ); + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java new file mode 100644 index 0000000..c8b5567 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.persistence.qakka.distributed.impl; + +import akka.actor.ActorSystem; +import akka.actor.PoisonPill; +import akka.actor.Props; +import akka.cluster.singleton.ClusterSingletonManager; +import akka.cluster.singleton.ClusterSingletonManagerSettings; +import akka.cluster.singleton.ClusterSingletonProxy; +import akka.cluster.singleton.ClusterSingletonProxySettings; +import com.google.inject.Inject; +import com.google.inject.Injector; +import org.apache.usergrid.persistence.actorsystem.ActorSystemManager; +import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer; +import org.apache.usergrid.persistence.actorsystem.RouterProducer; +import org.apache.usergrid.persistence.qakka.QakkaFig; +import org.apache.usergrid.persistence.qakka.distributed.actors.QueueWriterRouter; +import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + + +public class QueueWriterRouterProducer implements RouterProducer { + + static Injector injector; + ActorSystemManager actorSystemManager; + QakkaFig qakkaFig; + + + @Inject + public QueueWriterRouterProducer( + Injector injector, + ActorSystemManager actorSystemManager, + QakkaFig qakkaFig) { + + this.injector = injector; + this.actorSystemManager = actorSystemManager; + this.qakkaFig = qakkaFig; + } + + + @Override + public String getRouterPath() { + return "/user/queueWriterRouterProxy"; + } + + + @Override + public void produceRouter(ActorSystem system, String role) { + + ClusterSingletonManagerSettings settings = + ClusterSingletonManagerSettings.create( system ).withRole( "io" ); + + system.actorOf( ClusterSingletonManager.props( + Props.create( GuiceActorProducer.class, injector, QueueWriterRouter.class ), + PoisonPill.getInstance(), settings ), "queueWriterRouter" ); + + ClusterSingletonProxySettings proxySettings = + 
ClusterSingletonProxySettings.create( system ).withRole( role ); + + system.actorOf( + ClusterSingletonProxy.props( "/user/queueWriterRouter", proxySettings ), "queueWriterRouterProxy" ); + } + + + @Override + public void addConfiguration(Map<String, Object> configMap) { + + int numInstancesPerNode = qakkaFig.getNumQueueWriterActors(); + + Map<String, Object> akka = (Map<String, Object>) configMap.get( "akka" ); + final Map<String, Object> deploymentMap; + + if ( akka.get( "actor" ) == null ) { + deploymentMap = new HashMap<>(); + akka.put( "actor", new HashMap<String, Object>() {{ + put( "deployment", deploymentMap ); + }} ); + + } else if (((Map) akka.get( "actor" )).get( "deployment" ) == null) { + deploymentMap = new HashMap<>(); + ((Map) akka.get( "actor" )).put( "deployment", deploymentMap ); + + } else { + deploymentMap = (Map<String, Object>) ((Map) akka.get( "actor" )).get( "deployment" ); + } + + deploymentMap.put( "/queueWriterRouter/singleton/router", new HashMap<String, Object>() {{ + put( "router", "round-robin-pool" ); + put( "cluster", new HashMap<String, Object>() {{ + put( "enabled", "on" ); + put( "allow-local-routees", "on" ); + put( "use-role", "io" ); + put( "max-nr-of-instances-per-node", numInstancesPerNode ); + put( "failure-detector", new HashMap<String, Object>() {{ + put( "threshold", "10" ); + put( "acceptable-heartbeat-pause", "3 s" ); + put( "heartbeat-interval", "1 s" ); + put( "heartbeat-request", new HashMap<String, Object>() {{ + put( "expected-response-after", "3 s" ); + }} ); + }} ); + }} ); + }} ); + + } + + @Override + public Collection<Class> getMessageTypes() { + return Collections.singletonList( QueueWriteRequest.class ); + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QakkaMessage.java ---------------------------------------------------------------------- diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QakkaMessage.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QakkaMessage.java new file mode 100644 index 0000000..a1bbf14 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QakkaMessage.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.persistence.qakka.distributed.messages; + +import java.io.Serializable; + +/** + * Marker interface implemented by the request and response message classes in this package; it declares no methods and only adds Serializable. + */ +public interface QakkaMessage extends Serializable { +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueAckRequest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueAckRequest.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueAckRequest.java new file mode 100644 index 0000000..4beb46b --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueAckRequest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.persistence.qakka.distributed.messages; + +import org.apache.commons.lang3.builder.ToStringBuilder; + +import java.util.UUID; + + +public class QueueAckRequest implements QakkaMessage { + private final String queueName; + private final UUID queueMessageId; + + + public QueueAckRequest( String queueName, UUID queueMessageId ) { + this.queueName = queueName; + this.queueMessageId = queueMessageId; + } + + public String getQueueName() { + return queueName; + } + + public UUID getQueueMessageId() { + return queueMessageId; + } + + public String toString() { + return new ToStringBuilder( this ) + .append( "queueName", queueName ) + .append( "queueMessageId", queueMessageId ) + .toString(); + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueAckResponse.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueAckResponse.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueAckResponse.java new file mode 100644 index 0000000..68e6213 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueAckResponse.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.distributed.messages; + +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService; + +import java.util.UUID; + + +public class QueueAckResponse implements QakkaMessage { + private final String queueName; + private final UUID messageId; + private final DistributedQueueService.Status status; + + public QueueAckResponse( String queueName, UUID messageId, DistributedQueueService.Status status ) { + this.queueName = queueName; + this.messageId = messageId; + this.status = status; + } + + public String getQueueName() { + return queueName; + } + + public UUID getMessageId() { + return messageId; + } + + public DistributedQueueService.Status getStatus() { + return status; + } + + public String toString() { + return new ToStringBuilder( this ) + .append( "queueName", queueName ) + .append( "messageId", messageId ) + .append( "status", status ) + .toString(); + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetRequest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetRequest.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetRequest.java new file mode 100644 index 0000000..c23dcf6 --- 
/dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetRequest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.distributed.messages; + + +import org.apache.commons.lang3.builder.ToStringBuilder; + +public class QueueGetRequest implements QakkaMessage { + private final String queueName; + private final int numRequested; + + public QueueGetRequest(String queueName, int numRequested ) { + this.queueName = queueName; + this.numRequested = numRequested; + } + + public String getQueueName() { + return queueName; + } + + public int getNumRequested() { + return numRequested; + } + + public String toString() { + return new ToStringBuilder( this ) + .append( "queueName", queueName ) + .append( "numRequested", numRequested ) + .toString(); + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetResponse.java ---------------------------------------------------------------------- diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetResponse.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetResponse.java new file mode 100644 index 0000000..c8004fb --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetResponse.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.persistence.qakka.distributed.messages; + +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; + +import java.util.Collection; +import java.util.Collections; + + +public class QueueGetResponse implements QakkaMessage { + private final Collection<DatabaseQueueMessage> queueMessages; + private final DistributedQueueService.Status status; + + + public QueueGetResponse(DistributedQueueService.Status status ) { + this.status = status; + this.queueMessages = Collections.emptyList(); + } + + public QueueGetResponse(DistributedQueueService.Status status, Collection<DatabaseQueueMessage> queueMessages) { + this.status = status; + this.queueMessages = queueMessages; + } + + public DistributedQueueService.Status getStatus() { + return status; + } + + public boolean isSuccess() { + return status.equals( DistributedQueueService.Status.SUCCESS); + } + + public Collection<DatabaseQueueMessage> getQueueMessages() { + return queueMessages; + } + + public String toString() { + return new ToStringBuilder( this ) + .append( "messageCount", queueMessages.size() ) + .append( "status", status ) + .toString(); + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueInitRequest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueInitRequest.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueInitRequest.java new file mode 100644 index 0000000..10180cd --- /dev/null +++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueInitRequest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.distributed.messages; + +import org.apache.commons.lang3.builder.ToStringBuilder; + + +public class QueueInitRequest implements QakkaMessage { + private final String queueName; + + + public QueueInitRequest(String queueName ) { + this.queueName = queueName; + } + + public String getQueueName() { + return queueName; + } + + public String toString() { + return new ToStringBuilder( this ) + .append( "queueName", queueName ) + .toString(); + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java new file mode 
100644 index 0000000..a81a6fd --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.persistence.qakka.distributed.messages; + +import org.apache.commons.lang3.builder.ToStringBuilder; + + +public class QueueRefreshRequest implements QakkaMessage { + private final String queueName; + + + public QueueRefreshRequest(String queueName ) { + this.queueName = queueName; + } + + public String getQueueName() { + return queueName; + } + + public String toString() { + return new ToStringBuilder( this ) + .append( "queueName", queueName ) + .toString(); + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendRequest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendRequest.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendRequest.java new file mode 100644 index 0000000..8a655f4 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendRequest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.distributed.messages; + +import org.apache.commons.lang3.builder.ToStringBuilder; + +import java.util.UUID; + + +public class QueueSendRequest implements QakkaMessage { + + private final String queueName; + private final String sourceRegion; + private final String destRegion; + private final UUID messageId; + private Long deliveryTime; + private Long expirationTime; + + + public QueueSendRequest( + String queueName, String sourceRegion, String destRegion, UUID messageId, + Long deliveryTime, Long expirationTime) { + + this.queueName = queueName; + this.sourceRegion = sourceRegion; + this.destRegion = destRegion; + this.messageId = messageId; + this.deliveryTime = deliveryTime; + this.expirationTime = expirationTime; + } + + public String getQueueName() { + return queueName; + } + + public String getSourceRegion() { + return sourceRegion; + } + + public String getDestRegion() { + return destRegion; + } + + public UUID getMessageId() { + return messageId; + } + + public Long getExpirationTime() { + return expirationTime; + } + + public Long getDeliveryTime() { + return deliveryTime; + } + + public String toString() { + return new ToStringBuilder( this ) + .append( "queueName", queueName ) + .append( "sourceRegion", sourceRegion ) + .append( "destRegion", destRegion ) + .append( "messageId", messageId ) + .append( "expirationTime", expirationTime ) + .append( "deliveryTime", deliveryTime ) + .toString(); + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendResponse.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendResponse.java 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendResponse.java new file mode 100644 index 0000000..0c295a0 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendResponse.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.persistence.qakka.distributed.messages; + +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService; + + +public class QueueSendResponse implements QakkaMessage { + private final DistributedQueueService.Status status; + + public QueueSendResponse(DistributedQueueService.Status status) { + this.status = status; + } + + public DistributedQueueService.Status getSendStatus() { + return status; + } + + public String toString() { + return new ToStringBuilder( this ) + .append( "status", status ) + .toString(); + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueTimeoutRequest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueTimeoutRequest.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueTimeoutRequest.java new file mode 100644 index 0000000..5358459 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueTimeoutRequest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.distributed.messages; + +import org.apache.commons.lang3.builder.ToStringBuilder; + + +public class QueueTimeoutRequest implements QakkaMessage { + private final String queueName; + + + public QueueTimeoutRequest(String queueName ) { + this.queueName = queueName; + } + + public String getQueueName() { + return queueName; + } + + public String toString() { + return new ToStringBuilder( this ) + .append( "queueName", queueName ) + .toString(); + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteRequest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteRequest.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteRequest.java new file mode 100644 index 0000000..c7411de --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteRequest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.distributed.messages; + +import org.apache.commons.lang3.builder.ToStringBuilder; + +import java.util.UUID; + + +public class QueueWriteRequest implements QakkaMessage { + + private final String queueName; + private final String sourceRegion; + private final String destRegion; + private final UUID messageId; + private Long deliveryTime; + private Long expirationTime; + + + public QueueWriteRequest( + String queueName, String sourceRegion, String destRegion, UUID messageId, + Long deliveryTime, Long expirationTime) { + + this.queueName = queueName; + this.sourceRegion = sourceRegion; + this.destRegion = destRegion; + this.messageId = messageId; + this.deliveryTime = deliveryTime; + this.expirationTime = expirationTime; + } + + public String getQueueName() { + return queueName; + } + + public String getSourceRegion() { + return sourceRegion; + } + + public String getDestRegion() { + return destRegion; + } + + public UUID getMessageId() { + return messageId; + } + + public Long getExpirationTime() { + return expirationTime; + } + + public Long getDeliveryTime() { + return deliveryTime; + } + + public String toString() { + return new ToStringBuilder( this ) + .append( "queueName", queueName ) + .append( "sourceRegion", sourceRegion ) + .append( "destRegion", destRegion ) + .append( "messageId", messageId ) + .append( "expirationTime", 
expirationTime ) + .append( "deliveryTime", deliveryTime ) + .toString(); + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteResponse.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteResponse.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteResponse.java new file mode 100644 index 0000000..1eb513c --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteResponse.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.persistence.qakka.distributed.messages; + +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.usergrid.persistence.qakka.distributed.actors.QueueWriter; + + +public class QueueWriteResponse implements QakkaMessage { + private final QueueWriter.WriteStatus status; + + public QueueWriteResponse(QueueWriter.WriteStatus status) { + this.status = status; + } + + public QueueWriter.WriteStatus getSendStatus() { + return status; + } + + public String toString() { + return new ToStringBuilder( this ) + .append( "status", status ) + .toString(); + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/ShardCheckRequest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/ShardCheckRequest.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/ShardCheckRequest.java new file mode 100644 index 0000000..78dbe13 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/ShardCheckRequest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.distributed.messages; + +import org.apache.commons.lang3.builder.ToStringBuilder; + + +public class ShardCheckRequest implements QakkaMessage { + private final String queueName; + + + public ShardCheckRequest(String queueName ) { + this.queueName = queueName; + } + + public String getQueueName() { + return queueName; + } + + public String toString() { + return new ToStringBuilder( this ) + .append( "queueName", queueName ) + .toString(); + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/BadRequestException.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/BadRequestException.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/BadRequestException.java new file mode 100644 index 0000000..500fb6a --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/BadRequestException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.exceptions; + + +public class BadRequestException extends QakkaRuntimeException { + + public BadRequestException(String message) { + super( message ); + } + + public BadRequestException(String message, Throwable cause) { + super( message, cause ); + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/NotFoundException.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/NotFoundException.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/NotFoundException.java new file mode 100644 index 0000000..5f76163 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/NotFoundException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.exceptions; + + +public class NotFoundException extends QakkaRuntimeException { + + public NotFoundException(String message) { + super( message ); + } + + public NotFoundException(String message, Throwable cause) { + super( message, cause ); + } +}