This is an automated email from the ASF dual-hosted git repository.

tzssangglass pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/apisix-java-plugin-runner.git


The following commit(s) were added to refs/heads/main by this push:
     new f0ef4b7  refactor: switch from reactor-netty to netty, and support 
fetching var and body (#100)
f0ef4b7 is described below

commit f0ef4b7758188ccae3bcf35a6b95f2459180f3c9
Author: tzssangglass <[email protected]>
AuthorDate: Thu Dec 30 15:55:27 2021 +0800

    refactor: switch from reactor-netty to netty, and support fetching var and 
body (#100)
---
 .github/workflows/ci.yaml                          |  10 +-
 docs/en/latest/development.md                      |   4 +-
 .../the-internal-of-apisix-java-plugin-runner.md   |   2 +-
 pom.xml                                            |   6 +-
 runner-core/pom.xml                                |  23 +-
 .../runner/codec/PluginRunnerConfiguration.java    |  38 ---
 .../A6HandlerConfiguration.java}                   |   5 +-
 .../Constants.java}                                |  13 +-
 .../runner/handler/A6HandlerConfiguration.java     |  85 -------
 .../plugin/runner/handler/A6HttpCallHandler.java   |  60 -----
 .../BinaryProtocolDecoder.java}                    |   6 +-
 .../apisix/plugin/runner/handler/Dispatcher.java   |  26 --
 .../plugin/runner/handler/HTTPReqCallHandler.java  | 202 +++++++++++++++
 .../apisix/plugin/runner/handler/Handler.java      |  26 --
 .../apisix/plugin/runner/handler/IOHandler.java    |  44 ----
 .../runner/handler/IOHandlerConfiguration.java     |  34 ---
 .../PayloadDecoder.java}                           |  42 ++-
 .../PayloadEncoder.java}                           |  23 +-
 ...6ConfigHandler.java => PrepareConfHandler.java} |  24 +-
 .../plugin/runner/server/ApplicationRunner.java    | 119 ++++++---
 .../server/config/TcpServerConfiguration.java      |  44 ----
 .../runner/server/config/TcpServerCustomizer.java  |  27 --
 .../src/main/resources/application.properties      |  21 ++
 ...ersDecoderTest.java => PayloadDecoderTest.java} |  23 +-
 ...ersEncoderTest.java => PayloadEncoderTest.java} |  32 +--
 .../plugin/runner/handler/A6ConfigHandlerTest.java |  76 ++++--
 .../runner/handler/A6HttpCallHandlerTest.java      |  85 +++++--
 .../plugin/runner/handler/ExtraInfoTest.java       | 281 +++++++++++++++++++++
 .../src/main/release-docs/LICENSE                  |   3 +-
 runner-plugin-sdk/pom.xml                          |  12 +-
 .../apisix/plugin/runner/ExtraInfoRequest.java     |  70 +++++
 .../apisix/plugin/runner/ExtraInfoResponse.java    |  34 ++-
 .../apache/apisix/plugin/runner/HttpRequest.java   |  28 +-
 .../apache/apisix/plugin/runner/HttpResponse.java  |  14 -
 .../apisix/plugin/runner/filter/PluginFilter.java  |  26 +-
 .../plugin/runner/filter/PluginFilterChain.java    |   8 +-
 runner-plugin/pom.xml                              |   4 -
 .../plugin/runner/PluginRunnerApplicationTest.java |  75 ------
 sample/pom.xml                                     |   4 -
 .../runner/filter/RewriteRequestDemoFilter.java    |  34 ++-
 .../runner/filter/StopRequestDemoFilter.java       |  16 +-
 .../runner/filter/StopRequestDemoFilterTest.java   |  30 ---
 src/main/checkstyle/checkstyle-suppressions.xml    |   8 +-
 43 files changed, 1026 insertions(+), 721 deletions(-)

diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 8c2c832..f7582a7 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -32,9 +32,10 @@ jobs:
       fail-fast: true
     steps:
       - uses: actions/checkout@v2
-      - uses: actions/setup-java@v1
+      - uses: actions/setup-java@v2
         with:
-          java-version: 8
+          distribution: 'zulu'
+          java-version: '11'
       - name: Check License Header
         uses: apache/skywalking-eyes@main
         env:
@@ -49,8 +50,9 @@ jobs:
       fail-fast: true
     steps:
       - uses: actions/checkout@v2
-      - uses: actions/setup-java@v1
+      - uses: actions/setup-java@v2
         with:
-          java-version: 8
+          distribution: 'zulu'
+          java-version: '11'
       - name: 'Install & Test'
         run: ./mvnw clean install
diff --git a/docs/en/latest/development.md b/docs/en/latest/development.md
index f2eaccb..1eeaa1a 100644
--- a/docs/en/latest/development.md
+++ b/docs/en/latest/development.md
@@ -28,8 +28,8 @@ This document explains how to get started to develop the 
apisix-java-plugin-runn
 Prerequisites
 -------------
 
-* JDK 8
-* APISIX 2.7.0
+* JDK 11
+* APISIX 2.10.x
 * Clone the 
[apisix-java-plugin-runner](https://github.com/apache/apisix-java-plugin-runner)
 project.
 * Refer to [Debug](how-it-works.md#debug)  to build the debug environment.
 
diff --git a/docs/en/latest/the-internal-of-apisix-java-plugin-runner.md 
b/docs/en/latest/the-internal-of-apisix-java-plugin-runner.md
index a67fc3e..b8e776d 100644
--- a/docs/en/latest/the-internal-of-apisix-java-plugin-runner.md
+++ b/docs/en/latest/the-internal-of-apisix-java-plugin-runner.md
@@ -33,7 +33,7 @@ This article explains the internal design of 
apisix-java-plugin-runner.
 
 ## Overview
 
-The apisix-java-plugin-runner designed as a TCP server built using 
[reactor-netty](https://github.com/reactor/reactor-netty),
+The apisix-java-plugin-runner designed as a TCP server built using 
[netty](https://github.com/netty/netty),
 it provides a `PluginFilter` interface for users to implement.
 
 Users only need to focus on their business logic, not on the details of how 
the apisix java plugin runner communicates with APISIX.
diff --git a/pom.xml b/pom.xml
index fb4199d..0e07808 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,7 +56,7 @@
 
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <compiler.version>1.8</compiler.version>
+        <compiler.version>11</compiler.version>
         <netty.version>4.1.54.Final</netty.version>
         <guava.version>30.1.1-jre</guava.version>
         <spring-boot.version>2.4.5</spring-boot.version>
@@ -106,8 +106,10 @@
     <build>
         <plugins>
            <plugin>
-                <artifactId>maven-compiler-plugin</artifactId>
+               <groupId>org.apache.maven.plugins</groupId>
+               <artifactId>maven-compiler-plugin</artifactId>
                 <configuration>
+                    <release>${compiler.version}</release>
                     <source>${compiler.version}</source>
                     <target>${compiler.version}</target>
                     <encoding>${project.build.sourceEncoding}</encoding>
diff --git a/runner-core/pom.xml b/runner-core/pom.xml
index ba44381..a1b03dc 100644
--- a/runner-core/pom.xml
+++ b/runner-core/pom.xml
@@ -17,7 +17,8 @@
   ~
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
     <modelVersion>4.0.0</modelVersion>
 
     <parent>
@@ -55,7 +56,7 @@
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-log4j2</artifactId>
-             <exclusions>
+            <exclusions>
                 <exclusion>
                     <groupId>org.slf4j</groupId>
                     <artifactId>jul-to-slf4j</artifactId>
@@ -63,12 +64,8 @@
             </exclusions>
         </dependency>
         <dependency>
-            <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-starter-reactor-netty</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>io.projectreactor</groupId>
-            <artifactId>reactor-core</artifactId>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
         </dependency>
         <dependency>
             <groupId>io.netty</groupId>
@@ -81,11 +78,6 @@
             <classifier>linux-aarch_64</classifier>
         </dependency>
         <dependency>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-tcnative-boringssl-static</artifactId>
-        </dependency>
-
-        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
             <version>${guava.version}</version>
@@ -103,11 +95,6 @@
             </exclusions>
         </dependency>
         <dependency>
-            <groupId>io.projectreactor</groupId>
-            <artifactId>reactor-test</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
             <groupId>com.google.code.gson</groupId>
             <artifactId>gson</artifactId>
             <scope>test</scope>
diff --git 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/PluginRunnerConfiguration.java
 
b/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/PluginRunnerConfiguration.java
deleted file mode 100644
index 65d2f35..0000000
--- 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/PluginRunnerConfiguration.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.apisix.plugin.runner.codec;
-
-import org.apache.apisix.plugin.runner.codec.impl.FlatBuffersDecoder;
-import org.apache.apisix.plugin.runner.codec.impl.FlatBuffersEncoder;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-@Configuration
-public class PluginRunnerConfiguration {
-
-    @Bean
-    public PluginRunnerDecoder createDecoder() {
-        return new FlatBuffersDecoder();
-    }
-
-    @Bean
-    public PluginRunnerEncoder createEncoder() {
-        return new FlatBuffersEncoder();
-    }
-
-}
diff --git 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/service/CacheConfiguration.java
 
b/runner-core/src/main/java/org/apache/apisix/plugin/runner/configuration/A6HandlerConfiguration.java
similarity index 94%
rename from 
runner-core/src/main/java/org/apache/apisix/plugin/runner/service/CacheConfiguration.java
rename to 
runner-core/src/main/java/org/apache/apisix/plugin/runner/configuration/A6HandlerConfiguration.java
index fbf4873..0ed02b6 100644
--- 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/service/CacheConfiguration.java
+++ 
b/runner-core/src/main/java/org/apache/apisix/plugin/runner/configuration/A6HandlerConfiguration.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.apisix.plugin.runner.service;
+package org.apache.apisix.plugin.runner.configuration;
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
@@ -27,8 +27,7 @@ import org.springframework.context.annotation.Configuration;
 import java.util.concurrent.TimeUnit;
 
 @Configuration
-public class CacheConfiguration {
-
+public class A6HandlerConfiguration {
     @Bean
     public Cache<Long, A6Conf> 
configurationCache(@Value("${cache.config.expired:3610}") long expired,
                                                   
@Value("${cache.config.capacity:1000}") int capacity) {
diff --git 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/PluginRunnerDecoder.java
 
b/runner-core/src/main/java/org/apache/apisix/plugin/runner/constants/Constants.java
similarity index 74%
rename from 
runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/PluginRunnerDecoder.java
rename to 
runner-core/src/main/java/org/apache/apisix/plugin/runner/constants/Constants.java
index 4518d0b..58104cf 100644
--- 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/PluginRunnerDecoder.java
+++ 
b/runner-core/src/main/java/org/apache/apisix/plugin/runner/constants/Constants.java
@@ -15,14 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.apisix.plugin.runner.codec;
+package org.apache.apisix.plugin.runner.constants;
 
-import org.apache.apisix.plugin.runner.A6Request;
+public class Constants {
 
-import java.nio.ByteBuffer;
+    public static final byte RPC_ERROR = 0;
 
-@FunctionalInterface
-public interface PluginRunnerDecoder {
+    public static final byte RPC_PREPARE_CONF = 1;
 
-    A6Request decode(ByteBuffer buffer);
+    public static final byte RPC_HTTP_REQ_CALL = 2;
+
+    public static final byte RPC_EXTRA_INFO = 3;
 }
diff --git 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/A6HandlerConfiguration.java
 
b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/A6HandlerConfiguration.java
deleted file mode 100644
index 761400b..0000000
--- 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/A6HandlerConfiguration.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.apisix.plugin.runner.handler;
-
-import com.google.common.cache.Cache;
-import io.github.api7.A6.Err.Code;
-import org.apache.apisix.plugin.runner.A6Conf;
-import org.apache.apisix.plugin.runner.A6ConfigResponse;
-import org.apache.apisix.plugin.runner.A6ErrRequest;
-import org.apache.apisix.plugin.runner.A6ErrResponse;
-import org.apache.apisix.plugin.runner.A6Response;
-import org.apache.apisix.plugin.runner.HttpRequest;
-import org.apache.apisix.plugin.runner.HttpResponse;
-import org.apache.apisix.plugin.runner.filter.PluginFilter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.ObjectProvider;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-
-@Configuration
-public class A6HandlerConfiguration {
-    private final Logger logger = 
LoggerFactory.getLogger(A6HandlerConfiguration.class);
-
-    @Bean
-    public A6ConfigHandler createConfigHandler(Cache<Long, A6Conf> cache, 
ObjectProvider<PluginFilter> beanProvider) {
-        List<PluginFilter> pluginFilterList = 
beanProvider.orderedStream().collect(Collectors.toList());
-        Map<String, PluginFilter> filterMap = new HashMap<>();
-        for (PluginFilter filter : pluginFilterList) {
-            filterMap.put(filter.name(), filter);
-        }
-        return new A6ConfigHandler(cache, filterMap);
-    }
-
-    @Bean
-    public A6HttpCallHandler createHttpHandler(Cache<Long, A6Conf> cache) {
-        return new A6HttpCallHandler(cache);
-    }
-
-    @Bean
-    public Dispatcher createDispatcher(A6ConfigHandler configHandler, 
A6HttpCallHandler httpCallHandler) {
-        return request -> {
-            A6Response response;
-            switch (request.getType()) {
-                case 0:
-                    response = new A6ErrResponse(((A6ErrRequest) 
request).getCode());
-                    return response;
-                case 1:
-                    long confToken = 
ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
-                    response = new A6ConfigResponse(confToken);
-                    configHandler.handle(request, response);
-                    return response;
-                case 2:
-                    response = new HttpResponse(((HttpRequest) 
request).getRequestId());
-                    httpCallHandler.handle(request, response);
-                    return response;
-                default:
-                    logger.warn("can not dispatch type: {}", 
request.getType());
-                    response = new A6ErrResponse(Code.SERVICE_UNAVAILABLE);
-                    return response;
-            }
-        };
-    }
-}
diff --git 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/A6HttpCallHandler.java
 
b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/A6HttpCallHandler.java
deleted file mode 100644
index 4f3ead4..0000000
--- 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/A6HttpCallHandler.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.apisix.plugin.runner.handler;
-
-import com.google.common.cache.Cache;
-import io.github.api7.A6.Err.Code;
-import lombok.RequiredArgsConstructor;
-import org.apache.apisix.plugin.runner.A6Conf;
-import org.apache.apisix.plugin.runner.A6ErrResponse;
-import org.apache.apisix.plugin.runner.A6Request;
-import org.apache.apisix.plugin.runner.A6Response;
-import org.apache.apisix.plugin.runner.HttpRequest;
-import org.apache.apisix.plugin.runner.HttpResponse;
-import org.apache.apisix.plugin.runner.filter.PluginFilterChain;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Objects;
-
-@RequiredArgsConstructor
-public class A6HttpCallHandler implements Handler {
-    private final Logger logger = 
LoggerFactory.getLogger(A6HttpCallHandler.class);
-
-    private final Cache<Long, A6Conf> cache;
-
-    @Override
-    public void handle(A6Request request, A6Response response) {
-        HttpRequest req = (HttpRequest) request;
-        HttpResponse rsp = (HttpResponse) response;
-
-        long confToken = ((HttpRequest) request).getConfToken();
-        A6Conf conf = cache.getIfPresent(confToken);
-        if (Objects.isNull(conf)) {
-            logger.warn("cannot find conf token: {}", confToken);
-            A6ErrResponse errResponse = new 
A6ErrResponse(Code.CONF_TOKEN_NOT_FOUND);
-            rsp.setErrResponse(errResponse);
-            return;
-        }
-
-        req.initCtx(rsp, conf.getConfig());
-        PluginFilterChain chain = conf.getChain();
-        chain.filter(req, rsp);
-    }
-
-}
diff --git 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/DelayedDecoder.java
 
b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/BinaryProtocolDecoder.java
similarity index 84%
rename from 
runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/DelayedDecoder.java
rename to 
runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/BinaryProtocolDecoder.java
index df883fa..7157135 100644
--- 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/DelayedDecoder.java
+++ 
b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/BinaryProtocolDecoder.java
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.apisix.plugin.runner.codec;
+package org.apache.apisix.plugin.runner.handler;
 
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 
-public class DelayedDecoder extends LengthFieldBasedFrameDecoder {
+public class BinaryProtocolDecoder extends LengthFieldBasedFrameDecoder {
 
-    public DelayedDecoder() {
+    public BinaryProtocolDecoder() {
         super(16777215, 1, 3, 0, 0);
     }
 }
\ No newline at end of file
diff --git 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/Dispatcher.java
 
b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/Dispatcher.java
deleted file mode 100644
index 6f57eba..0000000
--- 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/Dispatcher.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- 
-package org.apache.apisix.plugin.runner.handler;
-
-import org.apache.apisix.plugin.runner.A6Request;
-import org.apache.apisix.plugin.runner.A6Response;
-
-public interface Dispatcher {
-    
-    A6Response dispatch(A6Request request);
-}
diff --git 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/HTTPReqCallHandler.java
 
b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/HTTPReqCallHandler.java
new file mode 100644
index 0000000..4cef266
--- /dev/null
+++ 
b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/HTTPReqCallHandler.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.apisix.plugin.runner.handler;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+
+import com.google.common.cache.Cache;
+import io.github.api7.A6.Err.Code;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.CollectionUtils;
+import lombok.RequiredArgsConstructor;
+
+import org.apache.apisix.plugin.runner.A6Conf;
+import org.apache.apisix.plugin.runner.A6ErrResponse;
+import org.apache.apisix.plugin.runner.A6Request;
+import org.apache.apisix.plugin.runner.ExtraInfoRequest;
+import org.apache.apisix.plugin.runner.ExtraInfoResponse;
+import org.apache.apisix.plugin.runner.HttpRequest;
+import org.apache.apisix.plugin.runner.HttpResponse;
+import org.apache.apisix.plugin.runner.constants.Constants;
+import org.apache.apisix.plugin.runner.filter.PluginFilter;
+import org.apache.apisix.plugin.runner.filter.PluginFilterChain;
+
+@RequiredArgsConstructor
+public class HTTPReqCallHandler extends SimpleChannelInboundHandler<A6Request> 
{
+
+    private final Logger logger = 
LoggerFactory.getLogger(HTTPReqCallHandler.class);
+
+    private final static String EXTRA_INFO_REQ_BODY_KEY = "request_body";
+
+    private final Cache<Long, A6Conf> cache;
+
+    /**
+     * the name of the nginx variable to be queried with queue staging
+     * whether thread-safe collections are required?
+     */
+    private Queue<String> queue = new LinkedList<>();
+
+    private HttpRequest currReq;
+
+    private HttpResponse currResp;
+
+    private long confToken;
+
+    Map<String, String> nginxVars = new HashMap<>();
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, A6Request request) {
+        try {
+            if (request.getType() == Constants.RPC_EXTRA_INFO) {
+                handleExtraInfo(ctx, (ExtraInfoResponse) request);
+            }
+
+            if (request.getType() == Constants.RPC_HTTP_REQ_CALL) {
+                handleHttpReqCall(ctx, (HttpRequest) request);
+            }
+        } catch (Exception e) {
+            logger.error("handle request error: ", e);
+            errorHandle(ctx, Code.SERVICE_UNAVAILABLE);
+        }
+    }
+
+    private void handleExtraInfo(ChannelHandlerContext ctx, ExtraInfoResponse 
request) {
+        String result = request.getResult();
+        String varsKey = queue.poll();
+        if (Objects.isNull(varsKey)) {
+            logger.error("queue is empty");
+            errorHandle(ctx, Code.SERVICE_UNAVAILABLE);
+            return;
+        }
+
+        if (EXTRA_INFO_REQ_BODY_KEY.equals(varsKey)) {
+            currReq.setBody(result);
+        } else {
+            nginxVars.put(varsKey, result);
+        }
+        if (queue.isEmpty()) {
+            doFilter(ctx);
+        }
+    }
+
+    private void doFilter(ChannelHandlerContext ctx) {
+        A6Conf conf = cache.getIfPresent(confToken);
+        if (Objects.isNull(conf)) {
+            logger.warn("cannot find conf token: {}", confToken);
+            errorHandle(ctx, Code.CONF_TOKEN_NOT_FOUND);
+            return;
+        }
+
+        currReq.initCtx(currResp, conf.getConfig());
+        currReq.setVars(nginxVars);
+
+        PluginFilterChain chain = conf.getChain();
+        chain.filter(currReq, currResp);
+
+        ctx.writeAndFlush(currResp);
+    }
+
+    private void handleHttpReqCall(ChannelHandlerContext ctx, HttpRequest 
request) {
+        cleanCtx();
+
+        // save HttpCallRequest
+        currReq = request;
+        currResp = new HttpResponse(currReq.getRequestId());
+
+        confToken = currReq.getConfToken();
+        A6Conf conf = cache.getIfPresent(confToken);
+        if (Objects.isNull(conf)) {
+            logger.warn("cannot find conf token: {}", confToken);
+            errorHandle(ctx, Code.CONF_TOKEN_NOT_FOUND);
+            return;
+        }
+
+        PluginFilterChain chain = conf.getChain();
+
+        // if the filter chain is empty, then return the response directly
+        if (Objects.isNull(chain) || 0 == chain.getFilters().size()) {
+            ctx.writeAndFlush(currResp);
+            return;
+        }
+
+        // fetch the nginx variables
+        Set<String> varKeys = new HashSet<>();
+        boolean requiredBody = false;
+        boolean requiredVars = false;
+
+        for (PluginFilter filter : chain.getFilters()) {
+            Collection<String> vars = filter.requiredVars();
+            if (!CollectionUtils.isEmpty(vars)) {
+                varKeys.addAll(vars);
+                requiredVars = true;
+            }
+
+            if (filter.requiredBody() != null && filter.requiredBody()) {
+                requiredBody = true;
+            }
+        }
+
+        if (varKeys.size() > 0) {
+            for (String varKey : varKeys) {
+                boolean offer = queue.offer(varKey);
+                if (!offer) {
+                    logger.error("queue is full");
+                    errorHandle(ctx, Code.SERVICE_UNAVAILABLE);
+                    return;
+                }
+                ExtraInfoRequest extraInfoRequest = new 
ExtraInfoRequest(varKey, null);
+                ctx.writeAndFlush(extraInfoRequest);
+            }
+        }
+
+        // fetch the request body
+        if (requiredBody) {
+            queue.offer(EXTRA_INFO_REQ_BODY_KEY);
+            ExtraInfoRequest extraInfoRequest = new ExtraInfoRequest(null, 
true);
+            ctx.writeAndFlush(extraInfoRequest);
+        }
+
+        // no need to fetch the nginx variables or request body, just do filter
+        if (!requiredBody && !requiredVars) {
+            doFilter(ctx);
+        }
+    }
+
+    private void errorHandle(ChannelHandlerContext ctx, int code) {
+        A6ErrResponse errResponse = new A6ErrResponse(code);
+        ctx.writeAndFlush(errResponse);
+    }
+
+    private void cleanCtx() {
+        queue.clear();
+        nginxVars.clear();
+        currReq = null;
+        currResp = null;
+        confToken = -1;
+    }
+}
diff --git 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/Handler.java
 
b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/Handler.java
deleted file mode 100644
index eb20393..0000000
--- 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/Handler.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.apisix.plugin.runner.handler;
-
-import org.apache.apisix.plugin.runner.A6Request;
-import org.apache.apisix.plugin.runner.A6Response;
-
-public interface Handler {
-    
-    void handle(A6Request request, A6Response response);
-}
diff --git 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/IOHandler.java
 
b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/IOHandler.java
deleted file mode 100644
index 9cd3f19..0000000
--- 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/IOHandler.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.apisix.plugin.runner.handler;
-
-import lombok.RequiredArgsConstructor;
-import org.apache.apisix.plugin.runner.codec.PluginRunnerDecoder;
-import org.apache.apisix.plugin.runner.codec.PluginRunnerEncoder;
-import org.reactivestreams.Publisher;
-import reactor.core.publisher.Mono;
-import reactor.netty.NettyInbound;
-import reactor.netty.NettyOutbound;
-
-@RequiredArgsConstructor
-public class IOHandler {
-
-    private final PluginRunnerDecoder decoder;
-
-    private final Dispatcher dispatcher;
-
-    private final PluginRunnerEncoder encoder;
-
-    public Publisher<Void> handle(NettyInbound in, NettyOutbound out) {
-        return in.receive().asByteBuffer()
-                .map(decoder::decode)
-                .map(dispatcher::dispatch)
-                .flatMap(e -> 
out.sendByteArray(Mono.just(encoder.encode(e).array())).then());
-    }
-
-}
diff --git 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/IOHandlerConfiguration.java
 
b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/IOHandlerConfiguration.java
deleted file mode 100644
index be538ec..0000000
--- 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/IOHandlerConfiguration.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.apisix.plugin.runner.handler;
-
-import org.apache.apisix.plugin.runner.codec.PluginRunnerDecoder;
-import org.apache.apisix.plugin.runner.codec.PluginRunnerEncoder;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-@Configuration
-public class IOHandlerConfiguration {
-
-    @Bean
-    public IOHandler createIOHandler(PluginRunnerDecoder decoder,
-                                     Dispatcher dispatcher,
-                                     PluginRunnerEncoder encoder) {
-        return new IOHandler(decoder, dispatcher, encoder);
-    }
-}
diff --git 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/impl/FlatBuffersDecoder.java
 
b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PayloadDecoder.java
similarity index 72%
rename from 
runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/impl/FlatBuffersDecoder.java
rename to 
runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PayloadDecoder.java
index 0e1d221..eb49028 100644
--- 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/impl/FlatBuffersDecoder.java
+++ 
b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PayloadDecoder.java
@@ -15,25 +15,35 @@
  * limitations under the License.
  */
 
-package org.apache.apisix.plugin.runner.codec.impl;
+package org.apache.apisix.plugin.runner.handler;
 
 import io.github.api7.A6.Err.Code;
-import org.apache.apisix.plugin.runner.A6ConfigRequest;
-import org.apache.apisix.plugin.runner.A6ErrRequest;
-import org.apache.apisix.plugin.runner.A6Request;
-import org.apache.apisix.plugin.runner.HttpRequest;
-import org.apache.apisix.plugin.runner.codec.PluginRunnerDecoder;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 
-public class FlatBuffersDecoder implements PluginRunnerDecoder {
+import org.apache.apisix.plugin.runner.A6ConfigRequest;
+import org.apache.apisix.plugin.runner.A6ErrRequest;
+import org.apache.apisix.plugin.runner.A6Request;
+import org.apache.apisix.plugin.runner.HttpRequest;
+import org.apache.apisix.plugin.runner.ExtraInfoResponse;
+import org.apache.apisix.plugin.runner.constants.Constants;
 
-    private final Logger logger = 
LoggerFactory.getLogger(FlatBuffersDecoder.class);
+public class PayloadDecoder extends SimpleChannelInboundHandler<ByteBuf> {
+    private final Logger logger = 
LoggerFactory.getLogger(PayloadDecoder.class);
 
     @Override
+    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) {
+        ByteBuffer buffer = byteBuf.nioBuffer();
+        A6Request request = decode(buffer);
+        ctx.fireChannelRead(request);
+    }
+
     public A6Request decode(ByteBuffer buffer) {
         byte type;
         try {
@@ -45,7 +55,7 @@ public class FlatBuffersDecoder implements 
PluginRunnerDecoder {
 
         ByteBuffer body;
         switch (type) {
-            case 1:
+            case Constants.RPC_PREPARE_CONF:
                 A6ConfigRequest a6ConfigRequest;
                 try {
                     body = getBody(buffer);
@@ -55,7 +65,7 @@ public class FlatBuffersDecoder implements 
PluginRunnerDecoder {
                     return new A6ErrRequest(Code.BAD_REQUEST);
                 }
                 return a6ConfigRequest;
-            case 2:
+            case Constants.RPC_HTTP_REQ_CALL:
                 HttpRequest httpRequest;
                 try {
                     body = getBody(buffer);
@@ -64,6 +74,16 @@ public class FlatBuffersDecoder implements 
PluginRunnerDecoder {
                     return new A6ErrRequest(Code.BAD_REQUEST);
                 }
                 return httpRequest;
+
+            case Constants.RPC_EXTRA_INFO:
+                ExtraInfoResponse extraInfoResponse;
+                try {
+                    body = getBody(buffer);
+                    extraInfoResponse = ExtraInfoResponse.from(body);
+                } catch (BufferUnderflowException | IndexOutOfBoundsException 
e) {
+                    return new A6ErrRequest(Code.BAD_REQUEST);
+                }
+                return extraInfoResponse;
             default:
                 break;
         }
@@ -89,7 +109,7 @@ public class FlatBuffersDecoder implements 
PluginRunnerDecoder {
         return buffer;
     }
 
-    int bytes2Int(byte[] b, int start, int len) {
+    private int bytes2Int(byte[] b, int start, int len) {
         int sum = 0;
         int end = start + len;
         for (int i = start; i < end; i++) {
diff --git 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/impl/FlatBuffersEncoder.java
 
b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PayloadEncoder.java
similarity index 71%
rename from 
runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/impl/FlatBuffersEncoder.java
rename to 
runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PayloadEncoder.java
index caead07..6c1eeca 100644
--- 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/impl/FlatBuffersEncoder.java
+++ 
b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PayloadEncoder.java
@@ -15,21 +15,30 @@
  * limitations under the License.
  */
 
-package org.apache.apisix.plugin.runner.codec.impl;
+package org.apache.apisix.plugin.runner.handler;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
 import org.apache.apisix.plugin.runner.A6Response;
-import org.apache.apisix.plugin.runner.codec.PluginRunnerEncoder;
 
 import java.nio.ByteBuffer;
 
-public class FlatBuffersEncoder implements PluginRunnerEncoder {
+public class PayloadEncoder extends ChannelOutboundHandlerAdapter {
+
+    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise 
promise) throws Exception {
+        if (msg instanceof A6Response) {
+            A6Response response = (A6Response) msg;
+            ByteBuffer buffer = encode(response);
+            ByteBuf buf = Unpooled.wrappedBuffer(buffer);
+            ctx.write(buf, promise);
+        }
+    }
 
-    @Override
     public ByteBuffer encode(A6Response response) {
         ByteBuffer buffer = response.encode();
-        if (null != response.getErrResponse()) {
-            return setBody(buffer, (byte) 0);
-        }
         return setBody(buffer, response.getType());
     }
 
diff --git 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/A6ConfigHandler.java
 
b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PrepareConfHandler.java
similarity index 80%
rename from 
runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/A6ConfigHandler.java
rename to 
runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PrepareConfHandler.java
index 94c07cd..799208a 100644
--- 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/A6ConfigHandler.java
+++ 
b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PrepareConfHandler.java
@@ -20,12 +20,15 @@ package org.apache.apisix.plugin.runner.handler;
 import com.google.common.cache.Cache;
 import io.github.api7.A6.PrepareConf.Req;
 import io.github.api7.A6.TextEntry;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
 import lombok.RequiredArgsConstructor;
 import org.apache.apisix.plugin.runner.A6Conf;
 import org.apache.apisix.plugin.runner.A6ConfigRequest;
 import org.apache.apisix.plugin.runner.A6ConfigResponse;
 import org.apache.apisix.plugin.runner.A6Request;
 import org.apache.apisix.plugin.runner.A6Response;
+import org.apache.apisix.plugin.runner.constants.Constants;
 import org.apache.apisix.plugin.runner.filter.PluginFilter;
 import org.apache.apisix.plugin.runner.filter.PluginFilterChain;
 import org.slf4j.Logger;
@@ -36,20 +39,26 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ThreadLocalRandom;
 
-/**
- * Handle APISIX configuration request.
- */
 @RequiredArgsConstructor
-public class A6ConfigHandler implements Handler {
-    private final Logger logger = 
LoggerFactory.getLogger(A6ConfigHandler.class);
+public class PrepareConfHandler extends SimpleChannelInboundHandler<A6Request> 
{
+
+    private final Logger logger = 
LoggerFactory.getLogger(PrepareConfHandler.class);
 
     private final Cache<Long, A6Conf> cache;
     private final Map<String, PluginFilter> filters;
 
     @Override
-    public void handle(A6Request request, A6Response response) {
+    protected void channelRead0(ChannelHandlerContext ctx, A6Request request) {
+        if (request.getType() != Constants.RPC_PREPARE_CONF) {
+            ctx.fireChannelRead(request);
+            return;
+        }
+
         Req req = ((A6ConfigRequest) request).getReq();
+        long confToken = 
ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
+        A6Response response = new A6ConfigResponse(confToken);
         long token = ((A6ConfigResponse) response).getConfToken();
         PluginFilterChain chain = createFilterChain(req);
 
@@ -66,6 +75,8 @@ public class A6ConfigHandler implements Handler {
         }
         A6Conf a6Conf = new A6Conf(config, chain);
         cache.put(token, a6Conf);
+        ctx.write(response);
+        ctx.writeAndFlush(response);
     }
 
     private PluginFilterChain createFilterChain(Req req) {
@@ -85,5 +96,4 @@ public class A6ConfigHandler implements Handler {
         }
         return new PluginFilterChain(chainFilters);
     }
-
 }
diff --git 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/ApplicationRunner.java
 
b/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/ApplicationRunner.java
index 85acabc..2d3fc20 100644
--- 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/ApplicationRunner.java
+++ 
b/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/ApplicationRunner.java
@@ -17,21 +17,39 @@
 
 package org.apache.apisix.plugin.runner.server;
 
+import com.google.common.cache.Cache;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerDomainSocketChannel;
 import io.netty.channel.unix.DomainSocketAddress;
+import io.netty.channel.unix.DomainSocketChannel;
 import io.netty.handler.logging.LoggingHandler;
 import lombok.RequiredArgsConstructor;
-import org.apache.apisix.plugin.runner.codec.DelayedDecoder;
-import org.apache.apisix.plugin.runner.handler.IOHandler;
+import org.apache.apisix.plugin.runner.A6Conf;
+import org.apache.apisix.plugin.runner.filter.PluginFilter;
+import org.apache.apisix.plugin.runner.handler.PrepareConfHandler;
+import org.apache.apisix.plugin.runner.handler.HTTPReqCallHandler;
+import org.apache.apisix.plugin.runner.handler.PayloadDecoder;
+import org.apache.apisix.plugin.runner.handler.BinaryProtocolDecoder;
+import org.apache.apisix.plugin.runner.handler.PayloadEncoder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.ObjectProvider;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.stereotype.Component;
-import reactor.netty.DisposableServer;
-import reactor.netty.tcp.TcpServer;
 
-import java.time.Duration;
-import java.util.Objects;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 @Component
 @RequiredArgsConstructor
@@ -39,50 +57,71 @@ public class ApplicationRunner implements CommandLineRunner 
{
 
     private final Logger logger = 
LoggerFactory.getLogger(ApplicationRunner.class);
 
-    private final TcpServer tcpServer;
-
     @Value("${socket.file}")
     private String socketFile;
 
-    private final IOHandler handler;
+    private Cache<Long, A6Conf> cache;
 
-    private DisposableServer server;
+    private ObjectProvider<PluginFilter> beanProvider;
 
-    @Override
-    public void run(String... args) throws Exception {
-        TcpServer tcpServer = this.tcpServer;
+    @Autowired
+    public ApplicationRunner(Cache<Long, A6Conf> cache, 
ObjectProvider<PluginFilter> beanProvider) {
+        this.cache = cache;
+        this.beanProvider = beanProvider;
+    }
 
-        if (Objects.isNull(tcpServer)) {
-            tcpServer = TcpServer.create();
+    public PrepareConfHandler createConfigReqHandler(Cache<Long, A6Conf> 
cache, ObjectProvider<PluginFilter> beanProvider) {
+        List<PluginFilter> pluginFilterList = 
beanProvider.orderedStream().collect(Collectors.toList());
+        Map<String, PluginFilter> filterMap = new HashMap<>();
+        for (PluginFilter filter : pluginFilterList) {
+            filterMap.put(filter.name(), filter);
         }
+        return new PrepareConfHandler(cache, filterMap);
+    }
 
-        if (socketFile.startsWith("unix:")) {
-            socketFile = socketFile.substring("unix:".length());
+    public HTTPReqCallHandler createA6HttpHandler(Cache<Long, A6Conf> cache) {
+        return new HTTPReqCallHandler(cache);
+    }
+
+    public void start(String path) throws Exception {
+        EventLoopGroup group = new EpollEventLoopGroup();
+        try {
+            ServerBootstrap bootstrap = new ServerBootstrap();
+            initServerBootstrap(group, bootstrap);
+            ChannelFuture future = bootstrap.bind(new 
DomainSocketAddress(path)).sync();
+            Runtime.getRuntime().exec("chmod 777 " + socketFile);
+            logger.warn("java runner is listening on the socket file: {}", 
socketFile);
+
+            future.channel().closeFuture().sync();
+        } finally {
+            group.shutdownGracefully().sync();
         }
+    }
+
+    private ServerBootstrap initServerBootstrap(EventLoopGroup group, 
ServerBootstrap bootstrap) {
+        return bootstrap.group(group)
+                .channel(EpollServerDomainSocketChannel.class)
+                .childHandler(new ChannelInitializer<DomainSocketChannel>() {
+                    @Override
+                    protected void initChannel(DomainSocketChannel channel) {
+                        channel.pipeline().addFirst("logger", new 
LoggingHandler())
+                                .addAfter("logger", "payloadEncoder", new 
PayloadEncoder())
+                                .addAfter("payloadEncoder", "delayedDecoder", 
new BinaryProtocolDecoder())
+                                .addLast("payloadDecoder", new 
PayloadDecoder())
+                                .addAfter("payloadDecoder", 
"prepareConfHandler", createConfigReqHandler(cache, beanProvider))
+                                .addAfter("prepareConfHandler", 
"hTTPReqCallHandler", createA6HttpHandler(cache));
+
+                    }
+                });
+    }
 
-        tcpServer = tcpServer.bindAddress(() -> new 
DomainSocketAddress(socketFile));
-        tcpServer = tcpServer.doOnChannelInit((observer, channel, addr) -> {
-//            if (Objects.nonNull(channelHandlers)) {
-//                channel.pipeline().addLast(channelHandlers);
-//            }
-            channel.pipeline().addFirst("logger", new LoggingHandler())
-                    .addAfter("logger", "delayedDecoder", new 
DelayedDecoder());
-        });
-
-        if (Objects.nonNull(handler)) {
-            tcpServer = tcpServer.handle(handler::handle);
+    @Override
+    public void run(String... args) throws Exception {
+        if (socketFile.startsWith("unix:")) {
+            socketFile = socketFile.substring("unix:".length());
         }
-        this.server = tcpServer.bindNow();
-        logger.warn("java runner is listening on the socket file: {}", 
socketFile);
-
-        // delete socket file when tcp server shutdown
-        Runtime.getRuntime()
-                .addShutdownHook(new Thread(() -> 
this.server.disposeNow(Duration.ofSeconds(45))));
-
-        Thread awaitThread = new Thread(() -> this.server.onDispose().block());
-        awaitThread.setDaemon(false);
-        awaitThread.setName("uds-server");
-        awaitThread.start();
-        Runtime.getRuntime().exec("chmod 777 " + socketFile);
+        Path socketPath = Paths.get(socketFile);
+        Files.deleteIfExists(socketPath);
+        start(socketPath.toString());
     }
 }
diff --git 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/config/TcpServerConfiguration.java
 
b/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/config/TcpServerConfiguration.java
deleted file mode 100644
index a6a4537..0000000
--- 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/config/TcpServerConfiguration.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.apisix.plugin.runner.server.config;
-
-import org.springframework.beans.factory.ObjectProvider;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import reactor.netty.tcp.TcpServer;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-@Configuration
-public class TcpServerConfiguration {
-    
-    @Bean
-    public TcpServer tcpServer(ObjectProvider<TcpServerCustomizer> 
customizers) {
-        return 
create(customizers.orderedStream().collect(Collectors.toList()));
-    }
-    
-    private TcpServer create(List<TcpServerCustomizer> customizers) {
-        TcpServer server = TcpServer.create();
-        for (TcpServerCustomizer customizer : customizers) {
-            server = customizer.customize(server);
-        }
-        return server;
-    }
-    
-}
diff --git 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/config/TcpServerCustomizer.java
 
b/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/config/TcpServerCustomizer.java
deleted file mode 100644
index a940959..0000000
--- 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/config/TcpServerCustomizer.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.apisix.plugin.runner.server.config;
-
-import reactor.netty.tcp.TcpServer;
-
-@FunctionalInterface
-public interface TcpServerCustomizer {
-    
-    TcpServer customize(TcpServer tcpServer);
-    
-}
diff --git a/runner-core/src/main/resources/application.properties 
b/runner-core/src/main/resources/application.properties
new file mode 100644
index 0000000..3d3a7dc
--- /dev/null
+++ b/runner-core/src/main/resources/application.properties
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+#spring.rsocket.server.port=7000
+
+#logging.level.root=DEBUG
diff --git 
a/runner-core/src/test/java/org/apache/apisix/plugin/runner/codec/impl/FlatBuffersDecoderTest.java
 
b/runner-core/src/test/java/org/apache/apisix/plugin/runner/codec/impl/PayloadDecoderTest.java
similarity index 89%
rename from 
runner-core/src/test/java/org/apache/apisix/plugin/runner/codec/impl/FlatBuffersDecoderTest.java
rename to 
runner-core/src/test/java/org/apache/apisix/plugin/runner/codec/impl/PayloadDecoderTest.java
index 77524d0..aeb2ab0 100644
--- 
a/runner-core/src/test/java/org/apache/apisix/plugin/runner/codec/impl/FlatBuffersDecoderTest.java
+++ 
b/runner-core/src/test/java/org/apache/apisix/plugin/runner/codec/impl/PayloadDecoderTest.java
@@ -25,22 +25,24 @@ import io.netty.buffer.Unpooled;
 import org.apache.apisix.plugin.runner.A6ConfigRequest;
 import org.apache.apisix.plugin.runner.A6ErrRequest;
 import org.apache.apisix.plugin.runner.A6Request;
+import org.apache.apisix.plugin.runner.handler.PayloadDecoder;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
 import org.mockito.InjectMocks;
 import org.mockito.MockitoAnnotations;
+import org.springframework.test.util.ReflectionTestUtils;
 
 import java.nio.ByteBuffer;
 
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 @DisplayName("test decode data")
-class FlatBuffersDecoderTest {
+class PayloadDecoderTest {
 
     @InjectMocks
-    FlatBuffersDecoder flatBuffersDecoder;
+    PayloadDecoder payloadDecoder;
 
     @BeforeEach
     void setUp() {
@@ -52,7 +54,7 @@ class FlatBuffersDecoderTest {
     void testEmptyData() {
         byte[] bytes = new byte[]{};
         ByteBuffer buffer = ByteBuffer.wrap(bytes);
-        A6Request result = flatBuffersDecoder.decode(buffer);
+        A6Request result = payloadDecoder.decode(buffer);
         Assertions.assertEquals(Code.BAD_REQUEST, ((A6ErrRequest) 
result).getCode());
     }
 
@@ -61,7 +63,7 @@ class FlatBuffersDecoderTest {
     void testUnsupportedType() {
         byte[] bytes = new byte[]{4};
         ByteBuffer buffer = ByteBuffer.wrap(bytes);
-        A6Request result = flatBuffersDecoder.decode(buffer);
+        A6Request result = payloadDecoder.decode(buffer);
         Assertions.assertEquals(Code.BAD_REQUEST, ((A6ErrRequest) 
result).getCode());
     }
 
@@ -71,7 +73,7 @@ class FlatBuffersDecoderTest {
         // data length is greater than actual length
         byte[] bytes = new byte[]{1, 0, 0, 3, 0};
         ByteBuffer buffer = ByteBuffer.wrap(bytes);
-        A6Request result = flatBuffersDecoder.decode(buffer);
+        A6Request result = payloadDecoder.decode(buffer);
         Assertions.assertEquals(Code.BAD_REQUEST, ((A6ErrRequest) 
result).getCode());
     }
 
@@ -81,7 +83,7 @@ class FlatBuffersDecoderTest {
         // data length equal to 0
         byte[] bytes = new byte[]{1, 0, 0, 0, 0};
         ByteBuffer buffer = ByteBuffer.wrap(bytes);
-        A6Request result = flatBuffersDecoder.decode(buffer);
+        A6Request result = payloadDecoder.decode(buffer);
         Assertions.assertEquals(Code.BAD_REQUEST, ((A6ErrRequest) 
result).getCode());
     }
 
@@ -91,7 +93,7 @@ class FlatBuffersDecoderTest {
         // wrong data content
         byte[] bytes = new byte[]{1, 0, 0, 1, 0, 1};
         ByteBuffer buffer = ByteBuffer.wrap(bytes);
-        A6Request result = flatBuffersDecoder.decode(buffer);
+        A6Request result = payloadDecoder.decode(buffer);
         Assertions.assertEquals(Code.BAD_REQUEST, ((A6ErrRequest) 
result).getCode());
     }
 
@@ -117,7 +119,7 @@ class FlatBuffersDecoderTest {
         System.arraycopy(data, 0, bytes, header.length, data.length);
 
         ByteBuffer buffer = ByteBuffer.wrap(bytes);
-        A6ConfigRequest configReq = (A6ConfigRequest) 
flatBuffersDecoder.decode(buffer);
+        A6ConfigRequest configReq = (A6ConfigRequest) 
payloadDecoder.decode(buffer);
         for (int i = 0; i < configReq.getReq().confLength(); i++) {
             TextEntry conf = configReq.getReq().conf(i);
             Assertions.assertEquals("foo", conf.name());
@@ -148,7 +150,7 @@ class FlatBuffersDecoderTest {
         System.arraycopy(data, 0, bytes, header.length, data.length);
 
         ByteBuffer buffer = ByteBuffer.wrap(bytes);
-        A6ConfigRequest configReq = (A6ConfigRequest) 
flatBuffersDecoder.decode(buffer);
+        A6ConfigRequest configReq = (A6ConfigRequest) 
payloadDecoder.decode(buffer);
         assertThrows(IndexOutOfBoundsException.class, () -> 
configReq.getReq().conf(0));
     }
 
@@ -156,8 +158,7 @@ class FlatBuffersDecoderTest {
     @DisplayName("test decode data length greater then 256")
     void testDecodeDataGreaterLargeThen256() {
         byte[] bytes = new byte[]{0, 1, 4};
-        int length = flatBuffersDecoder.bytes2Int(bytes, 0, 3);
-
+        int length = ReflectionTestUtils.invokeMethod(payloadDecoder, 
"bytes2Int", bytes, 0, 3);
         // use Bytebuf getInt function (default 4 bytes) to verify
         ByteBuf buf = Unpooled.buffer(4);
         byte[] bufBytes = {0, 0, 1, 4};
diff --git 
a/runner-core/src/test/java/org/apache/apisix/plugin/runner/codec/impl/FlatBuffersEncoderTest.java
 
b/runner-core/src/test/java/org/apache/apisix/plugin/runner/codec/impl/PayloadEncoderTest.java
similarity index 91%
rename from 
runner-core/src/test/java/org/apache/apisix/plugin/runner/codec/impl/FlatBuffersEncoderTest.java
rename to 
runner-core/src/test/java/org/apache/apisix/plugin/runner/codec/impl/PayloadEncoderTest.java
index 7cb9c11..bde4d21 100644
--- 
a/runner-core/src/test/java/org/apache/apisix/plugin/runner/codec/impl/FlatBuffersEncoderTest.java
+++ 
b/runner-core/src/test/java/org/apache/apisix/plugin/runner/codec/impl/PayloadEncoderTest.java
@@ -26,21 +26,23 @@ import io.netty.buffer.Unpooled;
 import org.apache.apisix.plugin.runner.A6ConfigResponse;
 import org.apache.apisix.plugin.runner.A6ErrResponse;
 import org.apache.apisix.plugin.runner.HttpResponse;
+import org.apache.apisix.plugin.runner.handler.PayloadEncoder;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
+import org.springframework.test.util.ReflectionTestUtils;
 
 import java.nio.ByteBuffer;
 
 @DisplayName("test encode data")
-class FlatBuffersEncoderTest {
-    FlatBuffersEncoder flatBuffersEncoder = new FlatBuffersEncoder();
+class PayloadEncoderTest {
+    PayloadEncoder payloadEncoder = new PayloadEncoder();
 
     @Test
     @DisplayName("test encode error response(1)")
     void testErrResponseEncode1() {
         A6ErrResponse errResponse = new A6ErrResponse(Code.BAD_REQUEST);
-        ByteBuffer result = flatBuffersEncoder.encode(errResponse);
+        ByteBuffer result = payloadEncoder.encode(errResponse);
         byte[] res = new byte[result.remaining()];
         result.get(res);
 
@@ -53,7 +55,7 @@ class FlatBuffersEncoderTest {
     @DisplayName("test encode error response(2)")
     void testErrResponseEncode2() {
         A6ErrResponse errResponse = new A6ErrResponse(Code.BAD_REQUEST);
-        ByteBuffer result = flatBuffersEncoder.encode(errResponse);
+        ByteBuffer result = payloadEncoder.encode(errResponse);
         result.position(4);
         io.github.api7.A6.Err.Resp resp = 
io.github.api7.A6.Err.Resp.getRootAsResp(result);
         Assertions.assertEquals(Code.BAD_REQUEST, resp.code());
@@ -63,7 +65,7 @@ class FlatBuffersEncoderTest {
     @DisplayName("test prepare conf response(1)")
     void testPrepareConfResponseEncode1() {
         A6ConfigResponse configResponse = new A6ConfigResponse(0L);
-        ByteBuffer result = flatBuffersEncoder.encode(configResponse);
+        ByteBuffer result = payloadEncoder.encode(configResponse);
         byte[] res = new byte[result.remaining()];
         result.get(res);
 
@@ -76,7 +78,7 @@ class FlatBuffersEncoderTest {
     @DisplayName("test prepare conf response(2)")
     void testPrepareConfResponseEncode2() {
         A6ConfigResponse configResponse = new A6ConfigResponse(0L);
-        ByteBuffer result = flatBuffersEncoder.encode(configResponse);
+        ByteBuffer result = payloadEncoder.encode(configResponse);
         result.position(4);
         io.github.api7.A6.PrepareConf.Resp resp = 
io.github.api7.A6.PrepareConf.Resp.getRootAsResp(result);
         Assertions.assertEquals(resp.confToken(), 0L);
@@ -86,7 +88,7 @@ class FlatBuffersEncoderTest {
     @DisplayName("test http call response")
     void testHTTPCallResponseEncode1() {
         HttpResponse errResponse = new HttpResponse(0L);
-        ByteBuffer result = flatBuffersEncoder.encode(errResponse);
+        ByteBuffer result = payloadEncoder.encode(errResponse);
         byte[] res = new byte[result.remaining()];
         result.get(res);
 
@@ -99,7 +101,7 @@ class FlatBuffersEncoderTest {
     @DisplayName("test http call response, action: none")
     void testHTTPCallResponseEncode2() {
         HttpResponse httpResponse = new HttpResponse(0L);
-        ByteBuffer result = flatBuffersEncoder.encode(httpResponse);
+        ByteBuffer result = payloadEncoder.encode(httpResponse);
         result.position(4);
         io.github.api7.A6.HTTPReqCall.Resp resp = 
io.github.api7.A6.HTTPReqCall.Resp.getRootAsResp(result);
         Assertions.assertEquals(resp.id(), 0L);
@@ -115,7 +117,7 @@ class FlatBuffersEncoderTest {
         httpResponse.setPath("/hello");
         httpResponse.setArg("foo", "bar");
         httpResponse.setReqHeader("Server", "APISIX");
-        ByteBuffer result = flatBuffersEncoder.encode(httpResponse);
+        ByteBuffer result = payloadEncoder.encode(httpResponse);
         result.position(4);
         io.github.api7.A6.HTTPReqCall.Resp resp = 
io.github.api7.A6.HTTPReqCall.Resp.getRootAsResp(result);
         Assertions.assertEquals(resp.actionType(), Action.Rewrite);
@@ -136,7 +138,7 @@ class FlatBuffersEncoderTest {
         httpResponse.setStatusCode(401);
         httpResponse.setHeader("code", "401");
         httpResponse.setBody("Unauthorized");
-        ByteBuffer result = flatBuffersEncoder.encode(httpResponse);
+        ByteBuffer result = payloadEncoder.encode(httpResponse);
         result.position(4);
         io.github.api7.A6.HTTPReqCall.Resp resp = 
io.github.api7.A6.HTTPReqCall.Resp.getRootAsResp(result);
         Assertions.assertEquals(resp.actionType(), Action.Stop);
@@ -165,7 +167,7 @@ class FlatBuffersEncoderTest {
         httpResponse.setStatusCode(401);
         httpResponse.setHeader("code", "401");
         httpResponse.setBody("Unauthorized");
-        ByteBuffer result = flatBuffersEncoder.encode(httpResponse);
+        ByteBuffer result = payloadEncoder.encode(httpResponse);
         result.position(4);
         io.github.api7.A6.HTTPReqCall.Resp resp = 
io.github.api7.A6.HTTPReqCall.Resp.getRootAsResp(result);
         Assertions.assertEquals(resp.actionType(), Action.Stop);
@@ -184,7 +186,7 @@ class FlatBuffersEncoderTest {
         httpResponse.setReqHeader("Authorization", "Bearer 
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c");
         httpResponse.setReqHeader("Timestamp", "1623408133");
 
-        ByteBuffer result = flatBuffersEncoder.encode(httpResponse);
+        ByteBuffer result = payloadEncoder.encode(httpResponse);
         result.position(4);
         io.github.api7.A6.HTTPReqCall.Resp resp = 
io.github.api7.A6.HTTPReqCall.Resp.getRootAsResp(result);
         Assertions.assertEquals(resp.actionType(), Action.Rewrite);
@@ -230,7 +232,7 @@ class FlatBuffersEncoderTest {
         httpResponse.setHeader("Server", "APISIX");
         httpResponse.setHeader("Error", "Unauthorized");
         httpResponse.setHeader("Timestamp", "1623408133");
-        ByteBuffer result = flatBuffersEncoder.encode(httpResponse);
+        ByteBuffer result = payloadEncoder.encode(httpResponse);
         result.position(4);
         io.github.api7.A6.HTTPReqCall.Resp resp = 
io.github.api7.A6.HTTPReqCall.Resp.getRootAsResp(result);
         Assertions.assertEquals(resp.actionType(), Action.Stop);
@@ -253,7 +255,7 @@ class FlatBuffersEncoderTest {
     @Test
     @DisplayName("test encode data length greater then 256")
     void testEncodeDataGreaterLargeThen256() {
-        byte[] bytes = flatBuffersEncoder.int2Bytes(260, 3);
+        byte[] bytes = ReflectionTestUtils.invokeMethod(payloadEncoder, 
"int2Bytes", 260, 3);
 
         // use Bytebuf getInt function (default 4 bytes) to verify
         ByteBuf buf = Unpooled.buffer(4);
@@ -271,7 +273,7 @@ class FlatBuffersEncoderTest {
         HttpResponse httpResponse = new HttpResponse(0L);
         // only set header, without setStatusCode, use 200 as default
         httpResponse.setHeader("Foo", "Bar");
-        ByteBuffer result = flatBuffersEncoder.encode(httpResponse);
+        ByteBuffer result = payloadEncoder.encode(httpResponse);
         result.position(4);
         io.github.api7.A6.HTTPReqCall.Resp resp = 
io.github.api7.A6.HTTPReqCall.Resp.getRootAsResp(result);
         Assertions.assertEquals(resp.actionType(), Action.Stop);
diff --git 
a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6ConfigHandlerTest.java
 
b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6ConfigHandlerTest.java
index 34792e8..0f33d68 100644
--- 
a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6ConfigHandlerTest.java
+++ 
b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6ConfigHandlerTest.java
@@ -22,6 +22,8 @@ import com.google.common.cache.CacheBuilder;
 import com.google.flatbuffers.FlatBufferBuilder;
 import io.github.api7.A6.PrepareConf.Req;
 import io.github.api7.A6.TextEntry;
+import io.netty.channel.embedded.EmbeddedChannel;
+
 import org.apache.apisix.plugin.runner.A6Conf;
 import org.apache.apisix.plugin.runner.A6ConfigRequest;
 import org.apache.apisix.plugin.runner.A6ConfigResponse;
@@ -33,9 +35,9 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
-import reactor.core.publisher.Mono;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -46,7 +48,7 @@ class A6ConfigHandlerTest {
 
     Map<String, PluginFilter> filters;
 
-    A6ConfigHandler a6ConfigHandler;
+    PrepareConfHandler prepareConfHandler;
 
     @BeforeEach
     void setUp() {
@@ -58,8 +60,18 @@ class A6ConfigHandlerTest {
             }
 
             @Override
-            public Mono<Void> filter(HttpRequest request, HttpResponse 
response, PluginFilterChain chain) {
-                return chain.filter(request, response);
+            public void filter(HttpRequest request, HttpResponse response, 
PluginFilterChain chain) {
+                chain.filter(request, response);
+            }
+
+            @Override
+            public List<String> requiredVars() {
+                return null;
+            }
+
+            @Override
+            public Boolean requiredBody() {
+                return null;
             }
         });
 
@@ -70,12 +82,22 @@ class A6ConfigHandlerTest {
             }
 
             @Override
-            public Mono<Void> filter(HttpRequest request, HttpResponse 
response, PluginFilterChain chain) {
-                return chain.filter(request, response);
+            public void filter(HttpRequest request, HttpResponse response, 
PluginFilterChain chain) {
+                chain.filter(request, response);
+            }
+
+            @Override
+            public List<String> requiredVars() {
+                return null;
+            }
+
+            @Override
+            public Boolean requiredBody() {
+                return null;
             }
         });
         cache = CacheBuilder.newBuilder().expireAfterWrite(3600, 
TimeUnit.SECONDS).maximumSize(1000).build();
-        a6ConfigHandler = new A6ConfigHandler(cache, filters);
+        prepareConfHandler = new PrepareConfHandler(cache, filters);
     }
 
     @Test
@@ -92,10 +114,12 @@ class A6ConfigHandlerTest {
         Req req = Req.getRootAsReq(builder.dataBuffer());
 
         A6ConfigRequest request = new A6ConfigRequest(req);
-        A6ConfigResponse response = new A6ConfigResponse(0L);
-        a6ConfigHandler.handle(request, response);
+        EmbeddedChannel channel = new EmbeddedChannel(new 
BinaryProtocolDecoder(), prepareConfHandler);
+        channel.writeInbound(request);
+        channel.finish();
+        A6ConfigResponse response = channel.readOutbound();
 
-        A6Conf config = cache.getIfPresent(0L);
+        A6Conf config = cache.getIfPresent(response.getConfToken());
         Assertions.assertNotNull(config.getChain());
         Assertions.assertEquals(config.getChain().getFilters().size(), 1);
         Assertions.assertEquals(config.getChain().getIndex(), 0);
@@ -121,10 +145,12 @@ class A6ConfigHandlerTest {
         Req req = Req.getRootAsReq(builder.dataBuffer());
 
         A6ConfigRequest request = new A6ConfigRequest(req);
-        A6ConfigResponse response = new A6ConfigResponse(0L);
-        a6ConfigHandler.handle(request, response);
+        EmbeddedChannel channel = new EmbeddedChannel(new 
BinaryProtocolDecoder(), prepareConfHandler);
+        channel.writeInbound(request);
+        channel.finish();
+        A6ConfigResponse response = channel.readOutbound();
 
-        A6Conf config = cache.getIfPresent(0L);
+        A6Conf config = cache.getIfPresent(response.getConfToken());
         Assertions.assertEquals(config.getChain().getFilters().size(), 2);
     }
 
@@ -147,10 +173,12 @@ class A6ConfigHandlerTest {
         Req req = Req.getRootAsReq(builder.dataBuffer());
 
         A6ConfigRequest request = new A6ConfigRequest(req);
-        A6ConfigResponse response = new A6ConfigResponse(0L);
-        a6ConfigHandler.handle(request, response);
+        EmbeddedChannel channel = new EmbeddedChannel(new 
BinaryProtocolDecoder(), prepareConfHandler);
+        channel.writeInbound(request);
+        channel.finish();
+        A6ConfigResponse response = channel.readOutbound();
 
-        A6Conf config = cache.getIfPresent(0L);
+        A6Conf config = cache.getIfPresent(response.getConfToken());
         Assertions.assertEquals(config.getChain().getFilters().size(), 1);
     }
 
@@ -169,10 +197,12 @@ class A6ConfigHandlerTest {
         Req req = Req.getRootAsReq(builder.dataBuffer());
 
         A6ConfigRequest request = new A6ConfigRequest(req);
-        A6ConfigResponse response = new A6ConfigResponse(0L);
-        a6ConfigHandler.handle(request, response);
+        EmbeddedChannel channel = new EmbeddedChannel(new 
BinaryProtocolDecoder(), prepareConfHandler);
+        channel.writeInbound(request);
+        channel.finish();
+        A6ConfigResponse response = channel.readOutbound();
 
-        A6Conf config = cache.getIfPresent(0L);
+        A6Conf config = cache.getIfPresent(response.getConfToken());
         Assertions.assertEquals(config.getChain().getFilters().size(), 0);
     }
 
@@ -190,10 +220,12 @@ class A6ConfigHandlerTest {
         Req req = Req.getRootAsReq(builder.dataBuffer());
 
         A6ConfigRequest request = new A6ConfigRequest(req);
-        A6ConfigResponse response = new A6ConfigResponse(0L);
-        a6ConfigHandler.handle(request, response);
+        EmbeddedChannel channel = new EmbeddedChannel(new 
BinaryProtocolDecoder(), prepareConfHandler);
+        channel.writeInbound(request);
+        channel.finish();
+        A6ConfigResponse response = channel.readOutbound();
 
-        A6Conf a6Conf = cache.getIfPresent(0L);
+        A6Conf a6Conf = cache.getIfPresent(response.getConfToken());
         Assertions.assertTrue(a6Conf.getConfig() instanceof HashMap);
         for (int i = 0; i < 100; i++) {
             Assertions.assertEquals(a6Conf.get("FooFilter"), "Bar");
diff --git 
a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6HttpCallHandlerTest.java
 
b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6HttpCallHandlerTest.java
index 9ead677..8a449d5 100644
--- 
a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6HttpCallHandlerTest.java
+++ 
b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6HttpCallHandlerTest.java
@@ -24,9 +24,11 @@ import com.google.gson.Gson;
 import io.github.api7.A6.Err.Code;
 import io.github.api7.A6.HTTPReqCall.Action;
 import io.github.api7.A6.TextEntry;
+import io.netty.channel.embedded.EmbeddedChannel;
 import org.apache.apisix.plugin.runner.A6Conf;
 import org.apache.apisix.plugin.runner.A6ConfigRequest;
 import org.apache.apisix.plugin.runner.A6ConfigResponse;
+import org.apache.apisix.plugin.runner.A6ErrResponse;
 import org.apache.apisix.plugin.runner.HttpRequest;
 import org.apache.apisix.plugin.runner.HttpResponse;
 import org.apache.apisix.plugin.runner.filter.PluginFilter;
@@ -36,11 +38,11 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
-import reactor.core.publisher.Mono;
 
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
@@ -50,12 +52,18 @@ class A6HttpCallHandlerTest {
     private PrintStream console = null;
     private ByteArrayOutputStream bytes = null;
 
-    A6HttpCallHandler a6HttpCallHandler;
+    HTTPReqCallHandler httpReqCallHandler;
 
     Cache<Long, A6Conf> cache;
 
     Map<String, PluginFilter> filters;
 
+    EmbeddedChannel channel;
+
+    PrepareConfHandler prepareConfHandler;
+
+    long confToken;
+
     @BeforeEach
     void setUp() {
 
@@ -72,7 +80,7 @@ class A6HttpCallHandlerTest {
 
             @Override
             @SuppressWarnings("unchecked")
-            public Mono<Void> filter(HttpRequest request, HttpResponse 
response, PluginFilterChain chain) {
+            public void filter(HttpRequest request, HttpResponse response, 
PluginFilterChain chain) {
                 System.out.println("do filter: FooFilter, order: " + 
chain.getIndex());
                 System.out.println("do filter: FooFilter, config: " + 
request.getConfig(this));
                 Gson gson = new Gson();
@@ -102,9 +110,18 @@ class A6HttpCallHandlerTest {
                     System.out.println("do filter: method: " + 
request.getMethod());
                 }
 
-                return chain.filter(request, response);
+                chain.filter(request, response);
+            }
+
+            @Override
+            public List<String> requiredVars() {
+                return null;
             }
 
+            @Override
+            public Boolean requiredBody() {
+                return null;
+            }
         });
 
         filters.put("CatFilter", new PluginFilter() {
@@ -114,14 +131,23 @@ class A6HttpCallHandlerTest {
             }
 
             @Override
-            public Mono<Void> filter(HttpRequest request, HttpResponse 
response, PluginFilterChain chain) {
+            public void filter(HttpRequest request, HttpResponse response, 
PluginFilterChain chain) {
                 System.out.println("do filter: CatFilter, order: " + 
chain.getIndex());
                 System.out.println("do filter: CatFilter, config: " + 
request.getConfig(this));
 
                 response.setStatusCode(401);
-                return chain.filter(request, response);
+                chain.filter(request, response);
             }
 
+            @Override
+            public List<String> requiredVars() {
+                return null;
+            }
+
+            @Override
+            public Boolean requiredBody() {
+                return null;
+            }
         });
         cache = CacheBuilder.newBuilder().expireAfterWrite(3600, 
TimeUnit.SECONDS).maximumSize(1000).build();
         FlatBufferBuilder builder = new FlatBufferBuilder();
@@ -141,12 +167,16 @@ class A6HttpCallHandlerTest {
         io.github.api7.A6.PrepareConf.Req req = 
io.github.api7.A6.PrepareConf.Req.getRootAsReq(builder.dataBuffer());
 
         A6ConfigRequest request = new A6ConfigRequest(req);
-        A6ConfigResponse response = new A6ConfigResponse(0L);
-
-        A6ConfigHandler a6ConfigHandler = new A6ConfigHandler(cache, filters);
-        a6ConfigHandler.handle(request, response);
-
-        a6HttpCallHandler = new A6HttpCallHandler(cache);
+        prepareConfHandler = new PrepareConfHandler(cache, filters);
+        channel = new EmbeddedChannel(new BinaryProtocolDecoder(), 
prepareConfHandler);
+        channel.writeInbound(request);
+        channel.finish();
+        A6ConfigResponse response = channel.readOutbound();
+        confToken = response.getConfToken();
+
+        prepareConfHandler = new PrepareConfHandler(cache, filters);
+        httpReqCallHandler = new HTTPReqCallHandler(cache);
+        channel = new EmbeddedChannel(new BinaryProtocolDecoder(), 
prepareConfHandler, httpReqCallHandler);
     }
 
     @AfterEach
@@ -165,9 +195,10 @@ class A6HttpCallHandlerTest {
 
         io.github.api7.A6.HTTPReqCall.Req req = 
io.github.api7.A6.HTTPReqCall.Req.getRootAsReq(builder.dataBuffer());
         HttpRequest request = new HttpRequest(req);
-        HttpResponse response = new HttpResponse(1L);
-        a6HttpCallHandler.handle(request, response);
-        io.github.api7.A6.Err.Resp err = 
io.github.api7.A6.Err.Resp.getRootAsResp(response.getErrResponse().encode());
+        channel.writeInbound(request);
+        channel.finish();
+        A6ErrResponse response = channel.readOutbound();
+        io.github.api7.A6.Err.Resp err = 
io.github.api7.A6.Err.Resp.getRootAsResp(response.encode());
         Assertions.assertEquals(err.code(), Code.CONF_TOKEN_NOT_FOUND);
     }
 
@@ -177,13 +208,14 @@ class A6HttpCallHandlerTest {
         FlatBufferBuilder builder = new FlatBufferBuilder();
 
         io.github.api7.A6.HTTPReqCall.Req.startReq(builder);
-        io.github.api7.A6.HTTPReqCall.Req.addConfToken(builder, 0L);
+        io.github.api7.A6.HTTPReqCall.Req.addConfToken(builder, confToken);
         builder.finish(io.github.api7.A6.HTTPReqCall.Req.endReq(builder));
 
         io.github.api7.A6.HTTPReqCall.Req req = 
io.github.api7.A6.HTTPReqCall.Req.getRootAsReq(builder.dataBuffer());
         HttpRequest request = new HttpRequest(req);
-        HttpResponse response = new HttpResponse(1L);
-        a6HttpCallHandler.handle(request, response);
+        channel.writeInbound(request);
+        channel.finish();
+
         Assertions.assertTrue(bytes.toString().contains("do filter: FooFilter, 
order: 1"));
         Assertions.assertTrue(bytes.toString().contains("do filter: FooFilter, 
order: 1"));
         Assertions.assertTrue(bytes.toString().contains("do filter: FooFilter, 
config: {\"conf_key1\":\"conf_value1\",\"conf_key2\":2}"));
@@ -213,17 +245,17 @@ class A6HttpCallHandlerTest {
 
         io.github.api7.A6.HTTPReqCall.Req.startReq(builder);
         io.github.api7.A6.HTTPReqCall.Req.addId(builder, 8888L);
-        io.github.api7.A6.HTTPReqCall.Req.addConfToken(builder, 0L);
+        io.github.api7.A6.HTTPReqCall.Req.addConfToken(builder, confToken);
         io.github.api7.A6.HTTPReqCall.Req.addMethod(builder, 
io.github.api7.A6.Method.GET);
         io.github.api7.A6.HTTPReqCall.Req.addHeaders(builder, headerVector);
         io.github.api7.A6.HTTPReqCall.Req.addPath(builder, path);
         io.github.api7.A6.HTTPReqCall.Req.addArgs(builder, argsVector);
         builder.finish(io.github.api7.A6.HTTPReqCall.Req.endReq(builder));
         io.github.api7.A6.HTTPReqCall.Req req = 
io.github.api7.A6.HTTPReqCall.Req.getRootAsReq(builder.dataBuffer());
-
         HttpRequest request = new HttpRequest(req);
-        HttpResponse response = new HttpResponse(1L);
-        a6HttpCallHandler.handle(request, response);
+        channel.writeInbound(request);
+        channel.finish();
+
         Assertions.assertTrue(bytes.toString().contains("do filter: FooFilter, 
order: 1"));
         Assertions.assertTrue(bytes.toString().contains("do filter: FooFilter, 
config: {\"conf_key1\":\"conf_value1\",\"conf_key2\":2}"));
 
@@ -244,16 +276,19 @@ class A6HttpCallHandlerTest {
         FlatBufferBuilder builder = new FlatBufferBuilder();
 
         io.github.api7.A6.HTTPReqCall.Req.startReq(builder);
-        io.github.api7.A6.HTTPReqCall.Req.addConfToken(builder, 0L);
+        io.github.api7.A6.HTTPReqCall.Req.addConfToken(builder, confToken);
         builder.finish(io.github.api7.A6.HTTPReqCall.Req.endReq(builder));
 
         io.github.api7.A6.HTTPReqCall.Req req = 
io.github.api7.A6.HTTPReqCall.Req.getRootAsReq(builder.dataBuffer());
         HttpRequest request = new HttpRequest(req);
-        HttpResponse response = new HttpResponse(1L);
-        a6HttpCallHandler.handle(request, response);
+        channel.writeInbound(request);
+        channel.finish();
+        HttpResponse response = channel.readOutbound();
 
         io.github.api7.A6.HTTPReqCall.Resp resp =
                 
io.github.api7.A6.HTTPReqCall.Resp.getRootAsResp(response.encode());
         Assertions.assertEquals(resp.actionType(), Action.Stop);
     }
+
+    //TODO add test cases about fetch nginx vars and request body
 }
diff --git 
a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/ExtraInfoTest.java
 
b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/ExtraInfoTest.java
new file mode 100644
index 0000000..51daa05
--- /dev/null
+++ 
b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/ExtraInfoTest.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.apisix.plugin.runner.handler;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.flatbuffers.FlatBufferBuilder;
+import io.github.api7.A6.ExtraInfo.Resp;
+import io.github.api7.A6.TextEntry;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.apisix.plugin.runner.A6Conf;
+import org.apache.apisix.plugin.runner.A6ConfigRequest;
+import org.apache.apisix.plugin.runner.A6ConfigResponse;
+import org.apache.apisix.plugin.runner.ExtraInfoRequest;
+import org.apache.apisix.plugin.runner.ExtraInfoResponse;
+import org.apache.apisix.plugin.runner.HttpRequest;
+import org.apache.apisix.plugin.runner.HttpResponse;
+import org.apache.apisix.plugin.runner.filter.PluginFilter;
+import org.apache.apisix.plugin.runner.filter.PluginFilterChain;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.springframework.test.util.ReflectionTestUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@DisplayName("test extra info")
+class ExtraInfoTest {
+    private PrintStream console = null;
+    private ByteArrayOutputStream bytes = null;
+
+    private static final String[] EXTRAINFO_VARS = new String[]{"remote_addr", 
"server_port", "content_type"};
+
+    HTTPReqCallHandler httpReqCallHandler;
+
+    Cache<Long, A6Conf> cache;
+
+    Map<String, PluginFilter> filters;
+
+    EmbeddedChannel channel;
+
+    PrepareConfHandler prepareConfHandler;
+
+    long confToken;
+
+    @BeforeEach
+    void setUp() {
+
+        bytes = new ByteArrayOutputStream();
+        console = System.out;
+        System.setOut(new PrintStream(bytes));
+
+        filters = new HashMap<>();
+        filters.put("FooFilter", new PluginFilter() {
+            @Override
+            public String name() {
+                return "FooFilter";
+            }
+
+            @Override
+            @SuppressWarnings("unchecked")
+            public void filter(HttpRequest request, HttpResponse response, 
PluginFilterChain chain) {
+                String remote_addr = request.getVars("remote_addr");
+                String server_port = request.getVars("server_port");
+                System.out.println("remote_addr: " + remote_addr);
+                System.out.println("server_port: " + server_port);
+                chain.filter(request, response);
+            }
+
+            @Override
+            public List<String> requiredVars() {
+                return List.of("remote_addr", "server_port");
+            }
+
+            @Override
+            public Boolean requiredBody() {
+                return false;
+            }
+        });
+
+        filters.put("CatFilter", new PluginFilter() {
+            @Override
+            public String name() {
+                return "CatFilter";
+            }
+
+            @Override
+            public void filter(HttpRequest request, HttpResponse response, 
PluginFilterChain chain) {
+                String body = request.getBody();
+                String content_type = request.getVars("content_type");
+                System.out.println("content_type: " + content_type);
+                System.out.println("body: " + body);
+                chain.filter(request, response);
+            }
+
+            @Override
+            public List<String> requiredVars() {
+                return List.of("content_type");
+            }
+
+            @Override
+            public Boolean requiredBody() {
+                return true;
+            }
+        });
+        cache = CacheBuilder.newBuilder().expireAfterWrite(3600, 
TimeUnit.SECONDS).maximumSize(1000).build();
+        FlatBufferBuilder builder = new FlatBufferBuilder();
+
+        int foo = builder.createString("FooFilter");
+        int bar = 
builder.createString("{\"conf_key1\":\"conf_value1\",\"conf_key2\":2}");
+        int filter1 = TextEntry.createTextEntry(builder, foo, bar);
+
+        int cat = builder.createString("CatFilter");
+        int dog = builder.createString("Dog");
+        int filter2 = TextEntry.createTextEntry(builder, cat, dog);
+
+        int confVector = 
io.github.api7.A6.PrepareConf.Req.createConfVector(builder, new int[]{filter1, 
filter2});
+        io.github.api7.A6.PrepareConf.Req.startReq(builder);
+        io.github.api7.A6.PrepareConf.Req.addConf(builder, confVector);
+        builder.finish(io.github.api7.A6.PrepareConf.Req.endReq(builder));
+        io.github.api7.A6.PrepareConf.Req req = 
io.github.api7.A6.PrepareConf.Req.getRootAsReq(builder.dataBuffer());
+
+        A6ConfigRequest request = new A6ConfigRequest(req);
+        prepareConfHandler = new PrepareConfHandler(cache, filters);
+        channel = new EmbeddedChannel(new BinaryProtocolDecoder(), 
prepareConfHandler);
+        channel.writeInbound(request);
+        channel.finish();
+        A6ConfigResponse response = channel.readOutbound();
+        confToken = response.getConfToken();
+
+        prepareConfHandler = new PrepareConfHandler(cache, filters);
+        httpReqCallHandler = new HTTPReqCallHandler(cache);
+        channel = new EmbeddedChannel(new BinaryProtocolDecoder(), 
prepareConfHandler, httpReqCallHandler);
+    }
+
+    @AfterEach
+    void setDown() {
+        System.setOut(console);
+    }
+
+    @Test
+    @DisplayName("test fetch nginx vars of extra info")
+    void testFetchVars() {
+        FlatBufferBuilder builder = new FlatBufferBuilder();
+
+        io.github.api7.A6.HTTPReqCall.Req.startReq(builder);
+        io.github.api7.A6.HTTPReqCall.Req.addConfToken(builder, confToken);
+        builder.finish(io.github.api7.A6.HTTPReqCall.Req.endReq(builder));
+
+        io.github.api7.A6.HTTPReqCall.Req req = 
io.github.api7.A6.HTTPReqCall.Req.getRootAsReq(builder.dataBuffer());
+        HttpRequest request = new HttpRequest(req);
+        channel.writeInbound(request);
+        channel.finish();
+        ExtraInfoRequest eir1 = channel.readOutbound();
+        ExtraInfoRequest eir2 = channel.readOutbound();
+        ExtraInfoRequest eir3 = channel.readOutbound();
+        assertThat(EXTRAINFO_VARS).contains((String) 
ReflectionTestUtils.getField(eir1, "var"));
+        assertThat(EXTRAINFO_VARS).contains((String) 
ReflectionTestUtils.getField(eir2, "var"));
+        assertThat(EXTRAINFO_VARS).contains((String) 
ReflectionTestUtils.getField(eir3, "var"));
+    }
+
+    @Test
+    @DisplayName("test fetch request body of extra info")
+    void testFetchBody() {
+        FlatBufferBuilder builder = new FlatBufferBuilder();
+
+        io.github.api7.A6.HTTPReqCall.Req.startReq(builder);
+        io.github.api7.A6.HTTPReqCall.Req.addConfToken(builder, confToken);
+        builder.finish(io.github.api7.A6.HTTPReqCall.Req.endReq(builder));
+
+        io.github.api7.A6.HTTPReqCall.Req req = 
io.github.api7.A6.HTTPReqCall.Req.getRootAsReq(builder.dataBuffer());
+        HttpRequest request = new HttpRequest(req);
+        channel.writeInbound(request);
+        channel.finish();
+        channel.readOutbound();
+        channel.readOutbound();
+        channel.readOutbound();
+        ExtraInfoRequest exr = channel.readOutbound();
+        Assertions.assertEquals(true, ReflectionTestUtils.getField(exr, 
"reqBody"));
+    }
+
+    @Test
+    @DisplayName("test fetch request body of extra info")
+    void testGetVarsInPluginFilter() {
+        FlatBufferBuilder builder = new FlatBufferBuilder();
+
+        io.github.api7.A6.HTTPReqCall.Req.startReq(builder);
+        io.github.api7.A6.HTTPReqCall.Req.addConfToken(builder, confToken);
+        builder.finish(io.github.api7.A6.HTTPReqCall.Req.endReq(builder));
+
+        io.github.api7.A6.HTTPReqCall.Req req = 
io.github.api7.A6.HTTPReqCall.Req.getRootAsReq(builder.dataBuffer());
+        HttpRequest request = new HttpRequest(req);
+        channel.writeInbound(request);
+        channel.flushInbound();
+
+        for (int i = 0; i < 4; i++) {
+            ExtraInfoRequest eir = channel.readOutbound();
+            if (ReflectionTestUtils.getField(eir, "var") != null && 
"remote_addr" == ReflectionTestUtils.getField(eir, "var")) {
+                builder.clear();
+                int res1 = builder.createString("127.0.0.1");
+                io.github.api7.A6.ExtraInfo.Resp.startResp(builder);
+                io.github.api7.A6.ExtraInfo.Resp.addResult(builder, res1);
+                int resp1 = Resp.endResp(builder);
+                builder.finish(resp1);
+                ExtraInfoResponse extraInfoResponse1 = 
ExtraInfoResponse.from(builder.dataBuffer());
+                channel.writeInbound(extraInfoResponse1);
+                channel.flushInbound();
+                continue;
+            }
+
+            if (ReflectionTestUtils.getField(eir, "var") != null && 
"server_port" == ReflectionTestUtils.getField(eir, "var")) {
+                builder.clear();
+                int res2 = builder.createString("9080");
+                io.github.api7.A6.ExtraInfo.Resp.startResp(builder);
+                io.github.api7.A6.ExtraInfo.Resp.addResult(builder, res2);
+                int resp2 = Resp.endResp(builder);
+                builder.finish(resp2);
+                ExtraInfoResponse extraInfoResponse2 = 
ExtraInfoResponse.from(builder.dataBuffer());
+                channel.writeInbound(extraInfoResponse2);
+                channel.flushInbound();
+                continue;
+            }
+
+            if (ReflectionTestUtils.getField(eir, "var") != null && 
"content_type" == ReflectionTestUtils.getField(eir, "var")) {
+                builder.clear();
+                int res3 = builder.createString("application/json");
+                io.github.api7.A6.ExtraInfo.Resp.startResp(builder);
+                io.github.api7.A6.ExtraInfo.Resp.addResult(builder, res3);
+                int resp3 = Resp.endResp(builder);
+                builder.finish(resp3);
+                ExtraInfoResponse extraInfoResponse3 = 
ExtraInfoResponse.from(builder.dataBuffer());
+                channel.writeInbound(extraInfoResponse3);
+                channel.flushInbound();
+                continue;
+            }
+
+            if ((ReflectionTestUtils.getField(eir, "reqBody") == null) || 
!((Boolean) ReflectionTestUtils.getField(eir, "reqBody"))) {
+                continue;
+            }
+            builder.clear();
+            int res4 = builder.createString("abcd");
+            io.github.api7.A6.ExtraInfo.Resp.startResp(builder);
+            io.github.api7.A6.ExtraInfo.Resp.addResult(builder, res4);
+            int resp4 = Resp.endResp(builder);
+            builder.finish(resp4);
+            ExtraInfoResponse extraInfoResponse4 = 
ExtraInfoResponse.from(builder.dataBuffer());
+            channel.writeInbound(extraInfoResponse4);
+            channel.flushInbound();
+        }
+
+        Assertions.assertTrue(bytes.toString().contains("remote_addr: 
127.0.0.1"));
+        Assertions.assertTrue(bytes.toString().contains("server_port: 9080"));
+        Assertions.assertTrue(bytes.toString().contains("content_type: 
application/json"));
+        Assertions.assertTrue(bytes.toString().contains("body: abcd"));
+    }
+
+}
diff --git a/runner-dist/apisix-runner-bin-dist/src/main/release-docs/LICENSE 
b/runner-dist/apisix-runner-bin-dist/src/main/release-docs/LICENSE
index c3e6dd6..dbbde9b 100644
--- a/runner-dist/apisix-runner-bin-dist/src/main/release-docs/LICENSE
+++ b/runner-dist/apisix-runner-bin-dist/src/main/release-docs/LICENSE
@@ -225,8 +225,7 @@ The text of each license is the standard Apache 2.0 license.
    listenablefuture 
9999.0-empty-to-avoid-conflict-with-guava:https://github.com/google/guava, 
Apache 2.0
    jsr305 3.0.2: http://findbugs.sourceforge.net, Apache 2.0
    gson 2.8.6: https://github.com/google/gson, Apache 2.0
-   reactor-core 3.4.5: https://github.com/reactor/reactor-core, Apache 2.0
-   reactor-netty 1.0.6: https://github.com/reactor/reactor-netty, Apache 2.0
+   netty 4.1.63.Final: https://github.com/netty/netty, Apache 2.0
    netty 4.1.63: https://github.com/netty/netty, Apache 2.0
    log4j2 2.6.2: https://github.com/apache/logging-log4j2, Apache 2.0
    slf4j-api 1.7.28: http://www.slf4j.org, Apache 2.0
diff --git a/runner-plugin-sdk/pom.xml b/runner-plugin-sdk/pom.xml
index 3a620de..feef4c8 100644
--- a/runner-plugin-sdk/pom.xml
+++ b/runner-plugin-sdk/pom.xml
@@ -36,7 +36,7 @@
         <dependency>
             <groupId>io.github.api7</groupId>
             <artifactId>A6</artifactId>
-            <version>0.1.0-RELEASE</version>
+            <version>0.3.0-RELEASE</version>
         </dependency>
 
         <dependency>
@@ -60,15 +60,5 @@
                 </exclusion>
             </exclusions>
         </dependency>
-        <dependency>
-            <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-starter-reactor-netty</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>io.projectreactor</groupId>
-            <artifactId>reactor-core</artifactId>
-            <scope>provided</scope>
-        </dependency>
     </dependencies>
 </project>
diff --git 
a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/ExtraInfoRequest.java
 
b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/ExtraInfoRequest.java
new file mode 100644
index 0000000..eb1954d
--- /dev/null
+++ 
b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/ExtraInfoRequest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.apisix.plugin.runner;
+
+import com.google.flatbuffers.FlatBufferBuilder;
+import io.github.api7.A6.ExtraInfo.Info;
+import io.github.api7.A6.ExtraInfo.ReqBody;
+import io.github.api7.A6.PrepareConf.Req;
+
+import java.nio.ByteBuffer;
+
+public class ExtraInfoRequest implements A6Response {
+
+    private final String var;
+
+    private final Boolean reqBody;
+
+    public ExtraInfoRequest(String var, Boolean reqBody) {
+        this.var = var;
+        this.reqBody = reqBody;
+    }
+
+    @Override
+    public ByteBuffer encode() {
+        FlatBufferBuilder builder = new FlatBufferBuilder();
+
+        if (var != null) {
+            int nameOffset = builder.createString(var);
+            io.github.api7.A6.ExtraInfo.Var.startVar(builder);
+            io.github.api7.A6.ExtraInfo.Var.addName(builder, nameOffset);
+            int endVar = io.github.api7.A6.ExtraInfo.Var.endVar(builder);
+            buildExtraInfo(endVar, Info.Var, builder);
+        }
+
+        if (this.reqBody != null && this.reqBody) {
+            io.github.api7.A6.ExtraInfo.ReqBody.startReqBody(builder);
+            int reqBodyReq = ReqBody.endReqBody(builder);
+            buildExtraInfo(reqBodyReq, Info.ReqBody, builder);
+        }
+
+        builder.finish(Req.endReq(builder));
+        return builder.dataBuffer();
+    }
+
+    private void buildExtraInfo(int info, byte type, FlatBufferBuilder 
builder) {
+        io.github.api7.A6.ExtraInfo.Req.startReq(builder);
+        io.github.api7.A6.ExtraInfo.Req.addInfoType(builder, type);
+        io.github.api7.A6.ExtraInfo.Req.addInfo(builder, info);
+    }
+
+    @Override
+    public byte getType() {
+        return 3;
+    }
+}
diff --git 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/PluginRunnerEncoder.java
 
b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/ExtraInfoResponse.java
similarity index 52%
rename from 
runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/PluginRunnerEncoder.java
rename to 
runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/ExtraInfoResponse.java
index b79093a..eb10f97 100644
--- 
a/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/PluginRunnerEncoder.java
+++ 
b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/ExtraInfoResponse.java
@@ -15,14 +15,36 @@
  * limitations under the License.
  */
 
-package org.apache.apisix.plugin.runner.codec;
+package org.apache.apisix.plugin.runner;
 
-import org.apache.apisix.plugin.runner.A6Response;
+import io.github.api7.A6.ExtraInfo.Resp;
+import lombok.Getter;
 
 import java.nio.ByteBuffer;
 
-@FunctionalInterface
-public interface PluginRunnerEncoder {
-    
-    ByteBuffer encode(A6Response response);
+public class ExtraInfoResponse implements A6Request {
+    @Getter
+    private final Resp resp;
+
+    public ExtraInfoResponse(Resp resp) {
+        this.resp = resp;
+    }
+
+    public static ExtraInfoResponse from(ByteBuffer buffer) {
+        Resp req = Resp.getRootAsResp(buffer);
+        return new ExtraInfoResponse(req);
+    }
+
+    public String getResult() {
+        StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < this.resp.resultLength(); i++) {
+            builder.append((char) this.resp.result(i));
+        }
+        return builder.toString();
+    }
+
+    @Override
+    public byte getType() {
+        return 3;
+    }
 }
diff --git 
a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/HttpRequest.java
 
b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/HttpRequest.java
index 16c4881..38d0212 100644
--- 
a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/HttpRequest.java
+++ 
b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/HttpRequest.java
@@ -47,6 +47,10 @@ public class HttpRequest implements A6Request {
 
     private Map<String, String> args;
 
+    private Map<String, String> vars;
+
+    private String body;
+
     public HttpRequest(Req req) {
         this.req = req;
     }
@@ -128,6 +132,7 @@ public class HttpRequest implements A6Request {
      * request.getHeaders()
      * }
      * </pre>
+     *
      * @return the all headers
      */
     public Map<String, String> getHeader() {
@@ -184,6 +189,7 @@ public class HttpRequest implements A6Request {
      * request.setHeader("Accept", null);
      * }
      * </pre>
+     *
      * @param headerKey   the header key
      * @param headerValue the header value
      */
@@ -252,7 +258,8 @@ public class HttpRequest implements A6Request {
      * request.setArg("foo", null);
      * }
      * </pre>
-     * @param argKey the arg key
+     *
+     * @param argKey   the arg key
      * @param argValue the arg value
      */
     public void setArg(String argKey, String argValue) {
@@ -278,6 +285,25 @@ public class HttpRequest implements A6Request {
         return 2;
     }
 
+    public String getVars(String key) {
+        if (CollectionUtils.isEmpty(vars)) {
+            return null;
+        }
+        return vars.get(key);
+    }
+
+    public void setVars(Map<String, String> vars) {
+        this.vars = vars;
+    }
+
+    public String getBody() {
+        return body;
+    }
+
+    public void setBody(String body) {
+        this.body = body;
+    }
+
     public enum Method {
         GET,
         HEAD,
diff --git 
a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/HttpResponse.java
 
b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/HttpResponse.java
index 409e955..9aa2e62 100644
--- 
a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/HttpResponse.java
+++ 
b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/HttpResponse.java
@@ -58,8 +58,6 @@ public class HttpResponse implements A6Response {
 
     private Integer statusCode;
 
-    private A6ErrResponse errResponse;
-
     public HttpResponse(long requestId) {
         this.requestId = requestId;
     }
@@ -134,20 +132,8 @@ public class HttpResponse implements A6Response {
         this.statusCode = statusCode;
     }
 
-    public void setErrResponse(A6ErrResponse errResponse) {
-        this.errResponse = errResponse;
-    }
-
-    @Override
-    public A6ErrResponse getErrResponse() {
-        return this.errResponse;
-    }
-
     @Override
     public ByteBuffer encode() {
-        if (!Objects.isNull(errResponse)) {
-            return errResponse.encode();
-        }
 
         FlatBufferBuilder builder = new FlatBufferBuilder();
 
diff --git 
a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/filter/PluginFilter.java
 
b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/filter/PluginFilter.java
index 22f1622..e0f4908 100644
--- 
a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/filter/PluginFilter.java
+++ 
b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/filter/PluginFilter.java
@@ -19,12 +19,34 @@ package org.apache.apisix.plugin.runner.filter;
 
 import org.apache.apisix.plugin.runner.HttpRequest;
 import org.apache.apisix.plugin.runner.HttpResponse;
-import reactor.core.publisher.Mono;
+import java.util.List;
 
 public interface PluginFilter {
 
+    /**
+     * @return the name of plugin filter
+     */
     String name();
 
-    Mono<Void> filter(HttpRequest request, HttpResponse response, 
PluginFilterChain chain);
+    /**
+     * do the plugin filter chain
+     *  @param request the request form APISIX
+     * @param response the response for APISIX
+     * @param chain the chain of filters
+     */
+    void filter(HttpRequest request, HttpResponse response, PluginFilterChain 
chain);
 
+    /**
+     * declare in advance the nginx variables that you want to use in the 
plugin
+     * @return the nginx variables as list
+     */
+    List<String> requiredVars();
+
+    /**
+     * need request body in plugins or not
+     *
+     * @return true if need request body
+     */
+    Boolean requiredBody();
 }
+
diff --git 
a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/filter/PluginFilterChain.java
 
b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/filter/PluginFilterChain.java
index 0b15967..e33d0a9 100644
--- 
a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/filter/PluginFilterChain.java
+++ 
b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/filter/PluginFilterChain.java
@@ -19,7 +19,6 @@ package org.apache.apisix.plugin.runner.filter;
 
 import org.apache.apisix.plugin.runner.HttpRequest;
 import org.apache.apisix.plugin.runner.HttpResponse;
-import reactor.core.publisher.Mono;
 
 import java.util.List;
 
@@ -46,14 +45,15 @@ public class PluginFilterChain {
         return filters;
     }
 
-    public Mono<Void> filter(HttpRequest request, HttpResponse response) {
+    public void filter(HttpRequest request, HttpResponse response) {
         if (this.index < filters.size()) {
             PluginFilter filter = filters.get(this.index);
             PluginFilterChain next = new PluginFilterChain(this,
                     this.index + 1);
-            return filter.filter(request, response, next);
+            filter.filter(request, response, next);
         } else {
-            return Mono.empty();
+            //TODO log error
+            return;
         }
     }
 }
diff --git a/runner-plugin/pom.xml b/runner-plugin/pom.xml
index 111da58..0a173dc 100644
--- a/runner-plugin/pom.xml
+++ b/runner-plugin/pom.xml
@@ -45,9 +45,5 @@
             <groupId>org.springframework</groupId>
             <artifactId>spring-context</artifactId>
         </dependency>
-        <dependency>
-            <groupId>io.projectreactor</groupId>
-            <artifactId>reactor-core</artifactId>
-        </dependency>
     </dependencies>
 </project>
diff --git 
a/runner-starter/src/test/java/org/apache/apisix/plugin/runner/PluginRunnerApplicationTest.java
 
b/runner-starter/src/test/java/org/apache/apisix/plugin/runner/PluginRunnerApplicationTest.java
deleted file mode 100644
index 22ade7f..0000000
--- 
a/runner-starter/src/test/java/org/apache/apisix/plugin/runner/PluginRunnerApplicationTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.apisix.plugin.runner;
-
-import io.netty.channel.unix.DomainSocketAddress;
-import io.netty.channel.unix.DomainSocketChannel;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import reactor.netty.Connection;
-import reactor.netty.tcp.TcpClient;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-@DisplayName("Unix Domain Socket Listen Test")
-class PluginRunnerApplicationTest {
-
-    @BeforeEach
-    void setUp() {
-        System.setProperty("APISIX_LISTEN_ADDRESS", "unix:/tmp/runner.sock");
-        System.setProperty("APISIX_CONF_EXPIRE_TIME", String.valueOf(3600));
-    }
-
-    @Test
-    void testMain() {
-        PluginRunnerApplication.main(new String[]{"args"});
-    }
-
-    @Test
-    @DisplayName("test listen on socket file success")
-    void testUDSConnectSuccess() {
-        Connection client =
-                TcpClient.create()
-                        .remoteAddress(() -> new 
DomainSocketAddress("/tmp/runner.sock"))
-                        .connectNow();
-        assertThat(client.channel().isOpen()).isTrue();
-        assertThat(client.channel().isActive()).isTrue();
-        assertThat(client.channel().isRegistered()).isTrue();
-        assertThat(client.channel() instanceof DomainSocketChannel).isTrue();
-        
assertThat(client.channel().remoteAddress().toString()).isEqualTo("/tmp/runner.sock");
-        client.disposeNow();
-    }
-
-    @Test
-    @DisplayName("test listen on error socket file")
-    void testUDSConnectFail() {
-        Throwable exception = assertThrows(RuntimeException.class, () -> {
-            Connection client = TcpClient.create()
-                    .remoteAddress(() -> new 
DomainSocketAddress("/tmp/error.sock"))
-                    .connectNow();
-            client.disposeNow();
-        });
-        
assertThat(exception.getMessage()).isEqualTo("java.io.FileNotFoundException");
-    }
-}
-
diff --git a/sample/pom.xml b/sample/pom.xml
index 466d756..e01e7e7 100644
--- a/sample/pom.xml
+++ b/sample/pom.xml
@@ -46,10 +46,6 @@
             <artifactId>spring-context</artifactId>
         </dependency>
         <dependency>
-            <groupId>io.projectreactor</groupId>
-            <artifactId>reactor-core</artifactId>
-        </dependency>
-        <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-test</artifactId>
             <scope>test</scope>
diff --git 
a/sample/src/main/java/org/apache/apisix/plugin/runner/filter/RewriteRequestDemoFilter.java
 
b/sample/src/main/java/org/apache/apisix/plugin/runner/filter/RewriteRequestDemoFilter.java
index 406e605..e0027f0 100644
--- 
a/sample/src/main/java/org/apache/apisix/plugin/runner/filter/RewriteRequestDemoFilter.java
+++ 
b/sample/src/main/java/org/apache/apisix/plugin/runner/filter/RewriteRequestDemoFilter.java
@@ -21,9 +21,10 @@ import com.google.gson.Gson;
 import org.apache.apisix.plugin.runner.HttpRequest;
 import org.apache.apisix.plugin.runner.HttpResponse;
 import org.springframework.stereotype.Component;
-import reactor.core.publisher.Mono;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 @Component
@@ -59,7 +60,7 @@ public class RewriteRequestDemoFilter implements PluginFilter 
{
     }
 
     @Override
-    public Mono<Void> filter(HttpRequest request, HttpResponse response, 
PluginFilterChain chain) {
+    public void filter(HttpRequest request, HttpResponse response, 
PluginFilterChain chain) {
         /*
          * If the conf you configured is of type json, you can convert it to 
Map or json.
          */
@@ -86,6 +87,33 @@ public class RewriteRequestDemoFilter implements 
PluginFilter {
          */
         request.setArg((String) conf.get("conf_arg_name"), (String) 
conf.get("conf_arg_value"));
 
-        return chain.filter(request, response);
+        /*
+         * You can fetch the Nginx variables, and the request body
+         */
+        String remoteAddr = request.getVars("remote_addr");
+        String serverPort = request.getVars("server_port");
+        String body = request.getBody();
+
+        chain.filter(request, response);
+    }
+
+    /**
+     * If you need to fetch some Nginx variables in the current plugin, you 
will need to declare them in this function.
+     * @return a list of Nginx variables that need to be called in this plugin
+     */
+    @Override
+    public List<String> requiredVars() {
+        List<String> vars = new ArrayList<>();
+        vars.add("remote_addr");
+        vars.add("server_port");
+        return vars;
+    }
+
+    /**
+     * If you need to fetch request body in the current plugin, you will need 
to return true in this function.
+     */
+    @Override
+    public Boolean requiredBody() {
+        return true;
     }
 }
diff --git 
a/sample/src/main/java/org/apache/apisix/plugin/runner/filter/StopRequestDemoFilter.java
 
b/sample/src/main/java/org/apache/apisix/plugin/runner/filter/StopRequestDemoFilter.java
index 706e573..7fc7116 100644
--- 
a/sample/src/main/java/org/apache/apisix/plugin/runner/filter/StopRequestDemoFilter.java
+++ 
b/sample/src/main/java/org/apache/apisix/plugin/runner/filter/StopRequestDemoFilter.java
@@ -21,9 +21,9 @@ import com.google.gson.Gson;
 import org.apache.apisix.plugin.runner.HttpRequest;
 import org.apache.apisix.plugin.runner.HttpResponse;
 import org.springframework.stereotype.Component;
-import reactor.core.publisher.Mono;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 @Component
@@ -34,7 +34,7 @@ public class StopRequestDemoFilter implements PluginFilter {
     }
 
     @Override
-    public Mono<Void> filter(HttpRequest request, HttpResponse response, 
PluginFilterChain chain) {
+    public void filter(HttpRequest request, HttpResponse response, 
PluginFilterChain chain) {
         /*
          * If the conf you configured is of type json, you can convert it to 
Map or json.
          */
@@ -71,6 +71,16 @@ public class StopRequestDemoFilter implements PluginFilter {
             body:
             {"key1":"value1","key2":2}
          */
-        return chain.filter(request, response);
+        chain.filter(request, response);
+    }
+
+    @Override
+    public List<String> requiredVars() {
+        return null;
+    }
+
+    @Override
+    public Boolean requiredBody() {
+        return null;
     }
 }
diff --git 
a/sample/src/test/java/org/apache/apisix/plugin/runner/filter/StopRequestDemoFilterTest.java
 
b/sample/src/test/java/org/apache/apisix/plugin/runner/filter/StopRequestDemoFilterTest.java
index 88dfbb6..5046808 100644
--- 
a/sample/src/test/java/org/apache/apisix/plugin/runner/filter/StopRequestDemoFilterTest.java
+++ 
b/sample/src/test/java/org/apache/apisix/plugin/runner/filter/StopRequestDemoFilterTest.java
@@ -17,16 +17,9 @@
 
 package org.apache.apisix.plugin.runner.filter;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 import java.util.HashMap;
 import java.util.Map;
-
 import com.google.gson.Gson;
-
-import org.apache.apisix.plugin.runner.HttpRequest;
-import org.apache.apisix.plugin.runner.HttpResponse;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
@@ -55,29 +48,6 @@ public class StopRequestDemoFilterTest {
     }
 
     @Test
-    @DisplayName("test filter")
-    void testFilter() {
-
-        StopRequestDemoFilter filter = new StopRequestDemoFilter();
-
-        HttpRequest request = mock(HttpRequest.class);
-        HttpResponse response = new HttpResponse(1);
-        PluginFilterChain chain = mock(PluginFilterChain.class);
-
-        String configStr = "{\"stop_response_code\": 200, 
\"stop_response_header_name\": \"header_java_runner\", 
\"stop_response_header_value\": \"via-java-runner\",  \"stop_response_body\": 
\"hellox\"}";
-        when(request.getConfig(filter)).thenReturn(configStr);
-        Assertions.assertNull(filter.filter(request, response, chain));
-
-        configStr = "{\"stop_response_code\": 200.0, 
\"stop_response_header_name\": \"header_java_runner\", 
\"stop_response_header_value\": \"via-java-runner\",  \"stop_response_body\": 
\"hellox\"}";
-        when(request.getConfig(filter)).thenReturn(configStr);
-        Assertions.assertNull(filter.filter(request, response, chain));
-
-        configStr = "{\"stop_response_code\": \"200\", 
\"stop_response_header_name\": \"header_java_runner\", 
\"stop_response_header_value\": \"via-java-runner\",  \"stop_response_body\": 
\"hellox\"}";
-        when(request.getConfig(filter)).thenReturn(configStr);
-        Assertions.assertNull(filter.filter(request, response, chain));
-    }
-
-    @Test
     @DisplayName("test name")
     void testName() {      
         StopRequestDemoFilter filter = new StopRequestDemoFilter();
diff --git a/src/main/checkstyle/checkstyle-suppressions.xml 
b/src/main/checkstyle/checkstyle-suppressions.xml
index b8ba0b4..7eadc9f 100644
--- a/src/main/checkstyle/checkstyle-suppressions.xml
+++ b/src/main/checkstyle/checkstyle-suppressions.xml
@@ -24,5 +24,11 @@
 <suppressions>
     <suppress checks="RegexpSingleline"
               files="A6HttpCallHandlerTest.java"
-              lines="75-120"/>
+              lines="1-1000"/>
+    <suppress checks="RegexpSingleline"
+              files="ExtraInfoTest.java"
+              lines="1-1000"/>
+    <suppress checks="LocalVariableName"
+              files="ExtraInfoTest.java"
+              lines="1-1000"/>
 </suppressions>
\ No newline at end of file

Reply via email to