This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 001a26d1dd6d0f7d4fa0f094bb0d52e7162bb66a Author: Lei Zhang <coolbee...@gmail.com> AuthorDate: Tue Sep 10 17:03:07 2019 +0800 SCB-1368 Refactoring model alpha-fsm-channel-kafka and alpha-fsm-channel-redis to alpha-fsm --- alpha/alpha-fsm-channel-kafka/README.md | 28 ---- alpha/alpha-fsm-channel-kafka/pom.xml | 113 ------------- .../kafka/KafkaChannelAutoConfiguration.java | 175 --------------------- .../fsm/channel/kafka/KafkaMessageListener.java | 49 ------ .../src/main/resources/META-INF/spring.factories | 17 -- .../channel/kafka/test/KafkaActorEventSink.java | 31 ---- .../fsm/channel/kafka/test/KafkaApplication.java | 40 ----- .../fsm/channel/kafka/test/KafkaChannelTest.java | 95 ----------- .../src/test/resources/log4j2.xml | 30 ---- alpha/alpha-fsm-channel-redis/README.md | 17 -- alpha/alpha-fsm-channel-redis/pom.xml | 99 ------------ .../src/main/resources/META-INF/spring.factories | 17 -- .../pack/alpha/fsm/RedisChannelTest.java | 130 --------------- .../servicecomb/pack/alpha/fsm/RedisEventSink.java | 32 ---- .../src/test/resources/log4j2.xml | 30 ---- .../pack/alpha/fsm/FsmAutoConfiguration.java | 73 ++------- .../fsm/channel/AbstractActorEventChannel.java | 3 - .../alpha/fsm/channel/AbstractEventConsumer.java | 20 +++ .../fsm/channel/ActiveMQActorEventChannel.java | 43 ----- .../{ => kafka}/KafkaActorEventChannel.java | 14 +- .../kafka/KafkaChannelAutoConfiguration.java | 149 ++++++++++++++++++ .../fsm/channel/kafka/KafkaMessagePublisher.java | 14 +- .../fsm/channel/kafka/KafkaSagaEventConsumer.java | 97 ++++++++++++ .../{ => memory}/MemoryActorEventChannel.java | 33 +--- .../alpha/fsm/channel/redis/MessageSerializer.java | 27 ++-- .../{ => redis}/RedisActorEventChannel.java | 18 +-- .../redis/RedisChannelAutoConfiguration.java | 49 ++++-- .../fsm/channel/redis/RedisMessagePublisher.java | 15 +- .../fsm/channel/redis/RedisSagaEventConsumer.java} | 50 +++--- alpha/pom.xml | 2 - 30 files changed, 391 insertions(+), 1119 deletions(-) diff --git a/alpha/alpha-fsm-channel-kafka/README.md b/alpha/alpha-fsm-channel-kafka/README.md deleted file mode 100644 index 094bbfe..0000000 --- a/alpha/alpha-fsm-channel-kafka/README.md +++ /dev/null @@ -1,28 +0,0 @@ -# FSM kafka channel -## Enabled Saga State Machine Module - -Using `alpha.feature.akka.enabled=true` launch Alpha and Omega Side -Using `alpha.feature.akka.channel.type=kafka` launch Alpha and Omega Side - -```properties -alpha.feature.akka.enabled=true -alpha.feature.akka.channel.type=kafka -``` - -setting spring boot kafka -``` -spring.kafka.bootstrap-servers=kafka bootstrap_servers -spring.kafka.consumer.group-id=kafka consumer group id, default servicecomb-pack -alpha.feature.akka.channel.kafka.topic= kafka topic name, default servicecomb-pack-actor-event -spring.kafka.producer.batch-size= producer batch size, default 16384 -spring.kafka.producer.retries = producer retries, default 0 -spring.kafka.producer.buffer.memory = producer buffer memory, default 33554432 -spring.kafka.consumer.auto.offset.reset = consumer auto offset reset, default earliest -spring.kafka.consumer.enable.auto.commit = consumer enable auto commit, default false -spring.kafka.consumer.auto.commit.interval.ms = consumer auto commit interval ms, default 100 -spring.kafka.listener.ackMode = consumer listener ack mode , default AckMode.MANUAL_IMMEDIATE -spring.kafka.listener.pollTimeout = consumer listener pool timeout, default 1500 ms - -kafka.numPartitions = kafka topic partitions, default 6 -kafka.replicationFactor = kafka topic replication, default 1 -``` diff --git a/alpha/alpha-fsm-channel-kafka/pom.xml b/alpha/alpha-fsm-channel-kafka/pom.xml deleted file mode 100644 index c7d4eff..0000000 --- a/alpha/alpha-fsm-channel-kafka/pom.xml +++ /dev/null @@ -1,113 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <artifactId>alpha</artifactId> - <groupId>org.apache.servicecomb.pack</groupId> - <version>0.6.0-SNAPSHOT</version> - </parent> - <modelVersion>4.0.0</modelVersion> - - <artifactId>alpha-fsm-channel-kafka</artifactId> - <name>Pack::Alpha::Fsm::channel::kafka</name> - - <properties> - <leveldbjni-all.version>1.8</leveldbjni-all.version> - <akka-persistence-redis.version>0.4.0</akka-persistence-redis.version> - </properties> - - <dependencyManagement> - <dependencies> - <dependency> - <groupId>org.springframework.boot</groupId> - <artifactId>spring-boot-dependencies</artifactId> - <version>${spring.boot.version}</version> - <type>pom</type> - <scope>import</scope> - </dependency> - <dependency> - <groupId>com.typesafe.akka</groupId> - <artifactId>akka-persistence_2.12</artifactId> - <version>${akka.version}</version> - </dependency> - </dependencies> - </dependencyManagement> - - <dependencies> - <!-- spring boot --> - <dependency> - <groupId>org.springframework.boot</groupId> - <artifactId>spring-boot-autoconfigure</artifactId> - </dependency> - <dependency> - <groupId>org.apache.servicecomb.pack</groupId> - <artifactId>pack-common</artifactId> - </dependency> - <dependency> - <groupId>org.apache.servicecomb.pack</groupId> - <artifactId>alpha-core</artifactId> - </dependency> - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </dependency> - <dependency> - <groupId>org.springframework.boot</groupId> - <artifactId>spring-boot-starter-log4j2</artifactId> - </dependency> - <dependency> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-api</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-core</artifactId> - <scope>test</scope> - </dependency> - - <!-- For testing the artifacts scope are test--> - <dependency> - <groupId>org.springframework.boot</groupId> - <artifactId>spring-boot-starter-test</artifactId> - <exclusions> - <exclusion> - <groupId>org.springframework.boot</groupId> - <artifactId>spring-boot-starter-logging</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - </dependency> - <dependency> - <groupId>org.springframework.kafka</groupId> - <artifactId>spring-kafka</artifactId> - </dependency> - <dependency> - <groupId>org.springframework.kafka</groupId> - <artifactId>spring-kafka-test</artifactId> - <scope>test</scope> - </dependency> -</dependencies> - -</project> diff --git a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java deleted file mode 100644 index 6729be6..0000000 --- a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.servicecomb.pack.alpha.fsm.channel.kafka; - -import com.google.common.collect.Maps; -import org.apache.kafka.clients.admin.AdminClientConfig; -import org.apache.kafka.clients.admin.NewTopic; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; -import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.boot.autoconfigure.kafka.KafkaProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Lazy; -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.springframework.kafka.config.KafkaListenerContainerFactory; -import org.springframework.kafka.core.*; -import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; -import org.springframework.kafka.listener.ContainerProperties; -import org.springframework.kafka.support.serializer.JsonDeserializer; -import org.springframework.kafka.support.serializer.JsonSerializer; - -import java.util.Map; - -@Configuration -@ConditionalOnClass(KafkaProperties.class) -@ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "kafka") -public class KafkaChannelAutoConfiguration { - - private static final Logger logger = LoggerFactory.getLogger(KafkaChannelAutoConfiguration.class); - - @Value("${alpha.feature.akka.channel.kafka.topic:servicecomb-pack-actor-event}") - private String topic; - - @Value("${spring.kafka.bootstrap-servers}") - private String bootstrap_servers; - - @Value("${spring.kafka.consumer.group-id:servicecomb-pack}") - private String groupId; - - @Value("${spring.kafka.consumer.properties.spring.json.trusted.packages:org.apache.servicecomb.pack.alpha.core.fsm.event,org.apache.servicecomb.pack.alpha.core.fsm.event.base,}org.apache.servicecomb.pack.alpha.core.fsm.event.internal") - private String trusted_packages; - - @Value("${spring.kafka.producer.batch-size:16384}") - private int batchSize; - - @Value("${spring.kafka.producer.retries:0}") - private int retries; - - @Value("${spring.kafka.producer.buffer.memory:33554432}") - private long bufferMemory; - - @Value("${spring.kafka.consumer.auto.offset.reset:earliest}") - private String autoOffsetReset; - - @Value("${spring.kafka.consumer.enable.auto.commit:false}") - private boolean enableAutoCommit; - - @Value("${spring.kafka.consumer.auto.commit.interval.ms:100}") - private int autoCommitIntervalMs; - - @Value("${spring.kafka.listener.ackMode:MANUAL_IMMEDIATE}") - private String ackMode; - - @Value("${spring.kafka.listener.pollTimeout:1500}") - private long poolTimeout; - - @Value("${kafka.numPartitions:6}") - private int numPartitions; - - @Value("${kafka.replicationFactor:1}") - private short replicationFactor; - - @Bean - @ConditionalOnMissingBean - public ProducerFactory<String, Object> producerFactory(){ - Map<String, Object> map = Maps.newHashMap(); - map.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers); - map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); - map.put(ProducerConfig.RETRIES_CONFIG, retries); - map.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); - map.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); - - return new DefaultKafkaProducerFactory<>(map); - } - - @Bean - @ConditionalOnMissingBean - public KafkaTemplate<String, Object> kafkaTemplate(){ - return new KafkaTemplate<>(producerFactory()); - } - - @Bean - @ConditionalOnMissingBean - public ConsumerFactory<String, Object> consumerFactory(){ - Map<String, Object> map = Maps.newHashMap(); - - map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers); - map.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); - map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); - map.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); - map.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); - map.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs); - map.put(JsonDeserializer.TRUSTED_PACKAGES, trusted_packages); - - if(logger.isDebugEnabled()){ - logger.debug("init consumerFactory properties = [{}]", map); - } - return new DefaultKafkaConsumerFactory<>(map); - } - - @Bean - @ConditionalOnMissingBean - public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory(){ - ConcurrentKafkaListenerContainerFactory<String,Object> concurrentKafkaListenerContainerFactory = - new ConcurrentKafkaListenerContainerFactory<>(); - concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFactory()); - concurrentKafkaListenerContainerFactory.getContainerProperties().setPollTimeout(poolTimeout); - concurrentKafkaListenerContainerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.valueOf(ackMode)); - - return concurrentKafkaListenerContainerFactory; - } - @Bean - @ConditionalOnMissingBean - public KafkaMessagePublisher kafkaMessagePublisher(KafkaTemplate<String, Object> kafkaTemplate){ - return new KafkaMessagePublisher(topic, kafkaTemplate); - } - - @Bean - @ConditionalOnMissingBean - public KafkaMessageListener kafkaMessageListener(@Lazy @Qualifier("actorEventSink") ActorEventSink actorEventSink){ - return new KafkaMessageListener(actorEventSink); - } - - @Bean - @ConditionalOnMissingBean - public KafkaAdmin kafkaAdmin(){ - Map<String, Object> map = Maps.newHashMap(); - - map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers); - - return new KafkaAdmin(map); - } - - @Bean - @ConditionalOnMissingBean - public NewTopic newTopic(){ - return new NewTopic(topic, numPartitions, replicationFactor); - } -} \ No newline at end of file diff --git a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessageListener.java b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessageListener.java deleted file mode 100644 index 8d1f880..0000000 --- a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessageListener.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.servicecomb.pack.alpha.fsm.channel.kafka; - -import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent; -import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.support.Acknowledgment; - -public class KafkaMessageListener { - - private static final Logger logger = LoggerFactory.getLogger(KafkaMessageListener.class); - - private ActorEventSink actorEventSink; - - public KafkaMessageListener(ActorEventSink actorEventSink) { - this.actorEventSink = actorEventSink; - } - - @KafkaListener(topics = "${alpha.feature.akka.channel.kafka.topic:servicecomb-pack-actor-event}") - public void listener(BaseEvent baseEvent, Acknowledgment acknowledgment){ - if(logger.isDebugEnabled()){ - logger.debug("listener event = [{}]", baseEvent); - } - - try { - actorEventSink.send(baseEvent); - acknowledgment.acknowledge(); - }catch (Exception e){ - logger.error("subscriber Exception = [{}]", e.getMessage(), e); - } - } -} \ No newline at end of file diff --git a/alpha/alpha-fsm-channel-kafka/src/main/resources/META-INF/spring.factories b/alpha/alpha-fsm-channel-kafka/src/main/resources/META-INF/spring.factories deleted file mode 100644 index 9366e98..0000000 --- a/alpha/alpha-fsm-channel-kafka/src/main/resources/META-INF/spring.factories +++ /dev/null @@ -1,17 +0,0 @@ -## --------------------------------------------------------------------------- -## Licensed to the Apache Software Foundation (ASF) under one or more -## contributor license agreements. See the NOTICE file distributed with -## this work for additional information regarding copyright ownership. -## The ASF licenses this file to You under the Apache License, Version 2.0 -## (the "License"); you may not use this file except in compliance with -## the License. You may obtain a copy of the License at -## -## http://www.apache.org/licenses/LICENSE-2.0 -## -## Unless required by applicable law or agreed to in writing, software -## distributed under the License is distributed on an "AS IS" BASIS, -## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -## See the License for the specific language governing permissions and -## limitations under the License. -## --------------------------------------------------------------------------- -org.springframework.boot.autoconfigure.EnableAutoConfiguration=org.apache.servicecomb.pack.alpha.fsm.channel.kafka.KafkaChannelAutoConfiguration diff --git a/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaActorEventSink.java b/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaActorEventSink.java deleted file mode 100644 index b392a94..0000000 --- a/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaActorEventSink.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.servicecomb.pack.alpha.fsm.channel.kafka.test; - -import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent; -import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink; - -import java.util.concurrent.CountDownLatch; - -public class KafkaActorEventSink implements ActorEventSink { - public static final CountDownLatch countDownLatch = new CountDownLatch(8); - - @Override - public void send(BaseEvent event) throws Exception { - countDownLatch.countDown(); - } -} diff --git a/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaApplication.java b/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaApplication.java deleted file mode 100644 index 9b001eb..0000000 --- a/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaApplication.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.servicecomb.pack.alpha.fsm.channel.kafka.test; - -import org.apache.servicecomb.pack.alpha.core.NodeStatus; -import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.context.annotation.Bean; - -@SpringBootApplication -public class KafkaApplication { - public static void main(String[] args) { - SpringApplication.run(KafkaApplication.class, args); - } - - @Bean(name = "actorEventSink") - public ActorEventSink actorEventSink(){ - return new KafkaActorEventSink(); - } - - @Bean(name = "nodeStatus") - public NodeStatus nodeStatus(){ - return new NodeStatus(NodeStatus.TypeEnum.MASTER); - } -} diff --git a/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaChannelTest.java b/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaChannelTest.java deleted file mode 100644 index 942b16d..0000000 --- a/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaChannelTest.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.servicecomb.pack.alpha.fsm.channel.kafka.test; - -import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaEndedEvent; -import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaStartedEvent; -import org.apache.servicecomb.pack.alpha.core.fsm.event.TxEndedEvent; -import org.apache.servicecomb.pack.alpha.core.fsm.event.TxStartedEvent; -import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent; -import org.apache.servicecomb.pack.alpha.fsm.channel.kafka.KafkaMessagePublisher; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.kafka.test.EmbeddedKafkaBroker; -import org.springframework.kafka.test.context.EmbeddedKafka; -import org.springframework.test.context.junit4.SpringRunner; - -import java.util.*; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertEquals; - - -@RunWith(SpringRunner.class) -@SpringBootTest(classes = KafkaApplication.class, - properties = { - "alpha.feature.akka.enabled=true", - "alpha.feature.akka.channel.type=kafka", - "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", - "spring.kafka.consumer.group-id=messageListener" - } -) -@EmbeddedKafka -public class KafkaChannelTest { - @Autowired - private EmbeddedKafkaBroker embeddedKafkaBroker; - - @Autowired - private KafkaMessagePublisher kafkaMessagePublisher; - - @Autowired - private KafkaActorEventSink actorEventSink; - - @Before - public void setup(){ - } - @Test - public void testProducer(){ - - String globalTxId = UUID.randomUUID().toString().replaceAll("-", ""); - String localTxId_1 = UUID.randomUUID().toString().replaceAll("-", ""); - String localTxId_2 = UUID.randomUUID().toString().replaceAll("-", ""); - String localTxId_3 = UUID.randomUUID().toString().replaceAll("-", ""); - - buildData(globalTxId, localTxId_1, localTxId_2, localTxId_3).forEach(baseEvent -> kafkaMessagePublisher.publish(baseEvent)); - - try { - // Waiting for sub - TimeUnit.SECONDS.sleep(5); - } catch (InterruptedException e) { - } - - assertEquals(0, actorEventSink.countDownLatch.getCount()); - - } - - private List<BaseEvent> buildData(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){ - List<BaseEvent> sagaEvents = new ArrayList<>(); - sagaEvents.add(SagaStartedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build()); - sagaEvents.add(TxStartedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxEndedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxStartedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); - sagaEvents.add(TxEndedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); - sagaEvents.add(TxStartedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); - sagaEvents.add(TxEndedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); - sagaEvents.add(SagaEndedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build()); - return sagaEvents; - } -} diff --git a/alpha/alpha-fsm-channel-kafka/src/test/resources/log4j2.xml b/alpha/alpha-fsm-channel-kafka/src/test/resources/log4j2.xml deleted file mode 100644 index 8c2def9..0000000 --- a/alpha/alpha-fsm-channel-kafka/src/test/resources/log4j2.xml +++ /dev/null @@ -1,30 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<Configuration status="WARN"> - <Appenders> - <Console name="Console" target="SYSTEM_OUT"> - <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/> - </Console> - </Appenders> - <Loggers> - <Root level="debug"> - <AppenderRef ref="Console"/> - </Root> - </Loggers> -</Configuration> diff --git a/alpha/alpha-fsm-channel-redis/README.md b/alpha/alpha-fsm-channel-redis/README.md deleted file mode 100644 index 98ecb93..0000000 --- a/alpha/alpha-fsm-channel-redis/README.md +++ /dev/null @@ -1,17 +0,0 @@ -# FSM Redis channel -## Enabled Saga State Machine Module - -Using `alpha.feature.akka.enabled=true` launch Alpha and Omega Side -Using `alpha.feature.akka.channel.type=redis` launch Alpha and Omega Side - -```properties -alpha.feature.akka.enabled=true -alpha.feature.akka.channel.type=redis -``` - -setting spring boot redis -``` -spring.redis.host=your_redis_host -spring.redis.port=your_redis_port -spring.redis.password=your_redis_password -``` diff --git a/alpha/alpha-fsm-channel-redis/pom.xml b/alpha/alpha-fsm-channel-redis/pom.xml deleted file mode 100644 index fdab9f2..0000000 --- a/alpha/alpha-fsm-channel-redis/pom.xml +++ /dev/null @@ -1,99 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <artifactId>alpha</artifactId> - <groupId>org.apache.servicecomb.pack</groupId> - <version>0.6.0-SNAPSHOT</version> - </parent> - <modelVersion>4.0.0</modelVersion> - - <artifactId>alpha-fsm-channel-redis</artifactId> - <name>Pack::Alpha::Fsm::channel::redis</name> - - <properties> - </properties> - - <dependencyManagement> - <dependencies> - <dependency> - <groupId>org.springframework.boot</groupId> - <artifactId>spring-boot-dependencies</artifactId> - <version>${spring.boot.version}</version> - <type>pom</type> - <scope>import</scope> - </dependency> - </dependencies> - </dependencyManagement> - - <dependencies> - <!-- spring boot --> - <dependency> - <groupId>org.springframework.boot</groupId> - <artifactId>spring-boot-autoconfigure</artifactId> - </dependency> - <dependency> - <groupId>org.apache.servicecomb.pack</groupId> - <artifactId>pack-common</artifactId> - </dependency> - <dependency> - <groupId>org.apache.servicecomb.pack</groupId> - <artifactId>alpha-core</artifactId> - </dependency> - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </dependency> - <dependency> - <groupId>org.springframework.boot</groupId> - <artifactId>spring-boot-starter-data-redis</artifactId> - </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-pool2</artifactId> - </dependency> - <dependency> - <groupId>org.springframework.boot</groupId> - <artifactId>spring-boot-starter-log4j2</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-api</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-core</artifactId> - <scope>test</scope> - </dependency> - <!-- For testing the artifacts scope are test--> - <dependency> - <groupId>org.springframework.boot</groupId> - <artifactId>spring-boot-starter-test</artifactId> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - </dependency> - </dependencies> - -</project> diff --git a/alpha/alpha-fsm-channel-redis/src/main/resources/META-INF/spring.factories b/alpha/alpha-fsm-channel-redis/src/main/resources/META-INF/spring.factories deleted file mode 100644 index 0810009..0000000 --- a/alpha/alpha-fsm-channel-redis/src/main/resources/META-INF/spring.factories +++ /dev/null @@ -1,17 +0,0 @@ -## --------------------------------------------------------------------------- -## Licensed to the Apache Software Foundation (ASF) under one or more -## contributor license agreements. See the NOTICE file distributed with -## this work for additional information regarding copyright ownership. -## The ASF licenses this file to You under the Apache License, Version 2.0 -## (the "License"); you may not use this file except in compliance with -## the License. You may obtain a copy of the License at -## -## http://www.apache.org/licenses/LICENSE-2.0 -## -## Unless required by applicable law or agreed to in writing, software -## distributed under the License is distributed on an "AS IS" BASIS, -## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -## See the License for the specific language governing permissions and -## limitations under the License. -## --------------------------------------------------------------------------- -org.springframework.boot.autoconfigure.EnableAutoConfiguration=org.apache.servicecomb.pack.alpha.fsm.channel.redis.RedisChannelAutoConfiguration diff --git a/alpha/alpha-fsm-channel-redis/src/test/java/org/apache/servicecomb/pack/alpha/fsm/RedisChannelTest.java b/alpha/alpha-fsm-channel-redis/src/test/java/org/apache/servicecomb/pack/alpha/fsm/RedisChannelTest.java deleted file mode 100644 index 858919f..0000000 --- a/alpha/alpha-fsm-channel-redis/src/test/java/org/apache/servicecomb/pack/alpha/fsm/RedisChannelTest.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.servicecomb.pack.alpha.fsm; - -import org.apache.servicecomb.pack.alpha.core.NodeStatus; -import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaEndedEvent; -import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaStartedEvent; -import org.apache.servicecomb.pack.alpha.core.fsm.event.TxEndedEvent; -import org.apache.servicecomb.pack.alpha.core.fsm.event.TxStartedEvent; -import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent; -import org.apache.servicecomb.pack.alpha.fsm.channel.redis.MessageSerializer; -import org.apache.servicecomb.pack.alpha.fsm.channel.redis.RedisMessagePublisher; -import org.apache.servicecomb.pack.alpha.fsm.channel.redis.RedisMessageSubscriber; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.Spy; -import org.mockito.junit.MockitoJUnitRunner; -import org.springframework.data.redis.connection.DefaultMessage; -import org.springframework.data.redis.connection.RedisConnection; -import org.springframework.data.redis.connection.RedisConnectionFactory; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.data.redis.listener.ChannelTopic; -import org.springframework.data.redis.listener.RedisMessageListenerContainer; -import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; - -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.*; - - -@RunWith(MockitoJUnitRunner.class) -public class RedisChannelTest { - - @Mock - private RedisConnection redisConnection; - - @Mock - private RedisTemplate<String, Object> redisTemplate; - - @Mock - private RedisConnectionFactory redisConnectionFactory; - - @Spy - private ChannelTopic channelTopic = new ChannelTopic("redis-channel"); - - private RedisMessageListenerContainer redisMessageListenerContainer; - - @Spy - private NodeStatus nodeStatus = new NodeStatus(NodeStatus.TypeEnum.MASTER); - - @Spy - private RedisEventSink actorEventSink = new RedisEventSink(); - - private RedisMessagePublisher redisMessagePublisher; - - private RedisMessageSubscriber redisMessageSubscriber; - - private MessageListenerAdapter messageListenerAdapter; - - @Before - public void setup(){ - when(redisConnectionFactory.getConnection()).thenReturn(redisConnection); - - redisTemplate.afterPropertiesSet(); - - redisMessageSubscriber = new RedisMessageSubscriber(actorEventSink, nodeStatus); - messageListenerAdapter = new MessageListenerAdapter(redisMessageSubscriber); - messageListenerAdapter.afterPropertiesSet(); - - redisMessageListenerContainer = new RedisMessageListenerContainer(); - redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory); - redisMessageListenerContainer.addMessageListener(messageListenerAdapter, channelTopic); - redisMessageListenerContainer.afterPropertiesSet(); - redisMessageListenerContainer.start(); - - redisMessagePublisher = new RedisMessagePublisher(redisTemplate, channelTopic); - - } - - - @Test - public void testRedisPubSub(){ - final String globalTxId = UUID.randomUUID().toString().replaceAll("-", ""); - final String localTxId1 = UUID.randomUUID().toString().replaceAll("-", ""); - final String localTxId2 = UUID.randomUUID().toString().replaceAll("-", ""); - final String localTxId3 = UUID.randomUUID().toString().replaceAll("-", ""); - - MessageSerializer messageSerializer = new MessageSerializer(); - buildData(globalTxId, localTxId1, localTxId2, localTxId3).forEach(baseEvent -> { - redisMessagePublisher.publish(baseEvent); - redisMessageSubscriber.onMessage(new DefaultMessage(channelTopic.getTopic().getBytes(), messageSerializer.serializer(baseEvent).orElse(new byte[0])), channelTopic.getTopic().getBytes()); - }); - - assertEquals(0, actorEventSink.countDownLatch.getCount()); - } - - private List<BaseEvent> buildData(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){ - List<BaseEvent> sagaEvents = new ArrayList<>(); - sagaEvents.add(SagaStartedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build()); - sagaEvents.add(TxStartedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxEndedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxStartedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); - sagaEvents.add(TxEndedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); - sagaEvents.add(TxStartedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); - sagaEvents.add(TxEndedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); - sagaEvents.add(SagaEndedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build()); - return sagaEvents; - } -} - - diff --git a/alpha/alpha-fsm-channel-redis/src/test/java/org/apache/servicecomb/pack/alpha/fsm/RedisEventSink.java b/alpha/alpha-fsm-channel-redis/src/test/java/org/apache/servicecomb/pack/alpha/fsm/RedisEventSink.java deleted file mode 100644 index f44342e..0000000 --- a/alpha/alpha-fsm-channel-redis/src/test/java/org/apache/servicecomb/pack/alpha/fsm/RedisEventSink.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.servicecomb.pack.alpha.fsm; - -import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent; -import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink; - -import java.util.concurrent.CountDownLatch; - -public class RedisEventSink implements ActorEventSink { - - public static final CountDownLatch countDownLatch = new CountDownLatch(8); - - @Override - public void send(BaseEvent event) throws Exception { - countDownLatch.countDown(); - } -} diff --git a/alpha/alpha-fsm-channel-redis/src/test/resources/log4j2.xml b/alpha/alpha-fsm-channel-redis/src/test/resources/log4j2.xml deleted file mode 100644 index 58924c6..0000000 --- a/alpha/alpha-fsm-channel-redis/src/test/resources/log4j2.xml +++ /dev/null @@ -1,30 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<Configuration status="WARN"> - <Appenders> - <Console name="Console" target="SYSTEM_OUT"> - <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/> - </Console> - </Appenders> - <Loggers> - <Root level="info"> - <AppenderRef ref="Console"/> - </Root> - </Loggers> -</Configuration> diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java index 4d861f6..777ae3f 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java @@ -20,53 +20,47 @@ package org.apache.servicecomb.pack.alpha.fsm; import static org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER; import static org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SpringAkkaExtension.SPRING_EXTENSION_PROVIDER; +import akka.actor.ActorRef; import akka.actor.ActorSystem; +import akka.actor.Props; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import java.util.Map; import javax.annotation.PostConstruct; -import org.apache.servicecomb.pack.alpha.fsm.channel.ActiveMQActorEventChannel; -import org.apache.servicecomb.pack.alpha.fsm.channel.kafka.KafkaMessagePublisher; -import org.apache.servicecomb.pack.alpha.fsm.channel.redis.RedisMessagePublisher; +import org.apache.servicecomb.pack.alpha.fsm.channel.kafka.KafkaChannelAutoConfiguration; +import org.apache.servicecomb.pack.alpha.fsm.channel.memory.MemoryChannelAutoConfiguration; +import org.apache.servicecomb.pack.alpha.fsm.channel.redis.RedisChannelAutoConfiguration; import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService; import org.apache.servicecomb.pack.alpha.fsm.repository.NoneTransactionRepository; +import org.apache.servicecomb.pack.alpha.fsm.repository.channel.DefaultTransactionRepositoryChannel; import org.apache.servicecomb.pack.alpha.fsm.repository.elasticsearch.ElasticsearchTransactionRepository; import org.apache.servicecomb.pack.alpha.fsm.repository.TransactionRepository; -import org.apache.servicecomb.pack.alpha.fsm.repository.channel.MemoryTransactionRepositoryChannel; import org.apache.servicecomb.pack.alpha.fsm.repository.TransactionRepositoryChannel; -import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink; -import org.apache.servicecomb.pack.alpha.core.fsm.channel.ActorEventChannel; -import org.apache.servicecomb.pack.alpha.fsm.channel.KafkaActorEventChannel; -import org.apache.servicecomb.pack.alpha.fsm.channel.MemoryActorEventChannel; -import org.apache.servicecomb.pack.alpha.fsm.channel.RedisActorEventChannel; -import org.apache.servicecomb.pack.alpha.fsm.sink.SagaActorEventSender; import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.AkkaConfigPropertyAdapter; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.ImportAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Lazy; import org.springframework.core.env.ConfigurableEnvironment; import org.springframework.data.elasticsearch.core.ElasticsearchTemplate; @Configuration +@ImportAutoConfiguration({ + MemoryChannelAutoConfiguration.class, + KafkaChannelAutoConfiguration.class, + RedisChannelAutoConfiguration.class}) @ConditionalOnProperty(value = {"alpha.feature.akka.enabled"}) public class FsmAutoConfiguration { - @Value("${alpha.feature.akka.channel.memory.size:-1}") - int memoryEventChannelMemorySize; - @Value("${alpha.feature.akka.transaction.repository.elasticsearch.batchSize:1000}") int repositoryElasticsearchBatchSize; @Value("${alpha.feature.akka.transaction.repository.elasticsearch.refreshTime:5000}") int repositoryElasticsearchRefreshTime; - @Value("${alpha.feature.akka.transaction.repository.elasticsearch.memory.size:-1}") - int memoryTransactionRepositoryChannelSize; - @PostConstruct void init() { System.setProperty("es.set.netty.runtime.available.processors", "false"); @@ -77,7 +71,8 @@ public class FsmAutoConfiguration { ConfigurableEnvironment environment, MetricsService metricsService, TransactionRepositoryChannel repositoryChannel) { ActorSystem system = ActorSystem - .create("alpha-akka", akkaConfiguration(applicationContext, environment)); + .create("alpha-cluster", akkaConfiguration(applicationContext, environment)); + SPRING_EXTENSION_PROVIDER.get(system).initialize(applicationContext); SAGA_DATA_EXTENSION_PROVIDER.get(system).setRepositoryChannel(repositoryChannel); SAGA_DATA_EXTENSION_PROVIDER.get(system).setMetricsService(metricsService); @@ -97,40 +92,9 @@ public class FsmAutoConfiguration { return new MetricsService(); } - @Bean - public ActorEventSink actorEventSink(MetricsService metricsService) { - return new SagaActorEventSender(metricsService); - } - - @Bean - @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "memory") - @ConditionalOnMissingBean(ActorEventChannel.class) - public ActorEventChannel memoryEventChannel(ActorEventSink actorEventSink, - MetricsService metricsService) { - return new MemoryActorEventChannel(actorEventSink, memoryEventChannelMemorySize, - metricsService); - } - - @Bean - @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "activemq") - @ConditionalOnMissingBean(ActorEventChannel.class) - public ActorEventChannel activeMqEventChannel(ActorEventSink actorEventSink, - MetricsService metricsService) { - return new ActiveMQActorEventChannel(actorEventSink, metricsService); - } - - @Bean - @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "kafka") - @ConditionalOnMissingBean(ActorEventChannel.class) - public ActorEventChannel kafkaEventChannel(ActorEventSink actorEventSink, - MetricsService metricsService, @Lazy KafkaMessagePublisher kafkaMessagePublisher){ - return new KafkaActorEventChannel(actorEventSink, metricsService, kafkaMessagePublisher); - } - - @Bean - @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "redis") - public ActorEventChannel redisEventChannel(ActorEventSink actorEventSink, MetricsService metricsService, @Lazy RedisMessagePublisher redisMessagePublisher){ - return new RedisActorEventChannel(actorEventSink, metricsService, redisMessagePublisher); + @Bean(name = "sagaShardRegionActor") + public ActorRef sagaShardRegionActor(ActorSystem actorSystem) { + return actorSystem.actorOf(Props.create(SagaShardRegionActor.class)); } @Bean @@ -148,12 +112,9 @@ public class FsmAutoConfiguration { } @Bean - @ConditionalOnMissingBean(TransactionRepositoryChannel.class) - @ConditionalOnProperty(value = "alpha.feature.akka.transaction.repository.channel.type", havingValue = "memory", matchIfMissing = true) TransactionRepositoryChannel memoryTransactionRepositoryChannel(TransactionRepository repository, MetricsService metricsService) { - return new MemoryTransactionRepositoryChannel(repository, memoryTransactionRepositoryChannelSize, - metricsService); + return new DefaultTransactionRepositoryChannel(repository, metricsService); } } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/AbstractActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/AbstractActorEventChannel.java index 40f1ee7..61bbde5 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/AbstractActorEventChannel.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/AbstractActorEventChannel.java @@ -28,14 +28,11 @@ public abstract class AbstractActorEventChannel implements ActorEventChannel { private static final Logger logger = LoggerFactory.getLogger(AbstractActorEventChannel.class); protected final MetricsService metricsService; - protected final ActorEventSink actorEventSink; public abstract void sendTo(BaseEvent event); public AbstractActorEventChannel( - ActorEventSink actorEventSink, MetricsService metricsService) { - this.actorEventSink = actorEventSink; this.metricsService = metricsService; } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/AbstractEventConsumer.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/AbstractEventConsumer.java new file mode 100644 index 0000000..6869add --- /dev/null +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/AbstractEventConsumer.java @@ -0,0 +1,20 @@ +package org.apache.servicecomb.pack.alpha.fsm.channel; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService; + +public abstract class AbstractEventConsumer { + + protected final MetricsService metricsService; + protected final ActorSystem actorSystem; + protected final ActorRef sagaShardRegionActor; + + public AbstractEventConsumer( + ActorSystem actorSystem, + ActorRef sagaShardRegionActor, MetricsService metricsService) { + this.metricsService = metricsService; + this.actorSystem = actorSystem; + this.sagaShardRegionActor = sagaShardRegionActor; + } +} diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActiveMQActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActiveMQActorEventChannel.java deleted file mode 100644 index 3fdc19b..0000000 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActiveMQActorEventChannel.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.servicecomb.pack.alpha.fsm.channel; - -import java.lang.invoke.MethodHandles; -import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent; -import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService; -import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Queue - * */ - -public class ActiveMQActorEventChannel extends AbstractActorEventChannel { - private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - - public ActiveMQActorEventChannel( - ActorEventSink actorEventSink, MetricsService metricsService) { - super(actorEventSink, metricsService); - } - - @Override - public void sendTo(BaseEvent event){ - throw new UnsupportedOperationException("Doesn't implement yet!"); - } -} diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaActorEventChannel.java similarity index 67% rename from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java rename to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaActorEventChannel.java index aca2676..6e3cfe7 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaActorEventChannel.java @@ -15,24 +15,18 @@ * limitations under the License. */ -package org.apache.servicecomb.pack.alpha.fsm.channel; +package org.apache.servicecomb.pack.alpha.fsm.channel.kafka; -import java.lang.invoke.MethodHandles; import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent; -import org.apache.servicecomb.pack.alpha.fsm.channel.kafka.KafkaMessagePublisher; +import org.apache.servicecomb.pack.alpha.fsm.channel.AbstractActorEventChannel; import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService; -import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class KafkaActorEventChannel extends AbstractActorEventChannel { - private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private KafkaMessagePublisher kafkaMessagePublisher; - public KafkaActorEventChannel( - ActorEventSink actorEventSink, MetricsService metricsService, KafkaMessagePublisher kafkaMessagePublisher) { - super(actorEventSink, metricsService); + public KafkaActorEventChannel(MetricsService metricsService, KafkaMessagePublisher kafkaMessagePublisher) { + super(metricsService); this.kafkaMessagePublisher = kafkaMessagePublisher; } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java new file mode 100644 index 0000000..ad44641 --- /dev/null +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.servicecomb.pack.alpha.fsm.channel.kafka; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import com.google.common.collect.Maps; +import java.lang.invoke.MethodHandles; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import javax.annotation.PostConstruct; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.KafkaAdminClient; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.errors.TopicExistsException; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.servicecomb.pack.alpha.core.fsm.channel.ActorEventChannel; +import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Lazy; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.serializer.JsonSerializer; + +@Configuration +@ConditionalOnClass(KafkaProperties.class) +@ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "kafka") +public class KafkaChannelAutoConfiguration { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + @Value("${alpha.feature.akka.channel.kafka.topic:servicecomb-pack-actor-event}") + private String topic; + + @Value("${spring.kafka.bootstrap-servers}") + private String bootstrap_servers; + + @Value("${spring.kafka.consumer.group-id:servicecomb-pack}") + private String groupId; + + @Value("${spring.kafka.consumer.properties.spring.json.trusted.packages:org.apache.servicecomb.pack.alpha.core.fsm.event,org.apache.servicecomb.pack.alpha.core.fsm.event.base,}org.apache.servicecomb.pack.alpha.core.fsm.event.internal") + private String trusted_packages; + + @Value("${spring.kafka.producer.batch-size:16384}") + private int batchSize; + + @Value("${spring.kafka.producer.retries:0}") + private int retries; + + @Value("${spring.kafka.producer.buffer.memory:33554432}") + private long bufferMemory; + + @Value("${spring.kafka.consumer.auto.offset.reset:earliest}") + private String autoOffsetReset; + + @Value("${spring.kafka.consumer.enable.auto.commit:false}") + private boolean enableAutoCommit; + + @Value("${spring.kafka.consumer.auto.commit.interval.ms:100}") + private int autoCommitIntervalMs; + + @Value("${spring.kafka.listener.ackMode:MANUAL_IMMEDIATE}") + private String ackMode; + + @Value("${spring.kafka.listener.pollTimeout:1500}") + private long poolTimeout; + + @Value("${kafka.numPartitions:6}") + private int numPartitions; + + @Value("${kafka.replicationFactor:1}") + private short replicationFactor; + + @PostConstruct + public void init() { + Map props = new HashMap<>(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers); + props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 50000); + try (final AdminClient adminClient = KafkaAdminClient.create(props)) { + try { + final NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor); + final CreateTopicsResult createTopicsResult = adminClient + .createTopics(Collections.singleton(newTopic)); + createTopicsResult.values().get(topic).get(); + } catch (InterruptedException | ExecutionException e) { + if (!(e.getCause() instanceof TopicExistsException)) { + throw new RuntimeException(e.getMessage(), e); + } + } + } + LOG.info("Kafka Channel Init"); + } + + @Bean + @ConditionalOnMissingBean + public KafkaMessagePublisher kafkaMessagePublisher() { + Map<String, Object> map = Maps.newHashMap(); + map.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers); + map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + map.put(ProducerConfig.RETRIES_CONFIG, retries); + map.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); + map.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); + return new KafkaMessagePublisher(topic, + new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(map))); + } + + @Bean + @ConditionalOnMissingBean(ActorEventChannel.class) + public ActorEventChannel kafkaEventChannel(MetricsService metricsService, + @Lazy KafkaMessagePublisher kafkaMessagePublisher) { + return new KafkaActorEventChannel(metricsService, kafkaMessagePublisher); + } + + @Bean + KafkaSagaEventConsumer sagaEventKafkaConsumer(ActorSystem actorSystem, + @Qualifier("sagaShardRegionActor") ActorRef sagaShardRegionActor, + MetricsService metricsService) { + return new KafkaSagaEventConsumer(actorSystem, sagaShardRegionActor, metricsService, + bootstrap_servers, topic); + } +} \ No newline at end of file diff --git a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java similarity index 82% rename from alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java rename to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java index ba96b56..95de39b 100644 --- a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java @@ -17,15 +17,14 @@ package org.apache.servicecomb.pack.alpha.fsm.channel.kafka; +import java.util.concurrent.ExecutionException; import org.apache.servicecomb.pack.alpha.core.fsm.channel.MessagePublisher; import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.core.KafkaTemplate; -import java.util.concurrent.ExecutionException; - -public class KafkaMessagePublisher implements MessagePublisher { +public class KafkaMessagePublisher implements MessagePublisher<BaseEvent> { private static final Logger logger = LoggerFactory.getLogger(KafkaMessagePublisher.class); @@ -38,18 +37,13 @@ public class KafkaMessagePublisher implements MessagePublisher { } @Override - public void publish(Object data) { + public void publish(BaseEvent data) { if(logger.isDebugEnabled()){ logger.debug("send message [{}] to [{}]", data, topic); } try { - if(data instanceof BaseEvent) { - BaseEvent event = (BaseEvent) data; - kafkaTemplate.send(topic, event.getGlobalTxId(), event).get(); - }else{ - throw new UnsupportedOperationException("data must be BaseEvent type"); - } + kafkaTemplate.send(topic, data.getGlobalTxId(), data).get(); } catch (InterruptedException | ExecutionException | UnsupportedOperationException e) { logger.error("publish Exception = [{}]", e.getMessage(), e); throw new RuntimeException(e); diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaSagaEventConsumer.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaSagaEventConsumer.java new file mode 100644 index 0000000..b816302 --- /dev/null +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaSagaEventConsumer.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.pack.alpha.fsm.channel.kafka; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.kafka.CommitterSettings; +import akka.kafka.ConsumerSettings; +import akka.kafka.Subscriptions; +import akka.kafka.javadsl.Committer; +import akka.kafka.javadsl.Consumer; +import akka.stream.ActorMaterializer; +import akka.stream.Materializer; +import akka.stream.javadsl.Keep; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.typesafe.config.Config; +import java.lang.invoke.MethodHandles; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent; +import org.apache.servicecomb.pack.alpha.fsm.channel.AbstractEventConsumer; +import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaSagaEventConsumer extends AbstractEventConsumer { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + final String groupId = "servicecomb-pack"; + final ObjectMapper jsonMapper = new ObjectMapper(); + + public KafkaSagaEventConsumer(ActorSystem actorSystem, ActorRef sagaShardRegionActor, + MetricsService metricsService, String bootstrap_servers, String topic) { + super(actorSystem, sagaShardRegionActor, metricsService); + + // init consumer + final Materializer materializer = ActorMaterializer.create(actorSystem); + final Config consumerConfig = actorSystem.settings().config().getConfig("akka.kafka.consumer"); + final ConsumerSettings<String, String> consumerSettings = + ConsumerSettings + .create(consumerConfig, new StringDeserializer(), new StringDeserializer()) + .withBootstrapServers(bootstrap_servers) + .withGroupId(groupId) + .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000") + .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + .withProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "StringDeserializer.class") + .withProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + "StringDeserializer.class"); + CommitterSettings committerSettings = CommitterSettings.create(consumerConfig); + Consumer.committableSource(consumerSettings, Subscriptions.topics(topic)) + .mapAsync(1, event -> { // must be set to 1 for ordered + return sendSagaActor(event.record().key(), event.record().value()) + .thenApply(done -> event.committableOffset()); + }) + .toMat(Committer.sink(committerSettings), Keep.both()) + .mapMaterializedValue(Consumer::createDrainingControl) + .run(materializer); + } + + private CompletionStage<String> sendSagaActor(String key, String value) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("key {}, value {}", key, value); + } + long begin = System.currentTimeMillis(); + metricsService.metrics().doActorReceived(); + sagaShardRegionActor.tell(jsonMapper.readValue(value, BaseEvent.class), sagaShardRegionActor); + long end = System.currentTimeMillis(); + metricsService.metrics().doActorAccepted(); + metricsService.metrics().doActorAvgTime(end - begin); + return CompletableFuture.completedFuture(""); + } catch (Exception ex) { + metricsService.metrics().doActorRejected(); + LOG.error("key {}, value {}", key, value); + throw new CompletionException(ex); + } + } +} diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/MemoryActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/memory/MemoryActorEventChannel.java similarity index 70% rename from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/MemoryActorEventChannel.java rename to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/memory/MemoryActorEventChannel.java index b56fe38..d5222b5 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/MemoryActorEventChannel.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/memory/MemoryActorEventChannel.java @@ -15,12 +15,13 @@ * limitations under the License. */ -package org.apache.servicecomb.pack.alpha.fsm.channel; +package org.apache.servicecomb.pack.alpha.fsm.channel.memory; import java.lang.invoke.MethodHandles; import java.util.concurrent.LinkedBlockingQueue; import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent; +import org.apache.servicecomb.pack.alpha.fsm.channel.AbstractActorEventChannel; import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService; import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink; import org.slf4j.Logger; @@ -32,12 +33,14 @@ public class MemoryActorEventChannel extends AbstractActorEventChannel { private final LinkedBlockingQueue<BaseEvent> eventQueue; private int size; - public MemoryActorEventChannel(ActorEventSink actorEventSink, int size, - MetricsService metricsService) { - super(actorEventSink, metricsService); + public MemoryActorEventChannel(MetricsService metricsService, int size) { + super(metricsService); this.size = size > 0 ? size : Integer.MAX_VALUE; eventQueue = new LinkedBlockingQueue(this.size); - new Thread(new EventConsumer(), "MemoryActorEventChannel").start(); + } + + public LinkedBlockingQueue<BaseEvent> getEventQueue() { + return eventQueue; } @Override @@ -48,24 +51,4 @@ public class MemoryActorEventChannel extends AbstractActorEventChannel { throw new RuntimeException(e); } } - - class EventConsumer implements Runnable { - - @Override - public void run() { - while (true) { - try { - BaseEvent event = eventQueue.peek(); - if (event != null) { - actorEventSink.send(event); - eventQueue.poll(); - } else { - Thread.sleep(10); - } - } catch (Exception ex) { - LOG.error(ex.getMessage(), ex); - } - } - } - } } diff --git a/alpha/alpha-fsm-channel-redis/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/MessageSerializer.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/MessageSerializer.java similarity index 84% rename from alpha/alpha-fsm-channel-redis/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/MessageSerializer.java rename to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/MessageSerializer.java index dc2ef16..5665eee 100644 --- a/alpha/alpha-fsm-channel-redis/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/MessageSerializer.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/MessageSerializer.java @@ -16,20 +16,20 @@ */ package org.apache.servicecomb.pack.alpha.fsm.channel.redis; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.data.redis.serializer.RedisSerializer; -import org.springframework.data.redis.serializer.SerializationException; - import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.lang.invoke.MethodHandles; import java.util.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.data.redis.serializer.RedisSerializer; +import org.springframework.data.redis.serializer.SerializationException; public class MessageSerializer { - private static final Logger logger = LoggerFactory.getLogger(MessageSerializer.class); + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static MessageSerializerImpl serializer = null; @@ -37,15 +37,16 @@ public class MessageSerializer { serializer = new MessageSerializerImpl(); } - public Optional<byte[]> serializer(Object data){ + public Optional<byte[]> serializer(Object data) { return Optional.ofNullable(serializer.serialize(data)); } - public Optional<Object> deserialize(byte[] bytes){ + public Optional<Object> deserialize(byte[] bytes) { return Optional.ofNullable(serializer.deserialize(bytes)); } - private class MessageSerializerImpl implements RedisSerializer<Object>{ + private class MessageSerializerImpl implements RedisSerializer<Object> { + @Override public byte[] serialize(Object data) throws SerializationException { try { @@ -58,8 +59,8 @@ public class MessageSerializer { outputStream.close(); return bytes; - }catch (Exception e){ - logger.error("serialize Exception = [{}]", e.getMessage(), e); + } catch (Exception e) { + LOG.error("serialize Exception = [{}]", e.getMessage(), e); } return null; @@ -76,8 +77,8 @@ public class MessageSerializer { objectInputStream.close(); return object; - }catch (Exception e){ - logger.error("deserialize Exception = [{}]", e.getMessage(), e); + } catch (Exception e) { + LOG.error("deserialize Exception = [{}]", e.getMessage(), e); } return null; diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisActorEventChannel.java similarity index 75% rename from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java rename to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisActorEventChannel.java index f68d7c0..20abdec 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisActorEventChannel.java @@ -15,35 +15,35 @@ * limitations under the License. */ -package org.apache.servicecomb.pack.alpha.fsm.channel; +package org.apache.servicecomb.pack.alpha.fsm.channel.redis; import java.lang.invoke.MethodHandles; -import org.apache.servicecomb.pack.alpha.fsm.channel.redis.RedisMessagePublisher; +import org.apache.servicecomb.pack.alpha.fsm.channel.AbstractActorEventChannel; import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent; import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService; -import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Pub/Sub - * */ + */ public class RedisActorEventChannel extends AbstractActorEventChannel { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private RedisMessagePublisher redisMessagePublisher; - public RedisActorEventChannel( - ActorEventSink actorEventSink, MetricsService metricsService, RedisMessagePublisher redisMessagePublisher) { - super(actorEventSink, metricsService); + public RedisActorEventChannel(MetricsService metricsService, + RedisMessagePublisher redisMessagePublisher) { + super(metricsService); this.redisMessagePublisher = redisMessagePublisher; } @Override - public void sendTo(BaseEvent event){ - if(LOG.isDebugEnabled()) { + public void sendTo(BaseEvent event) { + if (LOG.isDebugEnabled()) { LOG.debug("sendTo message = [{}]", event); } redisMessagePublisher.publish(event); diff --git a/alpha/alpha-fsm-channel-redis/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisChannelAutoConfiguration.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisChannelAutoConfiguration.java similarity index 69% rename from alpha/alpha-fsm-channel-redis/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisChannelAutoConfiguration.java rename to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisChannelAutoConfiguration.java index e67e8d5..3bc8cb4 100644 --- a/alpha/alpha-fsm-channel-redis/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisChannelAutoConfiguration.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisChannelAutoConfiguration.java @@ -16,9 +16,15 @@ */ package org.apache.servicecomb.pack.alpha.fsm.channel.redis; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import java.lang.invoke.MethodHandles; +import javax.annotation.PostConstruct; import org.apache.servicecomb.pack.alpha.core.NodeStatus; +import org.apache.servicecomb.pack.alpha.core.fsm.channel.ActorEventChannel; import org.apache.servicecomb.pack.alpha.core.fsm.channel.MessagePublisher; import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink; +import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Qualifier; @@ -42,13 +48,20 @@ import org.springframework.data.redis.serializer.StringRedisSerializer; @ConditionalOnClass(RedisConnection.class) @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "redis") public class RedisChannelAutoConfiguration { - private static final Logger logger = LoggerFactory.getLogger(RedisChannelAutoConfiguration.class); + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @Value("${alpha.feature.akka.channel.redis.topic:servicecomb-pack-actor-event}") private String topic; + @PostConstruct + public void init() { + LOG.info("Redis Channel Init"); + } + @Bean - public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { + public RedisTemplate<String, Object> redisTemplate( + RedisConnectionFactory redisConnectionFactory) { RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setHashKeySerializer(new GenericToStringSerializer<>(Object.class)); @@ -60,25 +73,29 @@ public class RedisChannelAutoConfiguration { } @Bean - RedisMessageSubscriber redisMessageSubscriber(@Lazy @Qualifier("actorEventSink") ActorEventSink actorEventSink, + RedisSagaEventConsumer redisSagaEventConsumer(ActorSystem actorSystem, + @Qualifier("sagaShardRegionActor") ActorRef sagaShardRegionActor, + MetricsService metricsService, @Lazy @Qualifier("nodeStatus") NodeStatus nodeStatus) { - return new RedisMessageSubscriber(actorEventSink, nodeStatus); + return new RedisSagaEventConsumer(actorSystem, sagaShardRegionActor, metricsService, + nodeStatus); } @Bean - public MessageListenerAdapter messageListenerAdapter(@Lazy @Qualifier("actorEventSink") ActorEventSink actorEventSink, - @Lazy @Qualifier("nodeStatus") NodeStatus nodeStatus) { - return new MessageListenerAdapter(redisMessageSubscriber(actorEventSink, nodeStatus)); + public MessageListenerAdapter messageListenerAdapter( + RedisSagaEventConsumer redisSagaEventConsumer) { + return new MessageListenerAdapter(redisSagaEventConsumer); } @Bean - public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory, - @Lazy @Qualifier("actorEvetSink") ActorEventSink actorEventSink, - @Lazy @Qualifier("nodeStatus") NodeStatus nodeStatus) { + public RedisMessageListenerContainer redisMessageListenerContainer( + RedisConnectionFactory redisConnectionFactory, + RedisSagaEventConsumer redisSagaEventConsumer) { RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer(); redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory); - redisMessageListenerContainer.addMessageListener(redisMessageSubscriber(actorEventSink, nodeStatus), channelTopic()); + redisMessageListenerContainer + .addMessageListener(redisSagaEventConsumer, channelTopic()); return redisMessageListenerContainer; } @@ -90,10 +107,16 @@ public class RedisChannelAutoConfiguration { @Bean ChannelTopic channelTopic() { - if (logger.isDebugEnabled()) { - logger.debug("build channel topic = [{}]", topic); + if (LOG.isDebugEnabled()) { + LOG.debug("build channel topic = [{}]", topic); } return new ChannelTopic(topic); } + @Bean + public ActorEventChannel redisEventChannel(MetricsService metricsService, + @Lazy RedisMessagePublisher redisMessagePublisher) { + return new RedisActorEventChannel(metricsService, redisMessagePublisher); + } + } diff --git a/alpha/alpha-fsm-channel-redis/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisMessagePublisher.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisMessagePublisher.java similarity index 75% rename from alpha/alpha-fsm-channel-redis/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisMessagePublisher.java rename to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisMessagePublisher.java index 31370e3..eca2af4 100644 --- a/alpha/alpha-fsm-channel-redis/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisMessagePublisher.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisMessagePublisher.java @@ -17,28 +17,31 @@ package org.apache.servicecomb.pack.alpha.fsm.channel.redis; +import java.lang.invoke.MethodHandles; import org.apache.servicecomb.pack.alpha.core.fsm.channel.MessagePublisher; +import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.ChannelTopic; -public class RedisMessagePublisher implements MessagePublisher { +public class RedisMessagePublisher implements MessagePublisher<BaseEvent> { - private static final Logger logger = LoggerFactory.getLogger(RedisMessagePublisher.class); + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private RedisTemplate<String, Object> redisTemplate; private ChannelTopic channelTopic; - public RedisMessagePublisher(RedisTemplate<String, Object> redisTemplate, ChannelTopic channelTopic) { + public RedisMessagePublisher(RedisTemplate<String, Object> redisTemplate, + ChannelTopic channelTopic) { this.redisTemplate = redisTemplate; this.channelTopic = channelTopic; } @Override - public void publish(Object data) { - if(logger.isDebugEnabled()) { - logger.debug("send message [{}] to [{}]", data, channelTopic.getTopic()); + public void publish(BaseEvent data) { + if (LOG.isDebugEnabled()) { + LOG.debug("send message [{}] to [{}]", data, channelTopic.getTopic()); } redisTemplate.convertAndSend(channelTopic.getTopic(), data); diff --git a/alpha/alpha-fsm-channel-redis/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisMessageSubscriber.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisSagaEventConsumer.java similarity index 54% rename from alpha/alpha-fsm-channel-redis/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisMessageSubscriber.java rename to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisSagaEventConsumer.java index 0aa171b..f19e768 100644 --- a/alpha/alpha-fsm-channel-redis/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisMessageSubscriber.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisSagaEventConsumer.java @@ -14,57 +14,55 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.servicecomb.pack.alpha.fsm.channel.redis; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import java.lang.invoke.MethodHandles; import org.apache.servicecomb.pack.alpha.core.NodeStatus; import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent; -import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink; +import org.apache.servicecomb.pack.alpha.fsm.channel.AbstractEventConsumer; +import org.apache.servicecomb.pack.alpha.fsm.channel.memory.MemoryActorEventChannel; +import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; -import java.nio.charset.StandardCharsets; - -public class RedisMessageSubscriber implements MessageListener { +public class RedisSagaEventConsumer extends AbstractEventConsumer implements MessageListener { - private static final Logger logger = LoggerFactory.getLogger(RedisMessageSubscriber.class); - - private ActorEventSink actorEventSink; + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private NodeStatus nodeStatus; - private MessageSerializer messageSerializer = new MessageSerializer(); - public RedisMessageSubscriber(ActorEventSink actorEventSink, NodeStatus nodeStatus) { - this.actorEventSink = actorEventSink; + public RedisSagaEventConsumer(ActorSystem actorSystem, ActorRef sagaShardRegionActor, + MetricsService metricsService, + NodeStatus nodeStatus) { + super(actorSystem, sagaShardRegionActor, metricsService); this.nodeStatus = nodeStatus; } @Override public void onMessage(Message message, byte[] pattern) { - if(nodeStatus.isMaster()) { - if (logger.isDebugEnabled()) { - logger.debug("pattern = [{}]", new String(pattern, StandardCharsets.UTF_8)); - } - + if (nodeStatus.isMaster()) { messageSerializer.deserialize(message.getBody()).ifPresent(data -> { - BaseEvent event = (BaseEvent) data; - - if (logger.isDebugEnabled()) { - logger.debug("event = [{}]", event); + if (LOG.isDebugEnabled()) { + LOG.debug("event = [{}]", event); } - try { - actorEventSink.send(event); + long begin = System.currentTimeMillis(); + metricsService.metrics().doActorReceived(); + sagaShardRegionActor.tell(event, sagaShardRegionActor); + long end = System.currentTimeMillis(); + metricsService.metrics().doActorAccepted(); + metricsService.metrics().doActorAvgTime(end - begin); } catch (Exception e) { - logger.error("subscriber Exception = [{}]", e.getMessage(), e); + metricsService.metrics().doActorRejected(); + LOG.error("subscriber Exception = [{}]", e.getMessage(), e); } }); - }else{ - if(logger.isDebugEnabled()){ - logger.debug("nodeStatus is not master and cancel this time subscribe"); - } } } } diff --git a/alpha/pom.xml b/alpha/pom.xml index 34c43da..6fa969d 100644 --- a/alpha/pom.xml +++ b/alpha/pom.xml @@ -33,8 +33,6 @@ <modules> <module>alpha-core</module> <module>alpha-fsm</module> - <module>alpha-fsm-channel-redis</module> - <module>alpha-fsm-channel-kafka</module> <module>alpha-benchmark</module> <module>alpha-spring-cloud-starter-eureka</module> <module>alpha-spring-cloud-starter-consul</module>