Changes to get Qakka using the same injector as the rest of Usergrid
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/18e4305b Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/18e4305b Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/18e4305b Branch: refs/heads/usergrid-1318-queue Commit: 18e4305b995be88947b72172dd22056702659a8e Parents: 832b505 Author: Dave Johnson <snoopd...@apache.org> Authored: Fri Sep 16 18:33:10 2016 -0400 Committer: Dave Johnson <snoopd...@apache.org> Committed: Fri Sep 16 18:33:10 2016 -0400 ---------------------------------------------------------------------- .../corepersistence/CpEntityManagerFactory.java | 3 ++ .../usergrid/persistence/core/CassandraFig.java | 2 +- .../index/impl/EsIndexProducerImpl.java | 2 -- .../apache/usergrid/persistence/qakka/App.java | 12 +++---- .../usergrid/persistence/qakka/QakkaModule.java | 6 ++-- .../impl/DistributedQueueServiceImpl.java | 35 ++++++++++++++------ 6 files changed, 38 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/18e4305b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java index 5d8c417..4bec92d 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java @@ -60,6 +60,7 @@ import org.apache.usergrid.persistence.index.EntityIndex; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.entity.SimpleId; import org.apache.usergrid.persistence.model.util.UUIDGenerator; +import 
org.apache.usergrid.persistence.qakka.App; import org.apache.usergrid.persistence.qakka.distributed.impl.QueueActorRouterProducer; import org.apache.usergrid.persistence.qakka.distributed.impl.QueueSenderRouterProducer; import org.apache.usergrid.persistence.qakka.distributed.impl.QueueWriterRouterProducer; @@ -151,6 +152,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application try { logger.info("Akka cluster starting..."); + // TODO: fix this kludge + injector.getInstance( App.class ); this.actorSystemManager = injector.getInstance( ActorSystemManager.class ); actorSystemManager.registerRouterProducer( injector.getInstance( UniqueValuesService.class ) ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/18e4305b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java index b599a20..bc8d087 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java @@ -91,7 +91,7 @@ public interface CassandraFig extends GuicyFig { @Default( "Usergrid_Applications" ) String getApplicationKeyspace(); - @Key( "cassandra.keyspace.application_local" ) + @Key( "cassandra.keyspace.application.local" ) @Default( "Usergrid_Applications_Local" ) String getApplicationLocalKeyspace(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/18e4305b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java ---------------------------------------------------------------------- diff --git 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java index 10d5e4a..8f58ef7 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java @@ -57,7 +57,6 @@ public class EsIndexProducerImpl implements IndexProducer { private final IndexFig config; private final FailureMonitorImpl failureMonitor; private final Client client; - private final Timer flushTimer; private final IndexFig indexFig; private final Counter indexSizeCounter; private final Histogram roundtripTimer; @@ -70,7 +69,6 @@ public class EsIndexProducerImpl implements IndexProducer { @Inject public EsIndexProducerImpl(final IndexFig config, final EsProvider provider, final MetricsFactory metricsFactory, final IndexFig indexFig) { - this.flushTimer = metricsFactory.getTimer(EsIndexProducerImpl.class, "index_buffer.flush"); this.indexSizeCounter = metricsFactory.getCounter(EsIndexProducerImpl.class, "index_buffer.size"); this.roundtripTimer = metricsFactory.getHistogram(EsIndexProducerImpl.class, "index_buffer.message_cycle"); http://git-wip-us.apache.org/repos/asf/usergrid/blob/18e4305b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java index 9d9c972..41bc6fa 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java +++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java @@ -59,12 +59,12 @@ public class App implements MetricsService { this.actorSystemFig = actorSystemFig; this.actorSystemManager = actorSystemManager; this.distributedQueueService = distributedQueueService; - - try { - migrationManager.migrate(); - } catch (MigrationException e) { - throw new QakkaRuntimeException( "Error running migration", e ); - } +// +// try { +// migrationManager.migrate(); +// } catch (MigrationException e) { +// throw new QakkaRuntimeException( "Error running migration", e ); +// } } /** http://git-wip-us.apache.org/repos/asf/usergrid/blob/18e4305b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java index 0c37e82..d1d8d7e 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java @@ -64,8 +64,8 @@ public class QakkaModule extends AbstractModule { // TODO: reconcile with usergrid props // load properties from one properties file using Netflix Archaius so that GuicyFig will see them ConfigurationManager.loadCascadedPropertiesFromResources( "qakka" ); - } catch (IOException e) { - logger.warn("Unable to load qakka.properties"); + } catch (Throwable t) { + logger.warn("Unable to load qakka.properties (can be ignored in Usergrid)"); } } @@ -105,11 +105,11 @@ public class QakkaModule extends AbstractModule { Multibinder<Migration> migrationBinder = Multibinder.newSetBinder( binder(), Migration.class ); migrationBinder.addBinding().to( Key.get( AuditLogSerialization.class ) ); - 
//migrationBinder.addBinding().to( Key.get( MessageCounterSerialization.class ) ); migrationBinder.addBinding().to( Key.get( QueueMessageSerialization.class ) ); migrationBinder.addBinding().to( Key.get( QueueSerialization.class ) ); migrationBinder.addBinding().to( Key.get( ShardCounterSerialization.class ) ); migrationBinder.addBinding().to( Key.get( ShardSerialization.class ) ); migrationBinder.addBinding().to( Key.get( TransferLogSerialization.class ) ); + //migrationBinder.addBinding().to( Key.get( MessageCounterSerialization.class ) ); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/18e4305b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java index 1243c23..ec667e6 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java @@ -25,6 +25,7 @@ import akka.util.Timeout; import com.google.inject.Inject; import com.google.inject.Singleton; import org.apache.usergrid.persistence.actorsystem.ActorSystemManager; +import org.apache.usergrid.persistence.actorsystem.ClientActor; import org.apache.usergrid.persistence.qakka.QakkaFig; import org.apache.usergrid.persistence.qakka.core.QueueManager; import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService; @@ -185,23 +186,37 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { // ask ClientActor and wait (up to timeout) for response 
Future<Object> fut = Patterns.ask( actorSystemManager.getClientActor(), request, t ); - final QakkaMessage response = (QakkaMessage)Await.result( fut, t.duration() ); + Object responseObject = Await.result( fut, t.duration() ); + + if ( responseObject instanceof QakkaMessage ) { - if ( response != null && response instanceof QueueGetResponse) { - QueueGetResponse qprm = (QueueGetResponse)response; - if ( qprm.isSuccess() ) { - if (retries > 1) { - logger.debug( "getNextMessage SUCCESS after {} retries", retries ); + final QakkaMessage response = (QakkaMessage)Await.result( fut, t.duration() ); + + if ( response != null && response instanceof QueueGetResponse) { + QueueGetResponse qprm = (QueueGetResponse)response; + if ( qprm.isSuccess() ) { + if (retries > 1) { + logger.debug( "getNextMessage SUCCESS after {} retries", retries ); + } } + return qprm.getQueueMessages(); + + + } else if ( response != null ) { + logger.debug("ERROR RESPONSE (1) popping queue, retrying {}", retries ); + + } else { + logger.debug("TIMEOUT popping to queue, retrying {}", retries ); } - return qprm.getQueueMessages(); + } else if ( responseObject instanceof ClientActor.ErrorResponse ) { - } else if ( response != null ) { - logger.debug("ERROR RESPONSE popping queue, retrying {}", retries ); + final ClientActor.ErrorResponse errorResponse = (ClientActor.ErrorResponse)responseObject; + logger.debug("ACTORSYSTEM ERROR popping queue: {}, retrying {}", + errorResponse.getMessage(), retries ); } else { - logger.debug("TIMEOUT popping to queue, retrying {}", retries ); + logger.debug("UNKNOWN RESPONSE popping queue, retrying {}", retries ); } } catch ( Exception e ) {