This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 001a26d1dd6d0f7d4fa0f094bb0d52e7162bb66a
Author: Lei Zhang <coolbee...@gmail.com>
AuthorDate: Tue Sep 10 17:03:07 2019 +0800

    SCB-1368 Refactoring model alpha-fsm-channel-kafka and 
alpha-fsm-channel-redis to alpha-fsm
---
 alpha/alpha-fsm-channel-kafka/README.md            |  28 ----
 alpha/alpha-fsm-channel-kafka/pom.xml              | 113 -------------
 .../kafka/KafkaChannelAutoConfiguration.java       | 175 ---------------------
 .../fsm/channel/kafka/KafkaMessageListener.java    |  49 ------
 .../src/main/resources/META-INF/spring.factories   |  17 --
 .../channel/kafka/test/KafkaActorEventSink.java    |  31 ----
 .../fsm/channel/kafka/test/KafkaApplication.java   |  40 -----
 .../fsm/channel/kafka/test/KafkaChannelTest.java   |  95 -----------
 .../src/test/resources/log4j2.xml                  |  30 ----
 alpha/alpha-fsm-channel-redis/README.md            |  17 --
 alpha/alpha-fsm-channel-redis/pom.xml              |  99 ------------
 .../src/main/resources/META-INF/spring.factories   |  17 --
 .../pack/alpha/fsm/RedisChannelTest.java           | 130 ---------------
 .../servicecomb/pack/alpha/fsm/RedisEventSink.java |  32 ----
 .../src/test/resources/log4j2.xml                  |  30 ----
 .../pack/alpha/fsm/FsmAutoConfiguration.java       |  73 ++-------
 .../fsm/channel/AbstractActorEventChannel.java     |   3 -
 .../alpha/fsm/channel/AbstractEventConsumer.java   |  20 +++
 .../fsm/channel/ActiveMQActorEventChannel.java     |  43 -----
 .../{ => kafka}/KafkaActorEventChannel.java        |  14 +-
 .../kafka/KafkaChannelAutoConfiguration.java       | 149 ++++++++++++++++++
 .../fsm/channel/kafka/KafkaMessagePublisher.java   |  14 +-
 .../fsm/channel/kafka/KafkaSagaEventConsumer.java  |  97 ++++++++++++
 .../{ => memory}/MemoryActorEventChannel.java      |  33 +---
 .../alpha/fsm/channel/redis/MessageSerializer.java |  27 ++--
 .../{ => redis}/RedisActorEventChannel.java        |  18 +--
 .../redis/RedisChannelAutoConfiguration.java       |  49 ++++--
 .../fsm/channel/redis/RedisMessagePublisher.java   |  15 +-
 .../fsm/channel/redis/RedisSagaEventConsumer.java} |  50 +++---
 alpha/pom.xml                                      |   2 -
 30 files changed, 391 insertions(+), 1119 deletions(-)

diff --git a/alpha/alpha-fsm-channel-kafka/README.md 
b/alpha/alpha-fsm-channel-kafka/README.md
deleted file mode 100644
index 094bbfe..0000000
--- a/alpha/alpha-fsm-channel-kafka/README.md
+++ /dev/null
@@ -1,28 +0,0 @@
-# FSM kafka channel
-## Enabled Saga State Machine Module
-
-Using `alpha.feature.akka.enabled=true` launch Alpha and Omega Side 
-Using `alpha.feature.akka.channel.type=kafka` launch Alpha and Omega Side 
-
-```properties
-alpha.feature.akka.enabled=true
-alpha.feature.akka.channel.type=kafka
-```
-
-setting spring boot kafka
-```
-spring.kafka.bootstrap-servers=kafka bootstrap_servers 
-spring.kafka.consumer.group-id=kafka consumer group id, default 
servicecomb-pack
-alpha.feature.akka.channel.kafka.topic= kafka topic name, default 
servicecomb-pack-actor-event
-spring.kafka.producer.batch-size= producer batch size, default 16384
-spring.kafka.producer.retries = producer retries, default 0
-spring.kafka.producer.buffer.memory = producer buffer memory, default 33554432
-spring.kafka.consumer.auto.offset.reset = consumer auto offset reset, default 
earliest
-spring.kafka.consumer.enable.auto.commit = consumer enable auto commit, 
default false
-spring.kafka.consumer.auto.commit.interval.ms = consumer auto commit interval 
ms, default 100
-spring.kafka.listener.ackMode = consumer listener ack mode , default 
AckMode.MANUAL_IMMEDIATE
-spring.kafka.listener.pollTimeout = consumer listener pool timeout, default 
1500 ms
-
-kafka.numPartitions = kafka topic partitions, default 6
-kafka.replicationFactor = kafka topic replication, default 1
-```
diff --git a/alpha/alpha-fsm-channel-kafka/pom.xml 
b/alpha/alpha-fsm-channel-kafka/pom.xml
deleted file mode 100644
index c7d4eff..0000000
--- a/alpha/alpha-fsm-channel-kafka/pom.xml
+++ /dev/null
@@ -1,113 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~      http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0";
-  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
-  <parent>
-    <artifactId>alpha</artifactId>
-    <groupId>org.apache.servicecomb.pack</groupId>
-    <version>0.6.0-SNAPSHOT</version>
-  </parent>
-  <modelVersion>4.0.0</modelVersion>
-
-  <artifactId>alpha-fsm-channel-kafka</artifactId>
-  <name>Pack::Alpha::Fsm::channel::kafka</name>
-
-  <properties>
-    <leveldbjni-all.version>1.8</leveldbjni-all.version>
-    <akka-persistence-redis.version>0.4.0</akka-persistence-redis.version>
-  </properties>
-
-  <dependencyManagement>
-    <dependencies>
-      <dependency>
-        <groupId>org.springframework.boot</groupId>
-        <artifactId>spring-boot-dependencies</artifactId>
-        <version>${spring.boot.version}</version>
-        <type>pom</type>
-        <scope>import</scope>
-      </dependency>
-      <dependency>
-        <groupId>com.typesafe.akka</groupId>
-        <artifactId>akka-persistence_2.12</artifactId>
-        <version>${akka.version}</version>
-      </dependency>
-    </dependencies>
-  </dependencyManagement>
-
-  <dependencies>
-    <!-- spring boot -->
-    <dependency>
-      <groupId>org.springframework.boot</groupId>
-      <artifactId>spring-boot-autoconfigure</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.servicecomb.pack</groupId>
-      <artifactId>pack-common</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.servicecomb.pack</groupId>
-      <artifactId>alpha-core</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.springframework.boot</groupId>
-      <artifactId>spring-boot-starter-log4j2</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-api</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-core</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <!-- For testing the artifacts scope are test-->
-    <dependency>
-      <groupId>org.springframework.boot</groupId>
-      <artifactId>spring-boot-starter-test</artifactId>
-      <exclusions>
-        <exclusion>
-          <groupId>org.springframework.boot</groupId>
-          <artifactId>spring-boot-starter-logging</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.springframework.kafka</groupId>
-      <artifactId>spring-kafka</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.springframework.kafka</groupId>
-      <artifactId>spring-kafka-test</artifactId>
-      <scope>test</scope>
-    </dependency>
-</dependencies>
-
-</project>
diff --git 
a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
 
b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
deleted file mode 100644
index 6729be6..0000000
--- 
a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicecomb.pack.alpha.fsm.channel.kafka;
-
-import com.google.common.collect.Maps;
-import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
-import 
org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.Lazy;
-import 
org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
-import org.springframework.kafka.config.KafkaListenerContainerFactory;
-import org.springframework.kafka.core.*;
-import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
-import org.springframework.kafka.listener.ContainerProperties;
-import org.springframework.kafka.support.serializer.JsonDeserializer;
-import org.springframework.kafka.support.serializer.JsonSerializer;
-
-import java.util.Map;
-
-@Configuration
-@ConditionalOnClass(KafkaProperties.class)
-@ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue 
= "kafka")
-public class KafkaChannelAutoConfiguration {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(KafkaChannelAutoConfiguration.class);
-
-    
@Value("${alpha.feature.akka.channel.kafka.topic:servicecomb-pack-actor-event}")
-    private String topic;
-
-    @Value("${spring.kafka.bootstrap-servers}")
-    private String bootstrap_servers;
-
-    @Value("${spring.kafka.consumer.group-id:servicecomb-pack}")
-    private String groupId;
-
-    
@Value("${spring.kafka.consumer.properties.spring.json.trusted.packages:org.apache.servicecomb.pack.alpha.core.fsm.event,org.apache.servicecomb.pack.alpha.core.fsm.event.base,}org.apache.servicecomb.pack.alpha.core.fsm.event.internal")
-    private String trusted_packages;
-
-    @Value("${spring.kafka.producer.batch-size:16384}")
-    private int batchSize;
-
-    @Value("${spring.kafka.producer.retries:0}")
-    private int retries;
-
-    @Value("${spring.kafka.producer.buffer.memory:33554432}")
-    private long bufferMemory;
-
-    @Value("${spring.kafka.consumer.auto.offset.reset:earliest}")
-    private String autoOffsetReset;
-
-    @Value("${spring.kafka.consumer.enable.auto.commit:false}")
-    private boolean enableAutoCommit;
-
-    @Value("${spring.kafka.consumer.auto.commit.interval.ms:100}")
-    private int autoCommitIntervalMs;
-
-    @Value("${spring.kafka.listener.ackMode:MANUAL_IMMEDIATE}")
-    private String ackMode;
-
-    @Value("${spring.kafka.listener.pollTimeout:1500}")
-    private long poolTimeout;
-
-    @Value("${kafka.numPartitions:6}")
-    private int  numPartitions;
-
-    @Value("${kafka.replicationFactor:1}")
-    private short replicationFactor;
-
-    @Bean
-    @ConditionalOnMissingBean
-    public ProducerFactory<String, Object> producerFactory(){
-        Map<String, Object> map = Maps.newHashMap();
-        map.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers);
-        map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
-        map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
JsonSerializer.class);
-        map.put(ProducerConfig.RETRIES_CONFIG, retries);
-        map.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
-        map.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
-
-        return new DefaultKafkaProducerFactory<>(map);
-    }
-
-    @Bean
-    @ConditionalOnMissingBean
-    public KafkaTemplate<String, Object> kafkaTemplate(){
-        return new KafkaTemplate<>(producerFactory());
-    }
-
-    @Bean
-    @ConditionalOnMissingBean
-    public ConsumerFactory<String, Object> consumerFactory(){
-        Map<String, Object> map = Maps.newHashMap();
-
-        map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers);
-        map.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-        map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
-        map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
JsonDeserializer.class);
-        map.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
-        map.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
-        map.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 
autoCommitIntervalMs);
-        map.put(JsonDeserializer.TRUSTED_PACKAGES, trusted_packages);
-
-        if(logger.isDebugEnabled()){
-            logger.debug("init consumerFactory properties = [{}]", map);
-        }
-        return new DefaultKafkaConsumerFactory<>(map);
-    }
-
-    @Bean
-    @ConditionalOnMissingBean
-    public 
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, 
Object>> kafkaListenerContainerFactory(){
-        ConcurrentKafkaListenerContainerFactory<String,Object> 
concurrentKafkaListenerContainerFactory =
-                new ConcurrentKafkaListenerContainerFactory<>();
-        
concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
-        
concurrentKafkaListenerContainerFactory.getContainerProperties().setPollTimeout(poolTimeout);
-        
concurrentKafkaListenerContainerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.valueOf(ackMode));
-
-        return concurrentKafkaListenerContainerFactory;
-    }
-    @Bean
-    @ConditionalOnMissingBean
-    public KafkaMessagePublisher kafkaMessagePublisher(KafkaTemplate<String, 
Object> kafkaTemplate){
-        return new KafkaMessagePublisher(topic, kafkaTemplate);
-    }
-
-    @Bean
-    @ConditionalOnMissingBean
-    public KafkaMessageListener kafkaMessageListener(@Lazy 
@Qualifier("actorEventSink") ActorEventSink actorEventSink){
-        return new KafkaMessageListener(actorEventSink);
-    }
-
-    @Bean
-    @ConditionalOnMissingBean
-    public KafkaAdmin kafkaAdmin(){
-        Map<String, Object> map = Maps.newHashMap();
-
-        map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers);
-
-        return new KafkaAdmin(map);
-    }
-
-    @Bean
-    @ConditionalOnMissingBean
-    public NewTopic newTopic(){
-        return new NewTopic(topic, numPartitions, replicationFactor);
-    }
-}
\ No newline at end of file
diff --git 
a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessageListener.java
 
b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessageListener.java
deleted file mode 100644
index 8d1f880..0000000
--- 
a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessageListener.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicecomb.pack.alpha.fsm.channel.kafka;
-
-import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.support.Acknowledgment;
-
-public class KafkaMessageListener {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(KafkaMessageListener.class);
-
-    private ActorEventSink actorEventSink;
-
-    public KafkaMessageListener(ActorEventSink actorEventSink) {
-        this.actorEventSink = actorEventSink;
-    }
-
-    @KafkaListener(topics = 
"${alpha.feature.akka.channel.kafka.topic:servicecomb-pack-actor-event}")
-    public void listener(BaseEvent baseEvent, Acknowledgment acknowledgment){
-        if(logger.isDebugEnabled()){
-            logger.debug("listener event = [{}]", baseEvent);
-        }
-
-        try {
-            actorEventSink.send(baseEvent);
-            acknowledgment.acknowledge();
-        }catch (Exception e){
-            logger.error("subscriber Exception = [{}]", e.getMessage(), e);
-        }
-    }
-}
\ No newline at end of file
diff --git 
a/alpha/alpha-fsm-channel-kafka/src/main/resources/META-INF/spring.factories 
b/alpha/alpha-fsm-channel-kafka/src/main/resources/META-INF/spring.factories
deleted file mode 100644
index 9366e98..0000000
--- a/alpha/alpha-fsm-channel-kafka/src/main/resources/META-INF/spring.factories
+++ /dev/null
@@ -1,17 +0,0 @@
-## ---------------------------------------------------------------------------
-## Licensed to the Apache Software Foundation (ASF) under one or more
-## contributor license agreements.  See the NOTICE file distributed with
-## this work for additional information regarding copyright ownership.
-## The ASF licenses this file to You under the Apache License, Version 2.0
-## (the "License"); you may not use this file except in compliance with
-## the License.  You may obtain a copy of the License at
-##
-##      http://www.apache.org/licenses/LICENSE-2.0
-##
-## Unless required by applicable law or agreed to in writing, software
-## distributed under the License is distributed on an "AS IS" BASIS,
-## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-## See the License for the specific language governing permissions and
-## limitations under the License.
-## ---------------------------------------------------------------------------
-org.springframework.boot.autoconfigure.EnableAutoConfiguration=org.apache.servicecomb.pack.alpha.fsm.channel.kafka.KafkaChannelAutoConfiguration
diff --git 
a/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaActorEventSink.java
 
b/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaActorEventSink.java
deleted file mode 100644
index b392a94..0000000
--- 
a/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaActorEventSink.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicecomb.pack.alpha.fsm.channel.kafka.test;
-
-import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink;
-
-import java.util.concurrent.CountDownLatch;
-
-public class KafkaActorEventSink implements ActorEventSink {
-    public static final CountDownLatch countDownLatch = new CountDownLatch(8);
-
-    @Override
-    public void send(BaseEvent event) throws Exception {
-        countDownLatch.countDown();
-    }
-}
diff --git 
a/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaApplication.java
 
b/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaApplication.java
deleted file mode 100644
index 9b001eb..0000000
--- 
a/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaApplication.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicecomb.pack.alpha.fsm.channel.kafka.test;
-
-import org.apache.servicecomb.pack.alpha.core.NodeStatus;
-import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.context.annotation.Bean;
-
-@SpringBootApplication
-public class KafkaApplication {
-    public static void main(String[] args) {
-        SpringApplication.run(KafkaApplication.class, args);
-    }
-
-    @Bean(name = "actorEventSink")
-    public ActorEventSink actorEventSink(){
-        return new KafkaActorEventSink();
-    }
-
-    @Bean(name = "nodeStatus")
-    public NodeStatus nodeStatus(){
-        return new NodeStatus(NodeStatus.TypeEnum.MASTER);
-    }
-}
diff --git 
a/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaChannelTest.java
 
b/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaChannelTest.java
deleted file mode 100644
index 942b16d..0000000
--- 
a/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaChannelTest.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicecomb.pack.alpha.fsm.channel.kafka.test;
-
-import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaEndedEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaStartedEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.TxEndedEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.TxStartedEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
-import 
org.apache.servicecomb.pack.alpha.fsm.channel.kafka.KafkaMessagePublisher;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.kafka.test.EmbeddedKafkaBroker;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.test.context.junit4.SpringRunner;
-
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-
-
-@RunWith(SpringRunner.class)
-@SpringBootTest(classes = KafkaApplication.class,
-        properties = {
-                "alpha.feature.akka.enabled=true",
-                "alpha.feature.akka.channel.type=kafka",
-                
"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
-                "spring.kafka.consumer.group-id=messageListener"
-        }
-)
-@EmbeddedKafka
-public class KafkaChannelTest {
-    @Autowired
-    private EmbeddedKafkaBroker embeddedKafkaBroker;
-
-    @Autowired
-    private KafkaMessagePublisher kafkaMessagePublisher;
-
-    @Autowired
-    private KafkaActorEventSink actorEventSink;
-
-    @Before
-    public void setup(){
-    }
-    @Test
-    public void testProducer(){
-
-        String globalTxId = UUID.randomUUID().toString().replaceAll("-", "");
-        String localTxId_1 = UUID.randomUUID().toString().replaceAll("-", "");
-        String localTxId_2 = UUID.randomUUID().toString().replaceAll("-", "");
-        String localTxId_3 = UUID.randomUUID().toString().replaceAll("-", "");
-
-        buildData(globalTxId, localTxId_1, localTxId_2, 
localTxId_3).forEach(baseEvent -> kafkaMessagePublisher.publish(baseEvent));
-
-        try {
-            // Waiting for sub
-            TimeUnit.SECONDS.sleep(5);
-        } catch (InterruptedException e) {
-        }
-
-        assertEquals(0, actorEventSink.countDownLatch.getCount());
-
-    }
-
-    private List<BaseEvent> buildData(String globalTxId, String localTxId_1, 
String localTxId_2, String localTxId_3){
-        List<BaseEvent> sagaEvents = new ArrayList<>();
-        
sagaEvents.add(SagaStartedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build());
-        
sagaEvents.add(TxStartedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
-        
sagaEvents.add(TxEndedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
-        
sagaEvents.add(TxStartedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
-        
sagaEvents.add(TxEndedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
-        
sagaEvents.add(TxStartedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
-        
sagaEvents.add(TxEndedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
-        
sagaEvents.add(SagaEndedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build());
-        return sagaEvents;
-    }
-}
diff --git a/alpha/alpha-fsm-channel-kafka/src/test/resources/log4j2.xml 
b/alpha/alpha-fsm-channel-kafka/src/test/resources/log4j2.xml
deleted file mode 100644
index 8c2def9..0000000
--- a/alpha/alpha-fsm-channel-kafka/src/test/resources/log4j2.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~      http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<Configuration status="WARN">
-  <Appenders>
-    <Console name="Console" target="SYSTEM_OUT">
-      <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - 
%msg%n"/>
-    </Console>
-  </Appenders>
-  <Loggers>
-    <Root level="debug">
-      <AppenderRef ref="Console"/>
-    </Root>
-  </Loggers>
-</Configuration>
diff --git a/alpha/alpha-fsm-channel-redis/README.md 
b/alpha/alpha-fsm-channel-redis/README.md
deleted file mode 100644
index 98ecb93..0000000
--- a/alpha/alpha-fsm-channel-redis/README.md
+++ /dev/null
@@ -1,17 +0,0 @@
-# FSM Redis channel
-## Enabled Saga State Machine Module
-
-Using `alpha.feature.akka.enabled=true` launch Alpha and Omega Side 
-Using `alpha.feature.akka.channel.type=redis` launch Alpha and Omega Side 
-
-```properties
-alpha.feature.akka.enabled=true
-alpha.feature.akka.channel.type=redis
-```
-
-setting spring boot redis
-```
-spring.redis.host=your_redis_host
-spring.redis.port=your_redis_port
-spring.redis.password=your_redis_password
-```
diff --git a/alpha/alpha-fsm-channel-redis/pom.xml 
b/alpha/alpha-fsm-channel-redis/pom.xml
deleted file mode 100644
index fdab9f2..0000000
--- a/alpha/alpha-fsm-channel-redis/pom.xml
+++ /dev/null
@@ -1,99 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~      http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0";
-  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
-  <parent>
-    <artifactId>alpha</artifactId>
-    <groupId>org.apache.servicecomb.pack</groupId>
-    <version>0.6.0-SNAPSHOT</version>
-  </parent>
-  <modelVersion>4.0.0</modelVersion>
-
-  <artifactId>alpha-fsm-channel-redis</artifactId>
-  <name>Pack::Alpha::Fsm::channel::redis</name>
-
-  <properties>
-  </properties>
-
-  <dependencyManagement>
-    <dependencies>
-      <dependency>
-        <groupId>org.springframework.boot</groupId>
-        <artifactId>spring-boot-dependencies</artifactId>
-        <version>${spring.boot.version}</version>
-        <type>pom</type>
-        <scope>import</scope>
-      </dependency>
-    </dependencies>
-  </dependencyManagement>
-
-  <dependencies>
-    <!-- spring boot -->
-    <dependency>
-      <groupId>org.springframework.boot</groupId>
-      <artifactId>spring-boot-autoconfigure</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.servicecomb.pack</groupId>
-      <artifactId>pack-common</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.servicecomb.pack</groupId>
-      <artifactId>alpha-core</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.springframework.boot</groupId>
-      <artifactId>spring-boot-starter-data-redis</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-pool2</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.springframework.boot</groupId>
-      <artifactId>spring-boot-starter-log4j2</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-api</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-core</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <!-- For testing the artifacts scope are test-->
-    <dependency>
-      <groupId>org.springframework.boot</groupId>
-      <artifactId>spring-boot-starter-test</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-    </dependency>
-  </dependencies>
-
-</project>
diff --git 
a/alpha/alpha-fsm-channel-redis/src/main/resources/META-INF/spring.factories 
b/alpha/alpha-fsm-channel-redis/src/main/resources/META-INF/spring.factories
deleted file mode 100644
index 0810009..0000000
--- a/alpha/alpha-fsm-channel-redis/src/main/resources/META-INF/spring.factories
+++ /dev/null
@@ -1,17 +0,0 @@
-## ---------------------------------------------------------------------------
-## Licensed to the Apache Software Foundation (ASF) under one or more
-## contributor license agreements.  See the NOTICE file distributed with
-## this work for additional information regarding copyright ownership.
-## The ASF licenses this file to You under the Apache License, Version 2.0
-## (the "License"); you may not use this file except in compliance with
-## the License.  You may obtain a copy of the License at
-##
-##      http://www.apache.org/licenses/LICENSE-2.0
-##
-## Unless required by applicable law or agreed to in writing, software
-## distributed under the License is distributed on an "AS IS" BASIS,
-## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-## See the License for the specific language governing permissions and
-## limitations under the License.
-## ---------------------------------------------------------------------------
-org.springframework.boot.autoconfigure.EnableAutoConfiguration=org.apache.servicecomb.pack.alpha.fsm.channel.redis.RedisChannelAutoConfiguration
diff --git 
a/alpha/alpha-fsm-channel-redis/src/test/java/org/apache/servicecomb/pack/alpha/fsm/RedisChannelTest.java
 
b/alpha/alpha-fsm-channel-redis/src/test/java/org/apache/servicecomb/pack/alpha/fsm/RedisChannelTest.java
deleted file mode 100644
index 858919f..0000000
--- 
a/alpha/alpha-fsm-channel-redis/src/test/java/org/apache/servicecomb/pack/alpha/fsm/RedisChannelTest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicecomb.pack.alpha.fsm;
-
-import org.apache.servicecomb.pack.alpha.core.NodeStatus;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaEndedEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaStartedEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.TxEndedEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.TxStartedEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
-import org.apache.servicecomb.pack.alpha.fsm.channel.redis.MessageSerializer;
-import 
org.apache.servicecomb.pack.alpha.fsm.channel.redis.RedisMessagePublisher;
-import 
org.apache.servicecomb.pack.alpha.fsm.channel.redis.RedisMessageSubscriber;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.Spy;
-import org.mockito.junit.MockitoJUnitRunner;
-import org.springframework.data.redis.connection.DefaultMessage;
-import org.springframework.data.redis.connection.RedisConnection;
-import org.springframework.data.redis.connection.RedisConnectionFactory;
-import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.data.redis.listener.ChannelTopic;
-import org.springframework.data.redis.listener.RedisMessageListenerContainer;
-import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.*;
-
-
-@RunWith(MockitoJUnitRunner.class)
-public class RedisChannelTest {
-
-  @Mock
-  private RedisConnection redisConnection;
-
-  @Mock
-  private RedisTemplate<String, Object> redisTemplate;
-
-  @Mock
-  private RedisConnectionFactory redisConnectionFactory;
-
-  @Spy
-  private ChannelTopic channelTopic = new ChannelTopic("redis-channel");
-
-  private RedisMessageListenerContainer redisMessageListenerContainer;
-
-  @Spy
-  private NodeStatus nodeStatus = new NodeStatus(NodeStatus.TypeEnum.MASTER);
-
-  @Spy
-  private RedisEventSink actorEventSink = new RedisEventSink();
-
-  private RedisMessagePublisher redisMessagePublisher;
-
-  private RedisMessageSubscriber redisMessageSubscriber;
-
-  private MessageListenerAdapter messageListenerAdapter;
-
-  @Before
-  public void setup(){
-    when(redisConnectionFactory.getConnection()).thenReturn(redisConnection);
-
-    redisTemplate.afterPropertiesSet();
-
-    redisMessageSubscriber = new RedisMessageSubscriber(actorEventSink, 
nodeStatus);
-    messageListenerAdapter = new 
MessageListenerAdapter(redisMessageSubscriber);
-    messageListenerAdapter.afterPropertiesSet();
-
-    redisMessageListenerContainer = new RedisMessageListenerContainer();
-    redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
-    redisMessageListenerContainer.addMessageListener(messageListenerAdapter, 
channelTopic);
-    redisMessageListenerContainer.afterPropertiesSet();
-    redisMessageListenerContainer.start();
-
-    redisMessagePublisher = new RedisMessagePublisher(redisTemplate, 
channelTopic);
-
-  }
-
-
-  @Test
-  public void testRedisPubSub(){
-    final String globalTxId = UUID.randomUUID().toString().replaceAll("-", "");
-    final String localTxId1 = UUID.randomUUID().toString().replaceAll("-", "");
-    final String localTxId2 = UUID.randomUUID().toString().replaceAll("-", "");
-    final String localTxId3 = UUID.randomUUID().toString().replaceAll("-", "");
-
-    MessageSerializer messageSerializer = new MessageSerializer();
-    buildData(globalTxId, localTxId1, localTxId2, 
localTxId3).forEach(baseEvent -> {
-      redisMessagePublisher.publish(baseEvent);
-      redisMessageSubscriber.onMessage(new 
DefaultMessage(channelTopic.getTopic().getBytes(), 
messageSerializer.serializer(baseEvent).orElse(new byte[0])), 
channelTopic.getTopic().getBytes());
-    });
-
-    assertEquals(0, actorEventSink.countDownLatch.getCount());
-  }
-
-  private List<BaseEvent> buildData(String globalTxId, String localTxId_1, 
String localTxId_2, String localTxId_3){
-    List<BaseEvent> sagaEvents = new ArrayList<>();
-    
sagaEvents.add(SagaStartedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build());
-    
sagaEvents.add(TxStartedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
-    
sagaEvents.add(TxEndedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
-    
sagaEvents.add(TxStartedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
-    
sagaEvents.add(TxEndedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
-    
sagaEvents.add(TxStartedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
-    
sagaEvents.add(TxEndedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
-    
sagaEvents.add(SagaEndedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build());
-    return sagaEvents;
-  }
-}
-
-
diff --git 
a/alpha/alpha-fsm-channel-redis/src/test/java/org/apache/servicecomb/pack/alpha/fsm/RedisEventSink.java
 
b/alpha/alpha-fsm-channel-redis/src/test/java/org/apache/servicecomb/pack/alpha/fsm/RedisEventSink.java
deleted file mode 100644
index f44342e..0000000
--- 
a/alpha/alpha-fsm-channel-redis/src/test/java/org/apache/servicecomb/pack/alpha/fsm/RedisEventSink.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicecomb.pack.alpha.fsm;
-
-import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink;
-
-import java.util.concurrent.CountDownLatch;
-
-public class RedisEventSink implements ActorEventSink {
-
-  public static final CountDownLatch countDownLatch = new CountDownLatch(8);
-
-  @Override
-  public void send(BaseEvent event) throws Exception {
-    countDownLatch.countDown();
-  }
-}
diff --git a/alpha/alpha-fsm-channel-redis/src/test/resources/log4j2.xml 
b/alpha/alpha-fsm-channel-redis/src/test/resources/log4j2.xml
deleted file mode 100644
index 58924c6..0000000
--- a/alpha/alpha-fsm-channel-redis/src/test/resources/log4j2.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~      http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<Configuration status="WARN">
-  <Appenders>
-    <Console name="Console" target="SYSTEM_OUT">
-      <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - 
%msg%n"/>
-    </Console>
-  </Appenders>
-  <Loggers>
-    <Root level="info">
-      <AppenderRef ref="Console"/>
-    </Root>
-  </Loggers>
-</Configuration>
diff --git 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
index 4d861f6..777ae3f 100644
--- 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
+++ 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
@@ -20,53 +20,47 @@ package org.apache.servicecomb.pack.alpha.fsm;
 import static 
org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER;
 import static 
org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SpringAkkaExtension.SPRING_EXTENSION_PROVIDER;
 
+import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.actor.Props;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import java.util.Map;
 import javax.annotation.PostConstruct;
-import org.apache.servicecomb.pack.alpha.fsm.channel.ActiveMQActorEventChannel;
-import 
org.apache.servicecomb.pack.alpha.fsm.channel.kafka.KafkaMessagePublisher;
-import 
org.apache.servicecomb.pack.alpha.fsm.channel.redis.RedisMessagePublisher;
+import 
org.apache.servicecomb.pack.alpha.fsm.channel.kafka.KafkaChannelAutoConfiguration;
+import 
org.apache.servicecomb.pack.alpha.fsm.channel.memory.MemoryChannelAutoConfiguration;
+import 
org.apache.servicecomb.pack.alpha.fsm.channel.redis.RedisChannelAutoConfiguration;
 import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
 import 
org.apache.servicecomb.pack.alpha.fsm.repository.NoneTransactionRepository;
+import 
org.apache.servicecomb.pack.alpha.fsm.repository.channel.DefaultTransactionRepositoryChannel;
 import 
org.apache.servicecomb.pack.alpha.fsm.repository.elasticsearch.ElasticsearchTransactionRepository;
 import org.apache.servicecomb.pack.alpha.fsm.repository.TransactionRepository;
-import 
org.apache.servicecomb.pack.alpha.fsm.repository.channel.MemoryTransactionRepositoryChannel;
 import 
org.apache.servicecomb.pack.alpha.fsm.repository.TransactionRepositoryChannel;
-import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink;
-import org.apache.servicecomb.pack.alpha.core.fsm.channel.ActorEventChannel;
-import org.apache.servicecomb.pack.alpha.fsm.channel.KafkaActorEventChannel;
-import org.apache.servicecomb.pack.alpha.fsm.channel.MemoryActorEventChannel;
-import org.apache.servicecomb.pack.alpha.fsm.channel.RedisActorEventChannel;
-import org.apache.servicecomb.pack.alpha.fsm.sink.SagaActorEventSender;
 import 
org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.AkkaConfigPropertyAdapter;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
 import 
org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.Lazy;
 import org.springframework.core.env.ConfigurableEnvironment;
 import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
 
 @Configuration
+@ImportAutoConfiguration({
+    MemoryChannelAutoConfiguration.class,
+    KafkaChannelAutoConfiguration.class,
+    RedisChannelAutoConfiguration.class})
 @ConditionalOnProperty(value = {"alpha.feature.akka.enabled"})
 public class FsmAutoConfiguration {
 
-  @Value("${alpha.feature.akka.channel.memory.size:-1}")
-  int memoryEventChannelMemorySize;
-
   
@Value("${alpha.feature.akka.transaction.repository.elasticsearch.batchSize:1000}")
   int repositoryElasticsearchBatchSize;
 
   
@Value("${alpha.feature.akka.transaction.repository.elasticsearch.refreshTime:5000}")
   int repositoryElasticsearchRefreshTime;
 
-  
@Value("${alpha.feature.akka.transaction.repository.elasticsearch.memory.size:-1}")
-  int memoryTransactionRepositoryChannelSize;
-
   @PostConstruct
   void init() {
     System.setProperty("es.set.netty.runtime.available.processors", "false");
@@ -77,7 +71,8 @@ public class FsmAutoConfiguration {
       ConfigurableEnvironment environment, MetricsService metricsService,
       TransactionRepositoryChannel repositoryChannel) {
     ActorSystem system = ActorSystem
-        .create("alpha-akka", akkaConfiguration(applicationContext, 
environment));
+        .create("alpha-cluster", akkaConfiguration(applicationContext, 
environment));
+
     SPRING_EXTENSION_PROVIDER.get(system).initialize(applicationContext);
     
SAGA_DATA_EXTENSION_PROVIDER.get(system).setRepositoryChannel(repositoryChannel);
     SAGA_DATA_EXTENSION_PROVIDER.get(system).setMetricsService(metricsService);
@@ -97,40 +92,9 @@ public class FsmAutoConfiguration {
     return new MetricsService();
   }
 
-  @Bean
-  public ActorEventSink actorEventSink(MetricsService metricsService) {
-    return new SagaActorEventSender(metricsService);
-  }
-
-  @Bean
-  @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", 
havingValue = "memory")
-  @ConditionalOnMissingBean(ActorEventChannel.class)
-  public ActorEventChannel memoryEventChannel(ActorEventSink actorEventSink,
-      MetricsService metricsService) {
-    return new MemoryActorEventChannel(actorEventSink, 
memoryEventChannelMemorySize,
-        metricsService);
-  }
-
-  @Bean
-  @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", 
havingValue = "activemq")
-  @ConditionalOnMissingBean(ActorEventChannel.class)
-  public ActorEventChannel activeMqEventChannel(ActorEventSink actorEventSink,
-      MetricsService metricsService) {
-    return new ActiveMQActorEventChannel(actorEventSink, metricsService);
-  }
-
-  @Bean
-  @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", 
havingValue = "kafka")
-  @ConditionalOnMissingBean(ActorEventChannel.class)
-  public ActorEventChannel kafkaEventChannel(ActorEventSink actorEventSink,
-      MetricsService metricsService, @Lazy KafkaMessagePublisher 
kafkaMessagePublisher){
-    return new KafkaActorEventChannel(actorEventSink, metricsService, 
kafkaMessagePublisher);
-  }
-
-  @Bean
-  @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", 
havingValue = "redis")
-  public ActorEventChannel redisEventChannel(ActorEventSink actorEventSink, 
MetricsService metricsService, @Lazy RedisMessagePublisher 
redisMessagePublisher){
-    return new RedisActorEventChannel(actorEventSink, metricsService, 
redisMessagePublisher);
+  @Bean(name = "sagaShardRegionActor")
+  public ActorRef sagaShardRegionActor(ActorSystem actorSystem) {
+    return actorSystem.actorOf(Props.create(SagaShardRegionActor.class));
   }
 
   @Bean
@@ -148,12 +112,9 @@ public class FsmAutoConfiguration {
   }
 
   @Bean
-  @ConditionalOnMissingBean(TransactionRepositoryChannel.class)
-  @ConditionalOnProperty(value = 
"alpha.feature.akka.transaction.repository.channel.type", havingValue = 
"memory", matchIfMissing = true)
   TransactionRepositoryChannel 
memoryTransactionRepositoryChannel(TransactionRepository repository,
       MetricsService metricsService) {
-    return new MemoryTransactionRepositoryChannel(repository, 
memoryTransactionRepositoryChannelSize,
-        metricsService);
+    return new DefaultTransactionRepositoryChannel(repository, metricsService);
   }
 
 }
diff --git 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/AbstractActorEventChannel.java
 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/AbstractActorEventChannel.java
index 40f1ee7..61bbde5 100644
--- 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/AbstractActorEventChannel.java
+++ 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/AbstractActorEventChannel.java
@@ -28,14 +28,11 @@ public abstract class AbstractActorEventChannel implements 
ActorEventChannel {
   private static final Logger logger = 
LoggerFactory.getLogger(AbstractActorEventChannel.class);
 
   protected final MetricsService metricsService;
-  protected final ActorEventSink actorEventSink;
 
   public abstract void sendTo(BaseEvent event);
 
   public AbstractActorEventChannel(
-      ActorEventSink actorEventSink,
       MetricsService metricsService) {
-    this.actorEventSink = actorEventSink;
     this.metricsService = metricsService;
   }
 
diff --git 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/AbstractEventConsumer.java
 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/AbstractEventConsumer.java
new file mode 100644
index 0000000..6869add
--- /dev/null
+++ 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/AbstractEventConsumer.java
@@ -0,0 +1,20 @@
+package org.apache.servicecomb.pack.alpha.fsm.channel;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
+
+public abstract class AbstractEventConsumer {
+
+  protected final MetricsService metricsService;
+  protected final ActorSystem actorSystem;
+  protected final ActorRef sagaShardRegionActor;
+
+  public AbstractEventConsumer(
+      ActorSystem actorSystem,
+      ActorRef sagaShardRegionActor, MetricsService metricsService) {
+    this.metricsService = metricsService;
+    this.actorSystem = actorSystem;
+    this.sagaShardRegionActor = sagaShardRegionActor;
+  }
+}
diff --git 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActiveMQActorEventChannel.java
 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActiveMQActorEventChannel.java
deleted file mode 100644
index 3fdc19b..0000000
--- 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActiveMQActorEventChannel.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.servicecomb.pack.alpha.fsm.channel;
-
-import java.lang.invoke.MethodHandles;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
-import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
-import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Queue
- * */
-
-public class ActiveMQActorEventChannel extends AbstractActorEventChannel {
-  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  public ActiveMQActorEventChannel(
-      ActorEventSink actorEventSink, MetricsService metricsService) {
-    super(actorEventSink, metricsService);
-  }
-
-  @Override
-  public void sendTo(BaseEvent event){
-    throw new UnsupportedOperationException("Doesn't implement yet!");
-  }
-}
diff --git 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java
 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaActorEventChannel.java
similarity index 67%
rename from 
alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java
rename to 
alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaActorEventChannel.java
index aca2676..6e3cfe7 100644
--- 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java
+++ 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaActorEventChannel.java
@@ -15,24 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.pack.alpha.fsm.channel;
+package org.apache.servicecomb.pack.alpha.fsm.channel.kafka;
 
-import java.lang.invoke.MethodHandles;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
-import 
org.apache.servicecomb.pack.alpha.fsm.channel.kafka.KafkaMessagePublisher;
+import org.apache.servicecomb.pack.alpha.fsm.channel.AbstractActorEventChannel;
 import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
-import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class KafkaActorEventChannel extends AbstractActorEventChannel {
-  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private KafkaMessagePublisher kafkaMessagePublisher;
 
-  public KafkaActorEventChannel(
-      ActorEventSink actorEventSink, MetricsService metricsService, 
KafkaMessagePublisher kafkaMessagePublisher) {
-    super(actorEventSink, metricsService);
+  public KafkaActorEventChannel(MetricsService metricsService, 
KafkaMessagePublisher kafkaMessagePublisher) {
+    super(metricsService);
     this.kafkaMessagePublisher = kafkaMessagePublisher;
   }
 
diff --git 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
new file mode 100644
index 0000000..ad44641
--- /dev/null
+++ 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.pack.alpha.fsm.channel.kafka;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import com.google.common.collect.Maps;
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.PostConstruct;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.servicecomb.pack.alpha.core.fsm.channel.ActorEventChannel;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
+import 
org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Lazy;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+
+@Configuration
+@ConditionalOnClass(KafkaProperties.class)
+@ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue 
= "kafka")
+public class KafkaChannelAutoConfiguration {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  
@Value("${alpha.feature.akka.channel.kafka.topic:servicecomb-pack-actor-event}")
+  private String topic;
+
+  @Value("${spring.kafka.bootstrap-servers}")
+  private String bootstrap_servers;
+
+  @Value("${spring.kafka.consumer.group-id:servicecomb-pack}")
+  private String groupId;
+
+  
@Value("${spring.kafka.consumer.properties.spring.json.trusted.packages:org.apache.servicecomb.pack.alpha.core.fsm.event,org.apache.servicecomb.pack.alpha.core.fsm.event.base,}org.apache.servicecomb.pack.alpha.core.fsm.event.internal")
+  private String trusted_packages;
+
+  @Value("${spring.kafka.producer.batch-size:16384}")
+  private int batchSize;
+
+  @Value("${spring.kafka.producer.retries:0}")
+  private int retries;
+
+  @Value("${spring.kafka.producer.buffer.memory:33554432}")
+  private long bufferMemory;
+
+  @Value("${spring.kafka.consumer.auto.offset.reset:earliest}")
+  private String autoOffsetReset;
+
+  @Value("${spring.kafka.consumer.enable.auto.commit:false}")
+  private boolean enableAutoCommit;
+
+  @Value("${spring.kafka.consumer.auto.commit.interval.ms:100}")
+  private int autoCommitIntervalMs;
+
+  @Value("${spring.kafka.listener.ackMode:MANUAL_IMMEDIATE}")
+  private String ackMode;
+
+  @Value("${spring.kafka.listener.pollTimeout:1500}")
+  private long poolTimeout;
+
+  @Value("${kafka.numPartitions:6}")
+  private int numPartitions;
+
+  @Value("${kafka.replicationFactor:1}")
+  private short replicationFactor;
+
+  @PostConstruct
+  public void init() {
+    Map props = new HashMap<>();
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers);
+    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 50000);
+    try (final AdminClient adminClient = KafkaAdminClient.create(props)) {
+      try {
+        final NewTopic newTopic = new NewTopic(topic, numPartitions, 
replicationFactor);
+        final CreateTopicsResult createTopicsResult = adminClient
+            .createTopics(Collections.singleton(newTopic));
+        createTopicsResult.values().get(topic).get();
+      } catch (InterruptedException | ExecutionException e) {
+        if (!(e.getCause() instanceof TopicExistsException)) {
+          throw new RuntimeException(e.getMessage(), e);
+        }
+      }
+    }
+    LOG.info("Kafka Channel Init");
+  }
+
+  @Bean
+  @ConditionalOnMissingBean
+  public KafkaMessagePublisher kafkaMessagePublisher() {
+    Map<String, Object> map = Maps.newHashMap();
+    map.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers);
+    map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
+    map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
JsonSerializer.class);
+    map.put(ProducerConfig.RETRIES_CONFIG, retries);
+    map.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
+    map.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
+    return new KafkaMessagePublisher(topic,
+        new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(map)));
+  }
+
+  @Bean
+  @ConditionalOnMissingBean(ActorEventChannel.class)
+  public ActorEventChannel kafkaEventChannel(MetricsService metricsService,
+      @Lazy KafkaMessagePublisher kafkaMessagePublisher) {
+    return new KafkaActorEventChannel(metricsService, kafkaMessagePublisher);
+  }
+
+  @Bean
+  KafkaSagaEventConsumer sagaEventKafkaConsumer(ActorSystem actorSystem,
+      @Qualifier("sagaShardRegionActor") ActorRef sagaShardRegionActor,
+      MetricsService metricsService) {
+    return new KafkaSagaEventConsumer(actorSystem, sagaShardRegionActor, 
metricsService,
+        bootstrap_servers, topic);
+  }
+}
\ No newline at end of file
diff --git 
a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java
 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java
similarity index 82%
rename from 
alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java
rename to 
alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java
index ba96b56..95de39b 100644
--- 
a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java
+++ 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java
@@ -17,15 +17,14 @@
 
 package org.apache.servicecomb.pack.alpha.fsm.channel.kafka;
 
+import java.util.concurrent.ExecutionException;
 import org.apache.servicecomb.pack.alpha.core.fsm.channel.MessagePublisher;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.kafka.core.KafkaTemplate;
 
-import java.util.concurrent.ExecutionException;
-
-public class KafkaMessagePublisher implements MessagePublisher {
+public class KafkaMessagePublisher implements MessagePublisher<BaseEvent> {
 
     private static final Logger logger = 
LoggerFactory.getLogger(KafkaMessagePublisher.class);
 
@@ -38,18 +37,13 @@ public class KafkaMessagePublisher implements 
MessagePublisher {
     }
 
     @Override
-    public void publish(Object data) {
+    public void publish(BaseEvent data) {
         if(logger.isDebugEnabled()){
             logger.debug("send message [{}] to [{}]", data, topic);
         }
 
         try {
-            if(data instanceof BaseEvent) {
-                BaseEvent event = (BaseEvent) data;
-                kafkaTemplate.send(topic, event.getGlobalTxId(), event).get();
-            }else{
-                throw new UnsupportedOperationException("data must be 
BaseEvent type");
-            }
+            kafkaTemplate.send(topic, data.getGlobalTxId(), data).get();
         } catch (InterruptedException | ExecutionException | 
UnsupportedOperationException e) {
             logger.error("publish Exception = [{}]", e.getMessage(), e);
             throw new RuntimeException(e);
diff --git 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaSagaEventConsumer.java
 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaSagaEventConsumer.java
new file mode 100644
index 0000000..b816302
--- /dev/null
+++ 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaSagaEventConsumer.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.channel.kafka;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.kafka.CommitterSettings;
+import akka.kafka.ConsumerSettings;
+import akka.kafka.Subscriptions;
+import akka.kafka.javadsl.Committer;
+import akka.kafka.javadsl.Consumer;
+import akka.stream.ActorMaterializer;
+import akka.stream.Materializer;
+import akka.stream.javadsl.Keep;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CompletionStage;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.channel.AbstractEventConsumer;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaSagaEventConsumer extends AbstractEventConsumer {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  final String groupId = "servicecomb-pack";
+  final ObjectMapper jsonMapper = new ObjectMapper();
+
+  public KafkaSagaEventConsumer(ActorSystem actorSystem, ActorRef 
sagaShardRegionActor,
+      MetricsService metricsService, String bootstrap_servers, String topic) {
+    super(actorSystem, sagaShardRegionActor, metricsService);
+
+    // init consumer
+    final Materializer materializer = ActorMaterializer.create(actorSystem);
+    final Config consumerConfig = 
actorSystem.settings().config().getConfig("akka.kafka.consumer");
+    final ConsumerSettings<String, String> consumerSettings =
+        ConsumerSettings
+            .create(consumerConfig, new StringDeserializer(), new 
StringDeserializer())
+            .withBootstrapServers(bootstrap_servers)
+            .withGroupId(groupId)
+            .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+            .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 
"5000")
+            .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+            .withProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"StringDeserializer.class")
+            .withProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                "StringDeserializer.class");
+    CommitterSettings committerSettings = 
CommitterSettings.create(consumerConfig);
+    Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
+        .mapAsync(1, event -> { // must be set to 1 for ordered
+          return sendSagaActor(event.record().key(), event.record().value())
+              .thenApply(done -> event.committableOffset());
+        })
+        .toMat(Committer.sink(committerSettings), Keep.both())
+        .mapMaterializedValue(Consumer::createDrainingControl)
+        .run(materializer);
+  }
+
+  private CompletionStage<String> sendSagaActor(String key, String value) {
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("key {}, value {}", key, value);
+      }
+      long begin = System.currentTimeMillis();
+      metricsService.metrics().doActorReceived();
+      sagaShardRegionActor.tell(jsonMapper.readValue(value, BaseEvent.class), 
sagaShardRegionActor);
+      long end = System.currentTimeMillis();
+      metricsService.metrics().doActorAccepted();
+      metricsService.metrics().doActorAvgTime(end - begin);
+      return CompletableFuture.completedFuture("");
+    } catch (Exception ex) {
+      metricsService.metrics().doActorRejected();
+      LOG.error("key {}, value {}", key, value);
+      throw new CompletionException(ex);
+    }
+  }
+}
diff --git 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/MemoryActorEventChannel.java
 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/memory/MemoryActorEventChannel.java
similarity index 70%
rename from 
alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/MemoryActorEventChannel.java
rename to 
alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/memory/MemoryActorEventChannel.java
index b56fe38..d5222b5 100644
--- 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/MemoryActorEventChannel.java
+++ 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/memory/MemoryActorEventChannel.java
@@ -15,12 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.pack.alpha.fsm.channel;
+package org.apache.servicecomb.pack.alpha.fsm.channel.memory;
 
 import java.lang.invoke.MethodHandles;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.channel.AbstractActorEventChannel;
 import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
 import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink;
 import org.slf4j.Logger;
@@ -32,12 +33,14 @@ public class MemoryActorEventChannel extends 
AbstractActorEventChannel {
   private final LinkedBlockingQueue<BaseEvent> eventQueue;
   private int size;
 
-  public MemoryActorEventChannel(ActorEventSink actorEventSink, int size,
-      MetricsService metricsService) {
-    super(actorEventSink, metricsService);
+  public MemoryActorEventChannel(MetricsService metricsService, int size) {
+    super(metricsService);
     this.size = size > 0 ? size : Integer.MAX_VALUE;
     eventQueue = new LinkedBlockingQueue(this.size);
-    new Thread(new EventConsumer(), "MemoryActorEventChannel").start();
+  }
+
+  public LinkedBlockingQueue<BaseEvent> getEventQueue() {
+    return eventQueue;
   }
 
   @Override
@@ -48,24 +51,4 @@ public class MemoryActorEventChannel extends 
AbstractActorEventChannel {
       throw new RuntimeException(e);
     }
   }
-
-  class EventConsumer implements Runnable {
-
-    @Override
-    public void run() {
-      while (true) {
-        try {
-          BaseEvent event = eventQueue.peek();
-          if (event != null) {
-            actorEventSink.send(event);
-            eventQueue.poll();
-          } else {
-            Thread.sleep(10);
-          }
-        } catch (Exception ex) {
-          LOG.error(ex.getMessage(), ex);
-        }
-      }
-    }
-  }
 }
diff --git 
a/alpha/alpha-fsm-channel-redis/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/MessageSerializer.java
 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/MessageSerializer.java
similarity index 84%
rename from 
alpha/alpha-fsm-channel-redis/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/MessageSerializer.java
rename to 
alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/MessageSerializer.java
index dc2ef16..5665eee 100644
--- 
a/alpha/alpha-fsm-channel-redis/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/MessageSerializer.java
+++ 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/MessageSerializer.java
@@ -16,20 +16,20 @@
  */
 package org.apache.servicecomb.pack.alpha.fsm.channel.redis;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.data.redis.serializer.RedisSerializer;
-import org.springframework.data.redis.serializer.SerializationException;
-
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.invoke.MethodHandles;
 import java.util.Optional;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.data.redis.serializer.RedisSerializer;
+import org.springframework.data.redis.serializer.SerializationException;
 
 public class MessageSerializer {
 
-  private static final Logger logger = 
LoggerFactory.getLogger(MessageSerializer.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private static MessageSerializerImpl serializer = null;
 
@@ -37,15 +37,16 @@ public class MessageSerializer {
     serializer = new MessageSerializerImpl();
   }
 
-  public Optional<byte[]> serializer(Object data){
+  public Optional<byte[]> serializer(Object data) {
     return Optional.ofNullable(serializer.serialize(data));
   }
 
-  public Optional<Object> deserialize(byte[] bytes){
+  public Optional<Object> deserialize(byte[] bytes) {
     return Optional.ofNullable(serializer.deserialize(bytes));
   }
 
-  private class MessageSerializerImpl implements RedisSerializer<Object>{
+  private class MessageSerializerImpl implements RedisSerializer<Object> {
+
     @Override
     public byte[] serialize(Object data) throws SerializationException {
       try {
@@ -58,8 +59,8 @@ public class MessageSerializer {
         outputStream.close();
 
         return bytes;
-      }catch (Exception e){
-        logger.error("serialize Exception = [{}]", e.getMessage(), e);
+      } catch (Exception e) {
+        LOG.error("serialize Exception = [{}]", e.getMessage(), e);
       }
 
       return null;
@@ -76,8 +77,8 @@ public class MessageSerializer {
         objectInputStream.close();
 
         return object;
-      }catch (Exception e){
-        logger.error("deserialize Exception = [{}]", e.getMessage(), e);
+      } catch (Exception e) {
+        LOG.error("deserialize Exception = [{}]", e.getMessage(), e);
       }
 
       return null;
diff --git 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java
 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisActorEventChannel.java
similarity index 75%
rename from 
alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java
rename to 
alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisActorEventChannel.java
index f68d7c0..20abdec 100644
--- 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java
+++ 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisActorEventChannel.java
@@ -15,35 +15,35 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.pack.alpha.fsm.channel;
+package org.apache.servicecomb.pack.alpha.fsm.channel.redis;
 
 import java.lang.invoke.MethodHandles;
 
-import 
org.apache.servicecomb.pack.alpha.fsm.channel.redis.RedisMessagePublisher;
+import org.apache.servicecomb.pack.alpha.fsm.channel.AbstractActorEventChannel;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
 import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
-import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Pub/Sub
- * */
+ */
 
 public class RedisActorEventChannel extends AbstractActorEventChannel {
+
   private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private RedisMessagePublisher redisMessagePublisher;
 
-  public RedisActorEventChannel(
-      ActorEventSink actorEventSink, MetricsService metricsService, 
RedisMessagePublisher redisMessagePublisher) {
-    super(actorEventSink, metricsService);
+  public RedisActorEventChannel(MetricsService metricsService,
+      RedisMessagePublisher redisMessagePublisher) {
+    super(metricsService);
     this.redisMessagePublisher = redisMessagePublisher;
   }
 
   @Override
-  public void sendTo(BaseEvent event){
-    if(LOG.isDebugEnabled()) {
+  public void sendTo(BaseEvent event) {
+    if (LOG.isDebugEnabled()) {
       LOG.debug("sendTo message = [{}]", event);
     }
     redisMessagePublisher.publish(event);
diff --git 
a/alpha/alpha-fsm-channel-redis/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisChannelAutoConfiguration.java
 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisChannelAutoConfiguration.java
similarity index 69%
rename from 
alpha/alpha-fsm-channel-redis/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisChannelAutoConfiguration.java
rename to 
alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisChannelAutoConfiguration.java
index e67e8d5..3bc8cb4 100644
--- 
a/alpha/alpha-fsm-channel-redis/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisChannelAutoConfiguration.java
+++ 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisChannelAutoConfiguration.java
@@ -16,9 +16,15 @@
  */
 package org.apache.servicecomb.pack.alpha.fsm.channel.redis;
 
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import java.lang.invoke.MethodHandles;
+import javax.annotation.PostConstruct;
 import org.apache.servicecomb.pack.alpha.core.NodeStatus;
+import org.apache.servicecomb.pack.alpha.core.fsm.channel.ActorEventChannel;
 import org.apache.servicecomb.pack.alpha.core.fsm.channel.MessagePublisher;
 import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Qualifier;
@@ -42,13 +48,20 @@ import 
org.springframework.data.redis.serializer.StringRedisSerializer;
 @ConditionalOnClass(RedisConnection.class)
 @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue 
= "redis")
 public class RedisChannelAutoConfiguration {
-  private static final Logger logger = 
LoggerFactory.getLogger(RedisChannelAutoConfiguration.class);
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   
@Value("${alpha.feature.akka.channel.redis.topic:servicecomb-pack-actor-event}")
   private String topic;
 
+  @PostConstruct
+  public void init() {
+    LOG.info("Redis Channel Init");
+  }
+
   @Bean
-  public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory 
redisConnectionFactory) {
+  public RedisTemplate<String, Object> redisTemplate(
+      RedisConnectionFactory redisConnectionFactory) {
     RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
     redisTemplate.setKeySerializer(new StringRedisSerializer());
     redisTemplate.setHashKeySerializer(new 
GenericToStringSerializer<>(Object.class));
@@ -60,25 +73,29 @@ public class RedisChannelAutoConfiguration {
   }
 
   @Bean
-  RedisMessageSubscriber redisMessageSubscriber(@Lazy 
@Qualifier("actorEventSink") ActorEventSink actorEventSink,
+  RedisSagaEventConsumer redisSagaEventConsumer(ActorSystem actorSystem,
+      @Qualifier("sagaShardRegionActor") ActorRef sagaShardRegionActor,
+      MetricsService metricsService,
       @Lazy @Qualifier("nodeStatus") NodeStatus nodeStatus) {
-    return new RedisMessageSubscriber(actorEventSink, nodeStatus);
+    return new RedisSagaEventConsumer(actorSystem, sagaShardRegionActor, 
metricsService,
+        nodeStatus);
   }
 
   @Bean
-  public MessageListenerAdapter messageListenerAdapter(@Lazy 
@Qualifier("actorEventSink") ActorEventSink actorEventSink,
-      @Lazy @Qualifier("nodeStatus") NodeStatus nodeStatus) {
-    return new MessageListenerAdapter(redisMessageSubscriber(actorEventSink, 
nodeStatus));
+  public MessageListenerAdapter messageListenerAdapter(
+      RedisSagaEventConsumer redisSagaEventConsumer) {
+    return new MessageListenerAdapter(redisSagaEventConsumer);
   }
 
   @Bean
-  public RedisMessageListenerContainer 
redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory,
-      @Lazy @Qualifier("actorEvetSink") ActorEventSink actorEventSink,
-      @Lazy @Qualifier("nodeStatus") NodeStatus nodeStatus) {
+  public RedisMessageListenerContainer redisMessageListenerContainer(
+      RedisConnectionFactory redisConnectionFactory,
+      RedisSagaEventConsumer redisSagaEventConsumer) {
     RedisMessageListenerContainer redisMessageListenerContainer = new 
RedisMessageListenerContainer();
 
     redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
-    
redisMessageListenerContainer.addMessageListener(redisMessageSubscriber(actorEventSink,
 nodeStatus), channelTopic());
+    redisMessageListenerContainer
+        .addMessageListener(redisSagaEventConsumer, channelTopic());
 
     return redisMessageListenerContainer;
   }
@@ -90,10 +107,16 @@ public class RedisChannelAutoConfiguration {
 
   @Bean
   ChannelTopic channelTopic() {
-    if (logger.isDebugEnabled()) {
-      logger.debug("build channel topic = [{}]", topic);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("build channel topic = [{}]", topic);
     }
     return new ChannelTopic(topic);
   }
 
+  @Bean
+  public ActorEventChannel redisEventChannel(MetricsService metricsService,
+      @Lazy RedisMessagePublisher redisMessagePublisher) {
+    return new RedisActorEventChannel(metricsService, redisMessagePublisher);
+  }
+
 }
diff --git 
a/alpha/alpha-fsm-channel-redis/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisMessagePublisher.java
 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisMessagePublisher.java
similarity index 75%
rename from 
alpha/alpha-fsm-channel-redis/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisMessagePublisher.java
rename to 
alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisMessagePublisher.java
index 31370e3..eca2af4 100644
--- 
a/alpha/alpha-fsm-channel-redis/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisMessagePublisher.java
+++ 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisMessagePublisher.java
@@ -17,28 +17,31 @@
 package org.apache.servicecomb.pack.alpha.fsm.channel.redis;
 
 
+import java.lang.invoke.MethodHandles;
 import org.apache.servicecomb.pack.alpha.core.fsm.channel.MessagePublisher;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.data.redis.listener.ChannelTopic;
 
-public class RedisMessagePublisher implements MessagePublisher {
+public class RedisMessagePublisher implements MessagePublisher<BaseEvent> {
 
-  private static final Logger logger = 
LoggerFactory.getLogger(RedisMessagePublisher.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private RedisTemplate<String, Object> redisTemplate;
   private ChannelTopic channelTopic;
 
-  public RedisMessagePublisher(RedisTemplate<String, Object> redisTemplate, 
ChannelTopic channelTopic) {
+  public RedisMessagePublisher(RedisTemplate<String, Object> redisTemplate,
+      ChannelTopic channelTopic) {
     this.redisTemplate = redisTemplate;
     this.channelTopic = channelTopic;
   }
 
   @Override
-  public void publish(Object data) {
-    if(logger.isDebugEnabled()) {
-      logger.debug("send message [{}] to [{}]", data, channelTopic.getTopic());
+  public void publish(BaseEvent data) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("send message [{}] to [{}]", data, channelTopic.getTopic());
     }
     redisTemplate.convertAndSend(channelTopic.getTopic(), data);
 
diff --git 
a/alpha/alpha-fsm-channel-redis/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisMessageSubscriber.java
 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisSagaEventConsumer.java
similarity index 54%
rename from 
alpha/alpha-fsm-channel-redis/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisMessageSubscriber.java
rename to 
alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisSagaEventConsumer.java
index 0aa171b..f19e768 100644
--- 
a/alpha/alpha-fsm-channel-redis/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisMessageSubscriber.java
+++ 
b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisSagaEventConsumer.java
@@ -14,57 +14,55 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.servicecomb.pack.alpha.fsm.channel.redis;
 
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import java.lang.invoke.MethodHandles;
 import org.apache.servicecomb.pack.alpha.core.NodeStatus;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink;
+import org.apache.servicecomb.pack.alpha.fsm.channel.AbstractEventConsumer;
+import 
org.apache.servicecomb.pack.alpha.fsm.channel.memory.MemoryActorEventChannel;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.data.redis.connection.Message;
 import org.springframework.data.redis.connection.MessageListener;
 
-import java.nio.charset.StandardCharsets;
-
-public class RedisMessageSubscriber implements MessageListener {
+public class RedisSagaEventConsumer extends AbstractEventConsumer implements 
MessageListener {
 
-  private static final Logger logger = 
LoggerFactory.getLogger(RedisMessageSubscriber.class);
-
-  private ActorEventSink actorEventSink;
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private NodeStatus nodeStatus;
-
   private MessageSerializer messageSerializer = new MessageSerializer();
 
-  public RedisMessageSubscriber(ActorEventSink actorEventSink, NodeStatus 
nodeStatus) {
-    this.actorEventSink = actorEventSink;
+  public RedisSagaEventConsumer(ActorSystem actorSystem, ActorRef 
sagaShardRegionActor,
+      MetricsService metricsService,
+      NodeStatus nodeStatus) {
+    super(actorSystem, sagaShardRegionActor, metricsService);
     this.nodeStatus = nodeStatus;
   }
 
   @Override
   public void onMessage(Message message, byte[] pattern) {
-    if(nodeStatus.isMaster()) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("pattern = [{}]", new String(pattern, 
StandardCharsets.UTF_8));
-      }
-
+    if (nodeStatus.isMaster()) {
       messageSerializer.deserialize(message.getBody()).ifPresent(data -> {
-
         BaseEvent event = (BaseEvent) data;
-
-        if (logger.isDebugEnabled()) {
-          logger.debug("event = [{}]", event);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("event = [{}]", event);
         }
-
         try {
-          actorEventSink.send(event);
+          long begin = System.currentTimeMillis();
+          metricsService.metrics().doActorReceived();
+          sagaShardRegionActor.tell(event, sagaShardRegionActor);
+          long end = System.currentTimeMillis();
+          metricsService.metrics().doActorAccepted();
+          metricsService.metrics().doActorAvgTime(end - begin);
         } catch (Exception e) {
-          logger.error("subscriber Exception = [{}]", e.getMessage(), e);
+          metricsService.metrics().doActorRejected();
+          LOG.error("subscriber Exception = [{}]", e.getMessage(), e);
         }
       });
-    }else{
-      if(logger.isDebugEnabled()){
-        logger.debug("nodeStatus is not master and cancel this time 
subscribe");
-      }
     }
   }
 }
diff --git a/alpha/pom.xml b/alpha/pom.xml
index 34c43da..6fa969d 100644
--- a/alpha/pom.xml
+++ b/alpha/pom.xml
@@ -33,8 +33,6 @@
   <modules>
     <module>alpha-core</module>
     <module>alpha-fsm</module>
-    <module>alpha-fsm-channel-redis</module>
-    <module>alpha-fsm-channel-kafka</module>
     <module>alpha-benchmark</module>
     <module>alpha-spring-cloud-starter-eureka</module>
     <module>alpha-spring-cloud-starter-consul</module>

Reply via email to