This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 711eebc0634d014742bfdfcf4ef5572c1f7d5e2c
Author: Lei Zhang <coolbee...@gmail.com>
AuthorDate: Wed Aug 14 17:07:56 2019 +0800

    SCB-1368 Add akka cluster property adapter
---
 .../integration/akka/AkkaClusterListener.java      | 80 ++++++++++++++++++++++
 .../akka/AkkaConfigPropertyAdapter.java            | 26 +++++--
 .../src/main/resources/application.yaml            | 10 +++
 3 files changed, 111 insertions(+), 5 deletions(-)

diff --git 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/AkkaClusterListener.java
 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/AkkaClusterListener.java
new file mode 100644
index 0000000..418cd2e
--- /dev/null
+++ 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/AkkaClusterListener.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka;
+
+import akka.actor.AbstractActor;
+import akka.cluster.Cluster;
+import akka.cluster.ClusterEvent;
+import akka.cluster.ClusterEvent.MemberEvent;
+import akka.cluster.ClusterEvent.MemberRemoved;
+import akka.cluster.ClusterEvent.MemberUp;
+import akka.cluster.ClusterEvent.UnreachableMember;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import java.lang.invoke.MethodHandles;
+import java.util.Optional;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AkkaClusterListener extends AbstractActor {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  LoggingAdapter AKKA_LOG = Logging.getLogger(getContext().getSystem(), this);
+  Cluster cluster = Cluster.get(getContext().getSystem());
+
+  @Override
+  public Receive createReceive() {
+    return receiveBuilder()
+        .match(MemberUp.class, mUp -> {
+          LOG.info("Member is Up: {}", mUp.member());
+        })
+        .match(UnreachableMember.class, mUnreachable -> {
+          LOG.info("Member detected as unreachable: {}", 
mUnreachable.member());
+        })
+        .match(MemberRemoved.class, mRemoved -> {
+          LOG.info("Member is Removed: {}", mRemoved.member());
+        })
+        .match(MemberEvent.class, message -> {
+          // ignore
+        })
+        .matchAny(msg -> AKKA_LOG.warning("Received unknown message: {}", msg))
+        .build();
+  }
+
+  //subscribe to cluster changes
+  @Override
+  public void preStart() {
+    cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(),
+        MemberEvent.class, UnreachableMember.class);
+  }
+
+  //re-subscribe when restart
+  @Override
+  public void postStop() {
+    cluster.unsubscribe(getSelf());
+  }
+
+  @Override
+  public void preRestart(Throwable reason, Optional<Object> message) {
+    AKKA_LOG.error(
+        reason,
+        "Restarting due to [{}] when processing [{}]",
+        reason.getMessage(),
+        message.isPresent() ? message.get() : "");
+  }
+}
diff --git 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/AkkaConfigPropertyAdapter.java
 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/AkkaConfigPropertyAdapter.java
index c6ae195..d364da7 100644
--- 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/AkkaConfigPropertyAdapter.java
+++ 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/AkkaConfigPropertyAdapter.java
@@ -31,10 +31,15 @@ public class AkkaConfigPropertyAdapter {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   public static final String PROPERTY_SOURCE_NAME = "akkaConfig.";
+  static final String AKKA_CLUSTER_SEED_NODES_KEY = "akka.cluster.seed-nodes";
+  static final String AKKA_ESTENSIONS_KEY = "akka.extensions";
+  static final String AKKA_LOGGERS_KEY = "akka.loggers";
 
   public static Map<String, Object> getPropertyMap(ConfigurableEnvironment 
environment) {
     final Map<String, Object> propertyMap = new HashMap<>();
-
+    final List<String> seedNodes = new ArrayList<>();
+    final List<String> extensions = new ArrayList<>();
+    final List<String> loggers = new ArrayList<>();
     for (final PropertySource source : environment.getPropertySources()) {
       if (isEligiblePropertySource(source)) {
         final EnumerablePropertySource enumerable = (EnumerablePropertySource) 
source;
@@ -42,14 +47,25 @@ public class AkkaConfigPropertyAdapter {
         for (final String name : enumerable.getPropertyNames()) {
           if (name.startsWith(PROPERTY_SOURCE_NAME) && 
!propertyMap.containsKey(name)) {
             String key = name.substring(PROPERTY_SOURCE_NAME.length());
-            Object value = environment.getProperty(name);
-            if (LOG.isTraceEnabled()) {
-              LOG.trace("Adding property {}={}" + key, value);
+            String value = environment.getProperty(name);
+            if (key.startsWith(AKKA_CLUSTER_SEED_NODES_KEY)) {
+              seedNodes.add(value);
+            } else if (key.startsWith(AKKA_ESTENSIONS_KEY)) {
+              extensions.add(value);
+            } else if (key.startsWith(AKKA_LOGGERS_KEY)) {
+              loggers.add(value);
+            } else {
+              if (LOG.isTraceEnabled()) {
+                LOG.trace("Adding property {}={}" + key, value);
+              }
+              propertyMap.put(key, value);
             }
-            propertyMap.put(key, value);
           }
         }
       }
+      propertyMap.put(AKKA_CLUSTER_SEED_NODES_KEY, seedNodes);
+      propertyMap.put(AKKA_ESTENSIONS_KEY, extensions);
+      propertyMap.put(AKKA_LOGGERS_KEY, loggers);
     }
 
     return Collections.unmodifiableMap(propertyMap);
diff --git a/alpha/alpha-server/src/main/resources/application.yaml 
b/alpha/alpha-server/src/main/resources/application.yaml
index ed2c41e..23ae3e2 100644
--- a/alpha/alpha-server/src/main/resources/application.yaml
+++ b/alpha/alpha-server/src/main/resources/application.yaml
@@ -52,10 +52,20 @@ eureka:
 
 
 akkaConfig:
+  # persistence
   akka.persistence.journal.plugin: akka.persistence.journal.inmem
   akka.persistence.journal.leveldb.dir: target/example/journal
   akka.persistence.snapshot-store.plugin: akka.persistence.snapshot-store.local
   akka.persistence.snapshot-store.local.dir: target/example/snapshots
+  # cluster
+  akka.actor.provider: cluster
+  akka.remote.log-remote-lifecycle-events: info
+  akka.remote.netty.tcp.hostname: 127.0.0.1
+  akka.remote.netty.tcp.port: 8070
+  akka.cluster.seed-nodes: ["akka.tcp://alpha-akka@127.0.0.1:8070"]
+  #
+  akka.extensions: ["akka.cluster.metrics.ClusterMetricsExtension"]
+
 
 management:
   endpoints:

Reply via email to