This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 711eebc0634d014742bfdfcf4ef5572c1f7d5e2c Author: Lei Zhang <coolbee...@gmail.com> AuthorDate: Wed Aug 14 17:07:56 2019 +0800 SCB-1368 Add akka cluster property adapter --- .../integration/akka/AkkaClusterListener.java | 80 ++++++++++++++++++++++ .../akka/AkkaConfigPropertyAdapter.java | 26 +++++-- .../src/main/resources/application.yaml | 10 +++ 3 files changed, 111 insertions(+), 5 deletions(-) diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/AkkaClusterListener.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/AkkaClusterListener.java new file mode 100644 index 0000000..418cd2e --- /dev/null +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/AkkaClusterListener.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka; + +import akka.actor.AbstractActor; +import akka.cluster.Cluster; +import akka.cluster.ClusterEvent; +import akka.cluster.ClusterEvent.MemberEvent; +import akka.cluster.ClusterEvent.MemberRemoved; +import akka.cluster.ClusterEvent.MemberUp; +import akka.cluster.ClusterEvent.UnreachableMember; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import java.lang.invoke.MethodHandles; +import java.util.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AkkaClusterListener extends AbstractActor { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + LoggingAdapter AKKA_LOG = Logging.getLogger(getContext().getSystem(), this); + Cluster cluster = Cluster.get(getContext().getSystem()); + + @Override + public Receive createReceive() { + return receiveBuilder() + .match(MemberUp.class, mUp -> { + LOG.info("Member is Up: {}", mUp.member()); + }) + .match(UnreachableMember.class, mUnreachable -> { + LOG.info("Member detected as unreachable: {}", mUnreachable.member()); + }) + .match(MemberRemoved.class, mRemoved -> { + LOG.info("Member is Removed: {}", mRemoved.member()); + }) + .match(MemberEvent.class, message -> { + // ignore + }) + .matchAny(msg -> AKKA_LOG.warning("Received unknown message: {}", msg)) + .build(); + } + + //subscribe to cluster changes + @Override + public void preStart() { + cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(), + MemberEvent.class, UnreachableMember.class); + } + + //re-subscribe when restart + @Override + public void postStop() { + cluster.unsubscribe(getSelf()); + } + + @Override + public void preRestart(Throwable reason, Optional<Object> message) { + AKKA_LOG.error( + reason, + "Restarting due to [{}] when processing [{}]", + reason.getMessage(), + message.isPresent() ? message.get() : ""); + } +} diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/AkkaConfigPropertyAdapter.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/AkkaConfigPropertyAdapter.java index c6ae195..d364da7 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/AkkaConfigPropertyAdapter.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/AkkaConfigPropertyAdapter.java @@ -31,10 +31,15 @@ public class AkkaConfigPropertyAdapter { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); public static final String PROPERTY_SOURCE_NAME = "akkaConfig."; + static final String AKKA_CLUSTER_SEED_NODES_KEY = "akka.cluster.seed-nodes"; + static final String AKKA_ESTENSIONS_KEY = "akka.extensions"; + static final String AKKA_LOGGERS_KEY = "akka.loggers"; public static Map<String, Object> getPropertyMap(ConfigurableEnvironment environment) { final Map<String, Object> propertyMap = new HashMap<>(); - + final List<String> seedNodes = new ArrayList<>(); + final List<String> extensions = new ArrayList<>(); + final List<String> loggers = new ArrayList<>(); for (final PropertySource source : environment.getPropertySources()) { if (isEligiblePropertySource(source)) { final EnumerablePropertySource enumerable = (EnumerablePropertySource) source; @@ -42,14 +47,25 @@ public class AkkaConfigPropertyAdapter { for (final String name : enumerable.getPropertyNames()) { if (name.startsWith(PROPERTY_SOURCE_NAME) && !propertyMap.containsKey(name)) { String key = name.substring(PROPERTY_SOURCE_NAME.length()); - Object value = environment.getProperty(name); - if (LOG.isTraceEnabled()) { - LOG.trace("Adding property {}={}" + key, value); + String value = environment.getProperty(name); + if (key.startsWith(AKKA_CLUSTER_SEED_NODES_KEY)) { + seedNodes.add(value); + } else if (key.startsWith(AKKA_ESTENSIONS_KEY)) { + extensions.add(value); + } else if (key.startsWith(AKKA_LOGGERS_KEY)) { + loggers.add(value); + } else { + if (LOG.isTraceEnabled()) { + LOG.trace("Adding property {}={}" + key, value); + } + propertyMap.put(key, value); } - propertyMap.put(key, value); } } } + propertyMap.put(AKKA_CLUSTER_SEED_NODES_KEY, seedNodes); + propertyMap.put(AKKA_ESTENSIONS_KEY, extensions); + propertyMap.put(AKKA_LOGGERS_KEY, loggers); } return Collections.unmodifiableMap(propertyMap); diff --git a/alpha/alpha-server/src/main/resources/application.yaml b/alpha/alpha-server/src/main/resources/application.yaml index ed2c41e..23ae3e2 100644 --- a/alpha/alpha-server/src/main/resources/application.yaml +++ b/alpha/alpha-server/src/main/resources/application.yaml @@ -52,10 +52,20 @@ eureka: akkaConfig: + # persistence akka.persistence.journal.plugin: akka.persistence.journal.inmem akka.persistence.journal.leveldb.dir: target/example/journal akka.persistence.snapshot-store.plugin: akka.persistence.snapshot-store.local akka.persistence.snapshot-store.local.dir: target/example/snapshots + # cluster + akka.actor.provider: cluster + akka.remote.log-remote-lifecycle-events: info + akka.remote.netty.tcp.hostname: 127.0.0.1 + akka.remote.netty.tcp.port: 8070 + akka.cluster.seed-nodes: ["akka.tcp://alpha-akka@127.0.0.1:8070"] + # + akka.extensions: ["akka.cluster.metrics.ClusterMetricsExtension"] + management: endpoints: