This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 6d4aecc452e7e3c8a55169e2cb6164c1f3562727 Author: Lei Zhang <zhang...@apache.org> AuthorDate: Tue Nov 5 23:54:57 2019 +0800 SCB-1508 Omega uses the onGetServerMeta method to get the Alpha configuration --- acceptance-tests/acceptance-pack-akka-spring-demo/pom.xml | 3 --- .../org/apache/servicecomb/pack/alpha/server/AlphaConfig.java | 10 ++++++++-- .../pack/alpha/server/GrpcTxEventEndpointImpl.java | 11 ++++++++++- .../pack/alpha/server/fsm/GrpcSagaEventService.java | 11 ++++++++++- .../servicecomb/pack/alpha/server/AlphaIntegrationTest.java | 9 +++++++++ .../pack/alpha/server/fsm/AlphaIntegrationFsmTest.java | 8 ++++++++ .../servicecomb/pack/alpha/server/fsm/OmegaEventSender.java | 5 +++++ .../servicecomb/pack/omega/spring/OmegaSpringConfig.java | 11 ++++++----- 8 files changed, 56 insertions(+), 12 deletions(-) diff --git a/acceptance-tests/acceptance-pack-akka-spring-demo/pom.xml b/acceptance-tests/acceptance-pack-akka-spring-demo/pom.xml index e9008cb..dde0a3b 100644 --- a/acceptance-tests/acceptance-pack-akka-spring-demo/pom.xml +++ b/acceptance-tests/acceptance-pack-akka-spring-demo/pom.xml @@ -185,7 +185,6 @@ -Dorg.jboss.byteman.debug=true -Dorg.jboss.byteman.verbose=true -javaagent:/maven/saga/byteman.jar=port:9092,address:0.0.0.0,listener:true </JAVA_OPTS> - <alpha.feature.akka.enabled>true</alpha.feature.akka.enabled> </env> <wait> <log>Started [a-zA-Z]+ in [0-9.]+ seconds</log> @@ -218,7 +217,6 @@ -Dorg.jboss.byteman.debug=true -Dorg.jboss.byteman.verbose=true -javaagent:/maven/saga/byteman.jar=port:9093,address:0.0.0.0,listener:true </JAVA_OPTS> - <alpha.feature.akka.enabled>true</alpha.feature.akka.enabled> </env> <wait> <log>Started [a-zA-Z]+ in [0-9.]+ seconds</log> @@ -251,7 +249,6 @@ -Dorg.jboss.byteman.debug=true -Dorg.jboss.byteman.verbose=true -javaagent:/maven/saga/byteman.jar=port:9091,address:0.0.0.0,listener:true </JAVA_OPTS> - <alpha.feature.akka.enabled>true</alpha.feature.akka.enabled> </env> <wait> <log>Started [a-zA-Z]+ in [0-9.]+ seconds</log> diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java index ea82b4d..ce9f688 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java @@ -35,6 +35,8 @@ import org.apache.servicecomb.pack.alpha.server.tcc.GrpcTccEventService; import org.apache.servicecomb.pack.alpha.server.tcc.callback.TccPendingTaskRunner; import org.apache.servicecomb.pack.alpha.server.tcc.service.TccEventScanner; import org.apache.servicecomb.pack.alpha.server.tcc.service.TccTxEventService; +import org.apache.servicecomb.pack.common.AlphaMetaKeys; +import org.apache.servicecomb.pack.contract.grpc.ServerMeta; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -153,8 +155,10 @@ public class AlphaConfig { ServerStartable serverStartable(GrpcServerConfig serverConfig, TxConsistentService txConsistentService, Map<String, Map<String, OmegaCallback>> omegaCallbacks, GrpcTccEventService grpcTccEventService, TccPendingTaskRunner tccPendingTaskRunner, TccEventScanner tccEventScanner, @Qualifier("alphaEventBus") EventBus eventBus) throws IOException { + ServerMeta serverMeta = ServerMeta.newBuilder() + .putMeta(AlphaMetaKeys.AkkaEnabled.name(), String.valueOf(false)).build(); ServerStartable bootstrap = new GrpcStartable(serverConfig, eventBus, - new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks), grpcTccEventService); + new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks, serverMeta), grpcTccEventService); new Thread(bootstrap::start).start(); tccPendingTaskRunner.start(); tccEventScanner.start(); @@ -171,8 +175,10 @@ public class AlphaConfig { ServerStartable serverStartableWithAkka(GrpcServerConfig serverConfig, Map<String, Map<String, OmegaCallback>> omegaCallbacks, GrpcTccEventService grpcTccEventService, TccPendingTaskRunner tccPendingTaskRunner, TccEventScanner tccEventScanner, @Qualifier("alphaEventBus") EventBus eventBus, ActorEventChannel actorEventChannel) throws IOException { + ServerMeta serverMeta = ServerMeta.newBuilder() + .putMeta(AlphaMetaKeys.AkkaEnabled.name(), String.valueOf(true)).build(); ServerStartable bootstrap = new GrpcStartable(serverConfig, eventBus, - new GrpcSagaEventService(actorEventChannel, omegaCallbacks), grpcTccEventService); + new GrpcSagaEventService(actorEventChannel, omegaCallbacks, serverMeta), grpcTccEventService); new Thread(bootstrap::start).start(); tccPendingTaskRunner.start(); tccEventScanner.start(); diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/GrpcTxEventEndpointImpl.java index 73f4419..210dbd9 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/GrpcTxEventEndpointImpl.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/GrpcTxEventEndpointImpl.java @@ -33,6 +33,7 @@ import org.apache.servicecomb.pack.contract.grpc.GrpcAck; import org.apache.servicecomb.pack.contract.grpc.GrpcCompensateCommand; import org.apache.servicecomb.pack.contract.grpc.GrpcServiceConfig; import org.apache.servicecomb.pack.contract.grpc.GrpcTxEvent; +import org.apache.servicecomb.pack.contract.grpc.ServerMeta; import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceImplBase; import io.grpc.stub.StreamObserver; @@ -45,11 +46,13 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase { private final TxConsistentService txConsistentService; private final Map<String, Map<String, OmegaCallback>> omegaCallbacks; + private final ServerMeta serverMeta; GrpcTxEventEndpointImpl(TxConsistentService txConsistentService, - Map<String, Map<String, OmegaCallback>> omegaCallbacks) { + Map<String, Map<String, OmegaCallback>> omegaCallbacks, ServerMeta serverMeta) { this.txConsistentService = txConsistentService; this.omegaCallbacks = omegaCallbacks; + this.serverMeta = serverMeta; } @Override @@ -93,4 +96,10 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase { responseObserver.onNext(ok ? ALLOW : REJECT); responseObserver.onCompleted(); } + + @Override + public void onGetServerMeta(GrpcServiceConfig request, StreamObserver<ServerMeta> responseObserver){ + responseObserver.onNext(this.serverMeta); + responseObserver.onCompleted(); + } } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java index f711670..6c23ace 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java @@ -32,6 +32,7 @@ import org.apache.servicecomb.pack.contract.grpc.GrpcAck; import org.apache.servicecomb.pack.contract.grpc.GrpcCompensateCommand; import org.apache.servicecomb.pack.contract.grpc.GrpcServiceConfig; import org.apache.servicecomb.pack.contract.grpc.GrpcTxEvent; +import org.apache.servicecomb.pack.contract.grpc.ServerMeta; import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceImplBase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,11 +44,13 @@ public class GrpcSagaEventService extends TxEventServiceImplBase { private final Map<String, Map<String, OmegaCallback>> omegaCallbacks; private final ActorEventChannel actorEventChannel; + private final ServerMeta serverMeta; public GrpcSagaEventService(ActorEventChannel actorEventChannel, - Map<String, Map<String, OmegaCallback>> omegaCallbacks) { + Map<String, Map<String, OmegaCallback>> omegaCallbacks, ServerMeta serverMeta) { this.actorEventChannel = actorEventChannel; this.omegaCallbacks = omegaCallbacks; + this.serverMeta = serverMeta; } @Override @@ -150,4 +153,10 @@ public class GrpcSagaEventService extends TxEventServiceImplBase { responseObserver.onNext(ok ? ALLOW : REJECT); responseObserver.onCompleted(); } + + @Override + public void onGetServerMeta(GrpcServiceConfig request, StreamObserver<ServerMeta> responseObserver){ + responseObserver.onNext(this.serverMeta); + responseObserver.onCompleted(); + } } diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationTest.java index fa501c3..eef73f8 100644 --- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationTest.java +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationTest.java @@ -30,6 +30,7 @@ import static org.awaitility.Awaitility.await; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import com.google.protobuf.ByteString; @@ -48,10 +49,12 @@ import javax.annotation.PostConstruct; import org.apache.servicecomb.pack.alpha.core.*; import org.apache.servicecomb.pack.common.EventType; +import org.apache.servicecomb.pack.common.AlphaMetaKeys; import org.apache.servicecomb.pack.contract.grpc.GrpcAck; import org.apache.servicecomb.pack.contract.grpc.GrpcCompensateCommand; import org.apache.servicecomb.pack.contract.grpc.GrpcServiceConfig; import org.apache.servicecomb.pack.contract.grpc.GrpcTxEvent; +import org.apache.servicecomb.pack.contract.grpc.ServerMeta; import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc; import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub; import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceStub; @@ -170,6 +173,12 @@ public class AlphaIntegrationTest { } @Test + public void serverMetaTest(){ + ServerMeta serverMeta = blockingStub.onGetServerMeta(serviceConfig); + assertEquals(Boolean.parseBoolean(serverMeta.getMetaMap().get(AlphaMetaKeys.AkkaEnabled.name())),false); + } + + @Test public void persistsEvent() { asyncStub.onConnected(serviceConfig, compensateResponseObserver); blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent)); diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java index e76eadd..15fedc3 100644 --- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java @@ -36,6 +36,8 @@ import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExt import org.apache.servicecomb.pack.alpha.server.AlphaApplication; import org.apache.servicecomb.pack.alpha.server.AlphaConfig; import org.apache.servicecomb.pack.common.EventType; +import org.apache.servicecomb.pack.common.AlphaMetaKeys; +import org.apache.servicecomb.pack.contract.grpc.ServerMeta; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -106,6 +108,12 @@ public class AlphaIntegrationFsmTest { } @Test + public void serverMetaTest(){ + ServerMeta serverMeta = omegaEventSender.onGetServerMeta(); + assertEquals(Boolean.parseBoolean(serverMeta.getMetaMap().get(AlphaMetaKeys.AkkaEnabled.name())),true); + } + + @Test public void successfulTest() { final String globalTxId = UUID.randomUUID().toString(); final String localTxId_1 = UUID.randomUUID().toString(); diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSender.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSender.java index fe1f965..68d8501 100644 --- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSender.java +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSender.java @@ -29,6 +29,7 @@ import org.apache.servicecomb.pack.alpha.core.OmegaCallback; import org.apache.servicecomb.pack.contract.grpc.GrpcAck; import org.apache.servicecomb.pack.contract.grpc.GrpcCompensateCommand; import org.apache.servicecomb.pack.contract.grpc.GrpcServiceConfig; +import org.apache.servicecomb.pack.contract.grpc.ServerMeta; import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc; import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub; import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceStub; @@ -72,6 +73,10 @@ public class OmegaEventSender { blockingStub.onDisconnected(serviceConfig); } + public ServerMeta onGetServerMeta(){ + return blockingStub.onGetServerMeta(serviceConfig); + } + public void setOmegaCallbacks( Map<String, Map<String, OmegaCallback>> omegaCallbacks) { this.omegaCallbacks = omegaCallbacks; diff --git a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/pack/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/pack/omega/spring/OmegaSpringConfig.java index 9284323..ec8c624 100644 --- a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/pack/omega/spring/OmegaSpringConfig.java +++ b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/pack/omega/spring/OmegaSpringConfig.java @@ -18,6 +18,8 @@ package org.apache.servicecomb.pack.omega.spring; import com.google.common.collect.ImmutableList; +import org.apache.servicecomb.pack.common.AlphaMetaKeys; +import org.apache.servicecomb.pack.contract.grpc.ServerMeta; import org.apache.servicecomb.pack.omega.connector.grpc.AlphaClusterDiscovery; import org.apache.servicecomb.pack.omega.connector.grpc.AlphaClusterConfig; import org.apache.servicecomb.pack.omega.connector.grpc.core.FastestSender; @@ -55,8 +57,6 @@ class OmegaSpringConfig { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - @Value("${alpha.feature.akka.enabled:false}") - private boolean alphaFeatureAkkaEnabled; @Bean(name = {"omegaUniqueIdGenerator"}) IdGenerator<String> idGenerator() { @@ -64,9 +64,10 @@ class OmegaSpringConfig { } @Bean - OmegaContext omegaContext(@Qualifier("omegaUniqueIdGenerator") IdGenerator<String> idGenerator) { - LOG.info("alpha.feature.akka.enabled={}",alphaFeatureAkkaEnabled); - return new OmegaContext(idGenerator,alphaFeatureAkkaEnabled); + OmegaContext omegaContext(@Qualifier("omegaUniqueIdGenerator") IdGenerator<String> idGenerator, SagaMessageSender messageSender) { + ServerMeta serverMeta = messageSender.onGetServerMeta(); + boolean akkaEnabeld = Boolean.parseBoolean(serverMeta.getMetaMap().get(AlphaMetaKeys.AkkaEnabled.name())); + return new OmegaContext(idGenerator,akkaEnabeld); } @Bean(name = {"compensationContext"})