Repository: usergrid Updated Branches: refs/heads/usergrid-1318-queue 03eb31246 -> a0b0f717b
Update bootstrap/init logic so avoid race conditions in writing a new queue to the database. Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/a0b0f717 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/a0b0f717 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/a0b0f717 Branch: refs/heads/usergrid-1318-queue Commit: a0b0f717bb1a23e4879282726045eba12514850d Parents: 03eb312 Author: Michael Russo <mru...@apigee.com> Authored: Sat Sep 17 11:50:34 2016 -0700 Committer: Michael Russo <mru...@apigee.com> Committed: Sat Sep 17 11:50:34 2016 -0700 ---------------------------------------------------------------------- .../usergrid/persistence/index/IndexAlias.java | 4 ++-- .../qakka/core/impl/QueueManagerImpl.java | 7 ++++--- .../qakka/distributed/actors/QueueSender.java | 2 +- .../impl/DistributedQueueServiceImpl.java | 21 ++++++++++++++++++-- 4 files changed, 26 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/a0b0f717/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexAlias.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexAlias.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexAlias.java index 9952aa8..e296895 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexAlias.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexAlias.java @@ -17,13 +17,13 @@ package org.apache.usergrid.persistence.index; -import org.apache.usergrid.persistence.index.IndexFig; +import java.io.Serializable; /** * Abstraction for Index alias names */ -public interface IndexAlias{ +public interface IndexAlias extends Serializable{ http://git-wip-us.apache.org/repos/asf/usergrid/blob/a0b0f717/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java index bbb46a8..789edd4 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java @@ -57,9 +57,7 @@ public class QueueManagerImpl implements QueueManager { } @Override - public void createQueue(Queue queue) { - - queueSerialization.writeQueue(queue.toDatabaseQueue()); + public void createQueue(Queue queue) { List<String> regions = new ArrayList<>(); @@ -86,6 +84,9 @@ public class QueueManagerImpl implements QueueManager { shardSerialization.createShard( inflight ); } + // only write the existence of a queue to the database if its dependent initial shards have been written + queueSerialization.writeQueue(queue.toDatabaseQueue()); + distributedQueueService.initQueue( queue.getName() ); distributedQueueService.refreshQueue( queue.getName() ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/a0b0f717/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java index 8bd733b..03d1216 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java @@ -167,7 +167,7 @@ public class QueueSender extends UntypedActor { } } - throw new QakkaRuntimeException( "Error adding to queue after " + retries ); + throw new QakkaRuntimeException( "Error adding to queue after " + retries + " retries" ); } finally { timer.stop(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/a0b0f717/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java index ec667e6..4737347 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java @@ -22,6 +22,7 @@ package org.apache.usergrid.persistence.qakka.distributed.impl; import akka.actor.ActorRef; import akka.pattern.Patterns; import akka.util.Timeout; +import com.datastax.driver.core.exceptions.InvalidQueryException; import com.google.inject.Inject; import com.google.inject.Singleton; import org.apache.usergrid.persistence.actorsystem.ActorSystemManager; @@ -67,14 +68,29 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { @Override public void init() { - for ( String queueName : queueManager.getListOfQueues() ) { - initQueue( queueName ); + + try { + List<String> queues = queueManager.getListOfQueues(); + for ( String queueName : queues ) { + initQueue( queueName ); + } + }catch (InvalidQueryException e){ + + if (e.getMessage().contains("unconfigured columnfamily")){ + logger.info("Unable to initialize queues since system is bootstrapping. " + + "Queues will be initialized when created"); + }else{ + throw e; + } + } + } @Override public void initQueue(String queueName) { + logger.info("Initializing queue: {}", queueName); QueueInitRequest request = new QueueInitRequest( queueName ); ActorRef clientActor = actorSystemManager.getClientActor(); clientActor.tell( request, null ); @@ -91,6 +107,7 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { @Override public void refreshQueue(String queueName) { + logger.info("Refreshing queue: {}", queueName); QueueRefreshRequest request = new QueueRefreshRequest( queueName ); ActorRef clientActor = actorSystemManager.getClientActor(); clientActor.tell( request, null );