http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/NestedRuntimeException.java
----------------------------------------------------------------------
diff --git 
a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/NestedRuntimeException.java
 
b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/NestedRuntimeException.java
new file mode 100644
index 0000000..7ef01db
--- /dev/null
+++ 
b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/NestedRuntimeException.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.api.exception;
+
+/**
+ * Handy class for wrapping runtime {@code Exceptions} with a root cause.
+ *
+ * <p>This class is {@code abstract} to force the programmer to extend
+ * the class. {@code getMessage} will include nested exception
+ * information; {@code getRootCause} will include the innermost cause of
+ * this exception, if any; {@code printStackTrace} and other like methods will
+ * delegate to the wrapped exception, if any.
+ *
+ * @since 1.0.0
+ */
+public abstract class NestedRuntimeException extends RuntimeException {
+    private static final long serialVersionUID = -8371779880133933367L;
+
+    /**
+     * Construct a {@code NestedRuntimeException} with the specified detail 
message.
+     *
+     * @param msg the detail message
+     */
+    public NestedRuntimeException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * Construct a {@code NestedRuntimeException} with the specified detail 
message
+     * and nested exception.
+     *
+     * @param msg the detail message
+     * @param cause the nested exception
+     */
+    public NestedRuntimeException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+
+    /**
+     * Build a message for the given base message and root cause.
+     *
+     * @param message the base message
+     * @param cause the root cause
+     * @return the full exception message
+     */
+    private static String getMessageWithCause(String message, Throwable cause) 
{
+        if (cause != null) {
+            StringBuilder sb = new StringBuilder();
+            if (message != null) {
+                sb.append(message).append("; ");
+            }
+            sb.append("nested exception is ").append(cause);
+            return sb.toString();
+        } else {
+            return message;
+        }
+    }
+
+    /**
+     * Return the detail message, including the message from the nested 
exception
+     * if there is one.
+     */
+    @Override
+    public String getMessage() {
+        return getMessageWithCause(super.getMessage(), getCause());
+    }
+
+    /**
+     * Retrieve the innermost cause of this exception, if any.
+     *
+     * @return the innermost exception, or {@code null} if none
+     */
+    public Throwable getRootCause() {
+        Throwable rootCause = null;
+        Throwable cause = getCause();
+        while (cause != null && cause != rootCause) {
+            rootCause = cause;
+            cause = cause.getCause();
+        }
+        return rootCause;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteAccessException.java
----------------------------------------------------------------------
diff --git 
a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteAccessException.java
 
b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteAccessException.java
new file mode 100644
index 0000000..6ce6dd4
--- /dev/null
+++ 
b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteAccessException.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.api.exception;
+
+/**
+ * Generic remote access exception. A service proxy for any remoting
+ * protocol should throw this exception or subclasses of it, in order
+ * to transparently expose a plain Java business interface.
+ *
+ * <p>A client may catch RemoteAccessException if it wants to, but as
+ * remote access errors are typically unrecoverable, it will probably let
+ * such exceptions propagate to a higher level that handles them generically.
+ * In this case, the client opCode doesn't show any signs of being involved in
+ * remote access, as there aren't any remoting-specific dependencies.
+ *
+ * @since 1.0.0
+ */
+public class RemoteAccessException extends NestedRuntimeException {
+    private static final long serialVersionUID = 6280428909532427263L;
+
+    /**
+     * Constructor for RemoteAccessException with the specified detail message.
+     *
+     * @param msg the detail message
+     */
+    public RemoteAccessException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * Constructor for RemoteAccessException with the specified detail message
+     * and nested exception.
+     *
+     * @param msg the detail message
+     * @param cause the root cause (usually from using an underlying
+     * remoting API such as RMI)
+     */
+    public RemoteAccessException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteCodecException.java
----------------------------------------------------------------------
diff --git 
a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteCodecException.java
 
b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteCodecException.java
new file mode 100644
index 0000000..a8b9e4e
--- /dev/null
+++ 
b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteCodecException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.api.exception;
+
+/**
+ * @since 1.0.0
+ */
+public class RemoteCodecException extends RemoteAccessException {
+    private static final long serialVersionUID = -7597014042746200543L;
+
+    public RemoteCodecException(String msg) {
+        super(msg);
+    }
+
+    public RemoteCodecException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteConnectFailureException.java
----------------------------------------------------------------------
diff --git 
a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteConnectFailureException.java
 
b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteConnectFailureException.java
new file mode 100644
index 0000000..af0a6e9
--- /dev/null
+++ 
b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteConnectFailureException.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.api.exception;
+
+/**
+ * RemoteConnectFailureException will be thrown when connection
+ * could not be established with a remote service.
+ *
+ * @since 1.0.0
+ */
+public class RemoteConnectFailureException extends RemoteAccessException {
+    private static final long serialVersionUID = -5565366231695911316L;
+
+    /**
+     * Constructor for RemoteConnectFailureException with the specified detail 
message
+     * and nested exception.
+     *
+     * @param msg the detail message
+     * @param cause the root cause from the remoting API in use
+     */
+    public RemoteConnectFailureException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+
+    /**
+     * Constructor for RemoteConnectFailureException with the specified detail 
message.
+     *
+     * @param msg the detail message
+     */
+    public RemoteConnectFailureException(String msg) {
+        super(msg);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteTimeoutException.java
----------------------------------------------------------------------
diff --git 
a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteTimeoutException.java
 
b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteTimeoutException.java
new file mode 100644
index 0000000..adfcc8d
--- /dev/null
+++ 
b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteTimeoutException.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.api.exception;
+
+/**
+ * RemoteTimeoutException will be thrown when the execution
+ * of the target method did not complete before a configurable
+ * timeout, for example when a reply message was not received.
+ *
+ * @since 1.0.0
+ */
+public class RemoteTimeoutException extends RemoteAccessException {
+    private static final long serialVersionUID = 8710772392914461626L;
+
+    /**
+     * Constructor for RemoteTimeoutException with the specified detail 
message,configurable timeout.
+     *
+     * @param msg the detail message
+     * @param timeoutMillis configurable timeout
+     */
+    public RemoteTimeoutException(String msg, long timeoutMillis) {
+        this(msg, timeoutMillis, null);
+    }
+
+    /**
+     * Constructor for RemoteTimeoutException with the specified detail 
message,configurable timeout
+     * and nested exception..
+     *
+     * @param msg the detail message
+     * @param timeoutMillis configurable timeout
+     * @param cause Exception cause
+     */
+    public RemoteTimeoutException(String msg, long timeoutMillis, Throwable 
cause) {
+        super(String.format("%s, waiting for %s ms", msg, timeoutMillis), 
cause);
+    }
+
+    /**
+     * Constructor for RemoteTimeoutException with the specified detail 
message.
+     *
+     * @param msg the detail message
+     */
+    public RemoteTimeoutException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * Constructor for RemoteTimeoutException with the specified detail message
+     * and nested exception.
+     *
+     * @param msg the detail message
+     * @param cause the root cause from the remoting API in use
+     */
+    public RemoteTimeoutException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ExceptionContext.java
----------------------------------------------------------------------
diff --git 
a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ExceptionContext.java
 
b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ExceptionContext.java
new file mode 100644
index 0000000..2452309
--- /dev/null
+++ 
b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ExceptionContext.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.api.interceptor;
+
+import org.apache.rocketmq.remoting.api.RemotingEndPoint;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+
+public class ExceptionContext extends RequestContext {
+    private Throwable exception;
+    private String remark;
+
+    public ExceptionContext(RemotingEndPoint remotingEndPoint, String 
remoteAddr, RemotingCommand request,
+        Throwable exception, String remark) {
+        super(remotingEndPoint, remoteAddr, request);
+        this.remotingEndPoint = remotingEndPoint;
+        this.remoteAddr = remoteAddr;
+        this.request = request;
+        this.exception = exception;
+        this.remark = remark;
+    }
+
+    public RemotingEndPoint getRemotingEndPoint() {
+        return remotingEndPoint;
+    }
+
+    public void setRemotingEndPoint(RemotingEndPoint remotingEndPoint) {
+        this.remotingEndPoint = remotingEndPoint;
+    }
+
+    public String getRemoteAddr() {
+        return remoteAddr;
+    }
+
+    public void setRemoteAddr(String remoteAddr) {
+        this.remoteAddr = remoteAddr;
+    }
+
+    public RemotingCommand getRequest() {
+        return request;
+    }
+
+    public void setRequest(RemotingCommand request) {
+        this.request = request;
+    }
+
+    public Throwable getException() {
+        return exception;
+    }
+
+    public void setException(Throwable exception) {
+        this.exception = exception;
+    }
+
+    public String getRemark() {
+        return remark;
+    }
+
+    public void setRemark(String remark) {
+        this.remark = remark;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/Interceptor.java
----------------------------------------------------------------------
diff --git 
a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/Interceptor.java
 
b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/Interceptor.java
new file mode 100644
index 0000000..62257ef
--- /dev/null
+++ 
b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/Interceptor.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.api.interceptor;
+
+public interface Interceptor {
+    void beforeRequest(final RequestContext context);
+
+    void afterResponseReceived(final ResponseContext context);
+
+    void onException(final ExceptionContext context);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/InterceptorGroup.java
----------------------------------------------------------------------
diff --git 
a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/InterceptorGroup.java
 
b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/InterceptorGroup.java
new file mode 100644
index 0000000..9ffc696
--- /dev/null
+++ 
b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/InterceptorGroup.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.api.interceptor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class InterceptorGroup {
+    private final List<Interceptor> interceptors = new 
ArrayList<Interceptor>();
+
+    public void registerInterceptor(final Interceptor interceptor) {
+        if (interceptor != null) {
+            this.interceptors.add(interceptor);
+        }
+    }
+
+    public void beforeRequest(final RequestContext context) {
+        for (Interceptor interceptor : interceptors) {
+            interceptor.beforeRequest(context);
+        }
+    }
+
+    public void afterResponseReceived(final ResponseContext context) {
+        for (Interceptor interceptor : interceptors) {
+            interceptor.afterResponseReceived(context);
+        }
+    }
+
+    public void onException(final ExceptionContext context) {
+        for (Interceptor interceptor : interceptors) {
+            interceptor.onException(context);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/RequestContext.java
----------------------------------------------------------------------
diff --git 
a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/RequestContext.java
 
b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/RequestContext.java
new file mode 100644
index 0000000..d961556
--- /dev/null
+++ 
b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/RequestContext.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.api.interceptor;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.rocketmq.remoting.api.RemotingEndPoint;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+
+public class RequestContext {
+    protected RemotingEndPoint remotingEndPoint;
+    protected String remoteAddr;
+    protected RemotingCommand request;
+
+    public RequestContext(RemotingEndPoint remotingEndPoint, String 
remoteAddr, RemotingCommand request) {
+        super();
+        this.remotingEndPoint = remotingEndPoint;
+        this.remoteAddr = remoteAddr;
+        this.request = request;
+    }
+
+    public RemotingEndPoint getRemotingEndPoint() {
+        return remotingEndPoint;
+    }
+
+    public void setRemotingEndPoint(RemotingEndPoint remotingEndPoint) {
+        this.remotingEndPoint = remotingEndPoint;
+    }
+
+    public String getRemoteAddr() {
+        return remoteAddr;
+    }
+
+    public void setRemoteAddr(String remoteAddr) {
+        this.remoteAddr = remoteAddr;
+    }
+
+    public RemotingCommand getRequest() {
+        return request;
+    }
+
+    public void setRequest(RemotingCommand request) {
+        this.request = request;
+    }
+
+    @Override
+    public String toString() {
+        return ToStringBuilder.reflectionToString(this, 
ToStringStyle.MULTI_LINE_STYLE);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java
----------------------------------------------------------------------
diff --git 
a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java
 
b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java
new file mode 100644
index 0000000..97ec2e6
--- /dev/null
+++ 
b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.api.interceptor;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.rocketmq.remoting.api.RemotingEndPoint;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+
+public class ResponseContext extends RequestContext {
+    private RemotingCommand response;
+
+    public ResponseContext(RemotingEndPoint remotingEndPoint, String 
remoteAddr, RemotingCommand request,
+        RemotingCommand response) {
+        super(remotingEndPoint, remoteAddr, request);
+        this.remotingEndPoint = remotingEndPoint;
+        this.remoteAddr = remoteAddr;
+        this.request = request;
+        this.response = response;
+    }
+
+    public RemotingEndPoint getRemotingEndPoint() {
+        return remotingEndPoint;
+    }
+
+    public void setRemotingEndPoint(RemotingEndPoint remotingEndPoint) {
+        this.remotingEndPoint = remotingEndPoint;
+    }
+
+    public String getRemoteAddr() {
+        return remoteAddr;
+    }
+
+    public void setRemoteAddr(String remoteAddr) {
+        this.remoteAddr = remoteAddr;
+    }
+
+    public RemotingCommand getRequest() {
+        return request;
+    }
+
+    public void setRequest(RemotingCommand request) {
+        this.request = request;
+    }
+
+    public RemotingCommand getResponse() {
+        return response;
+    }
+
+    public void setResponse(RemotingCommand response) {
+        this.response = response;
+    }
+
+    @Override
+    public String toString() {
+        return ToStringBuilder.reflectionToString(this, 
ToStringStyle.MULTI_LINE_STYLE);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/protocol/Protocol.java
----------------------------------------------------------------------
diff --git 
a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/protocol/Protocol.java
 
b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/protocol/Protocol.java
new file mode 100644
index 0000000..5caf167
--- /dev/null
+++ 
b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/protocol/Protocol.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.api.protocol;
+
+import org.apache.rocketmq.remoting.api.channel.ChannelHandlerContextWrapper;
+
+public interface Protocol {
+    /**
+     * Minimum Viable Protocol
+     */
+    String MVP = "mvp";
+    String HTTP2 = "http2";
+    String WEBSOCKET = "websocket";
+
+    byte MVP_MAGIC = 0x14;
+    byte WEBSOCKET_MAGIC = 0x15;
+    byte HTTP_2_MAGIC = 0x16;
+
+    String name();
+
+    byte type();
+
+    void assembleHandler(ChannelHandlerContextWrapper ctx);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/protocol/ProtocolFactory.java
----------------------------------------------------------------------
diff --git 
a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/protocol/ProtocolFactory.java
 
b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/protocol/ProtocolFactory.java
new file mode 100644
index 0000000..cf016f9
--- /dev/null
+++ 
b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/protocol/ProtocolFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.api.protocol;
+
+public interface ProtocolFactory {
+    void register(Protocol protocol);
+
+    void resetAll(Protocol protocol);
+
+    byte type(String protocolName);
+
+    Protocol get(byte type);
+
+    void clearAll();
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/serializable/Serializer.java
----------------------------------------------------------------------
diff --git 
a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/serializable/Serializer.java
 
b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/serializable/Serializer.java
new file mode 100644
index 0000000..8ef8dcd
--- /dev/null
+++ 
b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/serializable/Serializer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.api.serializable;
+
+import java.lang.reflect.Type;
+import java.nio.ByteBuffer;
+import org.apache.rocketmq.remoting.common.TypePresentation;
+
+public interface Serializer {
+    String name();
+
+    byte type();
+
+    <T> T decode(final byte[] content, final Class<T> c);
+
+    <T> T decode(final byte[] content, final TypePresentation<T> 
typePresentation);
+
+    <T> T decode(final byte[] content, final Type type);
+
+    ByteBuffer encode(final Object object);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/serializable/SerializerFactory.java
----------------------------------------------------------------------
diff --git 
a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/serializable/SerializerFactory.java
 
b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/serializable/SerializerFactory.java
new file mode 100644
index 0000000..b47bf99
--- /dev/null
+++ 
b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/serializable/SerializerFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.api.serializable;
+
+public interface SerializerFactory {
+    void register(Serializer serialization);
+
+    byte type(String serializationName);
+
+    Serializer get(byte type);
+
+    void clearAll();
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/common/Pair.java
----------------------------------------------------------------------
diff --git 
a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/common/Pair.java
 
b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/common/Pair.java
new file mode 100644
index 0000000..505e104
--- /dev/null
+++ 
b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/common/Pair.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.common;
+
+public class Pair<L, R> {
+    private L left;
+    private R right;
+
+    public Pair(L left, R right) {
+        this.left = left;
+        this.right = right;
+    }
+
+    public L getLeft() {
+        return left;
+    }
+
+    public void setLeft(L left) {
+        this.left = left;
+    }
+
+    public R getRight() {
+        return right;
+    }
+
+    public void setRight(R right) {
+        this.right = right;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/common/TypePresentation.java
----------------------------------------------------------------------
diff --git 
a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/common/TypePresentation.java
 
b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/common/TypePresentation.java
new file mode 100644
index 0000000..ef3d5f8
--- /dev/null
+++ 
b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/common/TypePresentation.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.common;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Represents a generic type {@code T}. Java doesn't yet provide a way to
+ * represent generic types, so this class does. Forces clients to create a
+ * subclass of this class which enables retrieval the type information even at
+ * runtime.
+ *
+ * <p>For example, to create a type literal for {@code List<String>}, you can
+ * create an empty anonymous inner class:
+ *
+ * <pre>
+ * TypePresentation&lt;List&lt;String&gt;&gt; list = new 
TypePresentation&lt;List&lt;String&gt;&gt;() {};
+ * </pre>
+ *
+ * To create a type literal for {@code Map<String, Integer>}:
+ *
+ * <pre>
+ * TypePresentation&lt;Map&lt;String, Integer&gt;&gt; map = new 
TypePresentation&lt;Map&lt;String, Integer&gt;&gt;() {};
+ * </pre>
+ *
+ * This syntax cannot be used to create type literals that have wildcard
+ * parameters, such as {@code Class<?>} or {@code List<? extends 
CharSequence>}.
+ *
+ * @since 1.0.0
+ */
+public class TypePresentation<T> {
+    static ConcurrentMap<Class<?>, ConcurrentMap<Type, ConcurrentMap<Type, 
Type>>> classTypeCache
+        = new ConcurrentHashMap<Class<?>, ConcurrentMap<Type, 
ConcurrentMap<Type, Type>>>(16, 0.75f, 1);
+    protected final Type type;
+
+    /**
+     * Constructs a new type literal. Derives represented class from type
+     * parameter.
+     *
+     * <p>Clients create an empty anonymous subclass. Doing so embeds the type
+     * parameter in the anonymous class's type hierarchy so we can 
reconstitute it
+     * at runtime despite erasure.
+     */
+    protected TypePresentation() {
+        Type superClass = getClass().getGenericSuperclass();
+        type = ((ParameterizedType) superClass).getActualTypeArguments()[0];
+    }
+
+    /**
+     * @return underlying {@code Type} instance.
+     */
+    public Type getType() {
+        return type;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/pom.xml
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/pom.xml 
b/remoting-core/remoting-impl/pom.xml
new file mode 100644
index 0000000..53d1854
--- /dev/null
+++ b/remoting-core/remoting-impl/pom.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xmlns="http://maven.apache.org/POM/4.0.0";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <groupId>org.apache.rocketmq</groupId>
+        <artifactId>remoting-core</artifactId>
+        <version>2.0.0-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>remoting-impl</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.msgpack</groupId>
+            <artifactId>msgpack</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.esotericsoftware</groupId>
+            <artifactId>kryo</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-tcnative-boringssl-static</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>remoting-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ChannelEventListenerGroup.java
----------------------------------------------------------------------
diff --git 
a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ChannelEventListenerGroup.java
 
b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ChannelEventListenerGroup.java
new file mode 100644
index 0000000..8af61f7
--- /dev/null
+++ 
b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ChannelEventListenerGroup.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.common;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.remoting.api.channel.ChannelEventListener;
+import org.apache.rocketmq.remoting.api.channel.RemotingChannel;
+
+public class ChannelEventListenerGroup {
+    private final List<ChannelEventListener> listenerList = new 
ArrayList<ChannelEventListener>();
+
+    public int size() {
+        return this.listenerList.size();
+    }
+
+    public void registerChannelEventListener(final ChannelEventListener 
listener) {
+        if (listener != null) {
+            this.listenerList.add(listener);
+        }
+    }
+
+    public void onChannelConnect(final RemotingChannel channel) {
+        for (ChannelEventListener listener : listenerList) {
+            listener.onChannelConnect(channel);
+        }
+    }
+
+    public void onChannelClose(final RemotingChannel channel) {
+        for (ChannelEventListener listener : listenerList) {
+            listener.onChannelClose(channel);
+        }
+    }
+
+    public void onChannelException(final RemotingChannel channel) {
+        for (ChannelEventListener listener : listenerList) {
+            listener.onChannelException(channel);
+        }
+    }
+
+    public void onChannelIdle(final RemotingChannel channel) {
+        for (ChannelEventListener listener : listenerList) {
+            listener.onChannelIdle(channel);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/RemotingCommandFactoryMeta.java
----------------------------------------------------------------------
diff --git 
a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/RemotingCommandFactoryMeta.java
 
b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/RemotingCommandFactoryMeta.java
new file mode 100644
index 0000000..d5c0aaa
--- /dev/null
+++ 
b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/RemotingCommandFactoryMeta.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.common;
+
+import org.apache.rocketmq.remoting.api.protocol.ProtocolFactory;
+import org.apache.rocketmq.remoting.api.serializable.SerializerFactory;
+import org.apache.rocketmq.remoting.impl.protocol.Httpv2Protocol;
+import org.apache.rocketmq.remoting.impl.protocol.ProtocolFactoryImpl;
+import org.apache.rocketmq.remoting.impl.protocol.serializer.MsgPackSerializer;
+import 
org.apache.rocketmq.remoting.impl.protocol.serializer.SerializerFactoryImpl;
+
+public class RemotingCommandFactoryMeta {
+    private final ProtocolFactory protocolFactory = new ProtocolFactoryImpl();
+    private final SerializerFactory serializerFactory = new 
SerializerFactoryImpl();
+    private byte protocolType = Httpv2Protocol.MVP_MAGIC;
+    private byte serializeType = MsgPackSerializer.SERIALIZER_TYPE;
+
+    public RemotingCommandFactoryMeta() {
+    }
+
+    public RemotingCommandFactoryMeta(String protocolName, String 
serializeName) {
+        this.protocolType = protocolFactory.type(protocolName);
+        this.serializeType = serializerFactory.type(serializeName);
+    }
+
+    public byte getSerializeType() {
+        return serializeType;
+    }
+
+    public byte getProtocolType() {
+        return protocolType;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseResult.java
----------------------------------------------------------------------
diff --git 
a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseResult.java
 
b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseResult.java
new file mode 100644
index 0000000..2557cdf
--- /dev/null
+++ 
b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseResult.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.common;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.rocketmq.remoting.api.AsyncHandler;
+import org.apache.rocketmq.remoting.api.RemotingEndPoint;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.remoting.api.interceptor.ExceptionContext;
+import org.apache.rocketmq.remoting.api.interceptor.InterceptorGroup;
+import org.apache.rocketmq.remoting.api.interceptor.ResponseContext;
+
+public class ResponseResult {
+    private final long beginTimestamp = System.currentTimeMillis();
+    private final CountDownLatch countDownLatch = new CountDownLatch(1);
+    private final AtomicBoolean interceptorExecuted = new AtomicBoolean(false);
+
+    private int requestId;
+    private long timeoutMillis;
+    private AsyncHandler asyncHandler;
+
+    private volatile RemotingCommand responseCommand;
+    private volatile boolean sendRequestOK = true;
+    private volatile Throwable cause;
+    private SemaphoreReleaseOnlyOnce once;
+
+    private RemotingCommand requestCommand;
+    private InterceptorGroup interceptorGroup;
+    private String remoteAddr;
+
+    public ResponseResult(int requestId, long timeoutMillis, AsyncHandler 
asyncHandler, SemaphoreReleaseOnlyOnce once) {
+        this.requestId = requestId;
+        this.timeoutMillis = timeoutMillis;
+        this.asyncHandler = asyncHandler;
+        this.once = once;
+    }
+
+    public ResponseResult(int requestId, long timeoutMillis) {
+        this.requestId = requestId;
+        this.timeoutMillis = timeoutMillis;
+    }
+
+    public void executeRequestSendFailed() {
+        if (this.interceptorExecuted.compareAndSet(false, true)) {
+            try {
+                interceptorGroup.onException(new 
ExceptionContext(RemotingEndPoint.REQUEST, this.remoteAddr, this.requestCommand,
+                    cause, "REQUEST_SEND_FAILED"));
+            } catch (Throwable e) {
+            }
+            //Sync call
+            if (null != asyncHandler) {
+                asyncHandler.onFailure(requestCommand);
+            }
+        }
+    }
+
+    public void executeCallbackArrived(final RemotingCommand response) {
+        if (this.interceptorExecuted.compareAndSet(false, true)) {
+            try {
+                interceptorGroup.afterResponseReceived(new 
ResponseContext(RemotingEndPoint.REQUEST, this.remoteAddr,
+                    this.requestCommand, response));
+            } catch (Throwable e) {
+            }
+            if (null != asyncHandler) {
+                asyncHandler.onSuccess(response);
+            }
+        }
+    }
+
+    public void onTimeout(long costTimeMillis, long timoutMillis) {
+        if (this.interceptorExecuted.compareAndSet(false, true)) {
+            try {
+                interceptorGroup.onException(new 
ExceptionContext(RemotingEndPoint.REQUEST, this.remoteAddr, this.requestCommand,
+                    null, "CALLBACK_TIMEOUT"));
+            } catch (Throwable e) {
+            }
+            if (null != asyncHandler) {
+                asyncHandler.onTimeout(costTimeMillis, timoutMillis);
+            }
+        }
+    }
+
+    public void release() {
+        if (this.once != null) {
+            this.once.release();
+        }
+    }
+
+    public RemotingCommand waitResponse(final long timeoutMillis) {
+        try {
+            this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+        return this.responseCommand;
+    }
+
+    public void putResponse(final RemotingCommand responseCommand) {
+        this.responseCommand = responseCommand;
+        this.countDownLatch.countDown();
+    }
+
+    public long getBeginTimestamp() {
+        return beginTimestamp;
+    }
+
+    public boolean isSendRequestOK() {
+        return sendRequestOK;
+    }
+
+    public void setSendRequestOK(boolean sendRequestOK) {
+        this.sendRequestOK = sendRequestOK;
+    }
+
+    public long getTimeoutMillis() {
+        return timeoutMillis;
+    }
+
+    public AsyncHandler getAsyncHandler() {
+        return asyncHandler;
+    }
+
+    public Throwable getCause() {
+        return cause;
+    }
+
+    public void setCause(Throwable cause) {
+        this.cause = cause;
+    }
+
+    public RemotingCommand getResponseCommand() {
+        return responseCommand;
+    }
+
+    public void setResponseCommand(RemotingCommand responseCommand) {
+        this.responseCommand = responseCommand;
+    }
+
+    public int getRequestId() {
+        return requestId;
+    }
+
+    public RemotingCommand getRequestCommand() {
+        return requestCommand;
+    }
+
+    public void setRequestCommand(RemotingCommand requestCommand) {
+        this.requestCommand = requestCommand;
+    }
+
+    public InterceptorGroup getInterceptorGroup() {
+        return interceptorGroup;
+    }
+
+    public void setInterceptorGroup(InterceptorGroup interceptorGroup) {
+        this.interceptorGroup = interceptorGroup;
+    }
+
+    public String getRemoteAddr() {
+        return remoteAddr;
+    }
+
+    public void setRemoteAddr(String remoteAddr) {
+        this.remoteAddr = remoteAddr;
+    }
+
+    @Override
+    public String toString() {
+        return ToStringBuilder.reflectionToString(this, 
ToStringStyle.MULTI_LINE_STYLE);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java
----------------------------------------------------------------------
diff --git 
a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java
 
b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java
new file mode 100644
index 0000000..1c5849b
--- /dev/null
+++ 
b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.common;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class SemaphoreReleaseOnlyOnce {
+    private final AtomicBoolean released = new AtomicBoolean(false);
+    private final Semaphore semaphore;
+
+    public SemaphoreReleaseOnlyOnce(Semaphore semaphore) {
+        this.semaphore = semaphore;
+    }
+
+    public void release() {
+        if (this.released.compareAndSet(false, true)) {
+            this.semaphore.release();
+        }
+    }
+
+    public Semaphore getSemaphore() {
+        return semaphore;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/metrics/ChannelMetrics.java
----------------------------------------------------------------------
diff --git 
a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/metrics/ChannelMetrics.java
 
b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/metrics/ChannelMetrics.java
new file mode 100755
index 0000000..db959b7
--- /dev/null
+++ 
b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/metrics/ChannelMetrics.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.common.metrics;
+
+import io.netty.channel.group.ChannelGroup;
+
+public interface ChannelMetrics {
+
+    Integer getChannelCount();
+
+    ChannelGroup getChannels();
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java
----------------------------------------------------------------------
diff --git 
a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java
 
b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java
new file mode 100644
index 0000000..b330041
--- /dev/null
+++ 
b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.config;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.rocketmq.remoting.api.protocol.Protocol;
+import org.apache.rocketmq.remoting.impl.protocol.compression.GZipCompressor;
+import org.apache.rocketmq.remoting.impl.protocol.serializer.MsgPackSerializer;
+
+public class RemotingConfig extends TcpSocketConfig {
+    private int connectionMaxRetries = 3;
+    private int connectionChannelReaderIdleSeconds = 0;
+    private int connectionChannelWriterIdleSeconds = 0;
+    /**
+     * IdleStateEvent will be triggered when neither read nor write was
+     * performed for the specified period of this time. Specify {@code 0} to
+     * disable
+     */
+    private int connectionChannelIdleSeconds = 120;
+    private int writeBufLowWaterMark = 32 * 10240;
+    private int writeBufHighWaterMark = 64 * 10240;
+    private int threadTaskLowWaterMark = 30000;
+    private int threadTaskHighWaterMark = 50000;
+    private int connectionRetryBackoffMillis = 3000;
+    private String protocolName = Protocol.MVP;
+    private String serializerName = MsgPackSerializer.SERIALIZER_NAME;
+    private String compressorName = GZipCompressor.COMPRESSOR_NAME;
+    private int serviceThreadBlockQueueSize = 50000;
+    private boolean clientNativeEpollEnable = false;
+    private int clientWorkerThreads = 16 + 
Runtime.getRuntime().availableProcessors() * 2;
+    private int clientConnectionFutureAwaitTimeoutMillis = 30000;
+    private int clientAsyncCallbackExecutorThreads = 16 + 
Runtime.getRuntime().availableProcessors() * 2;
+    private int clientOnewayInvokeSemaphore = 20480;
+
+    //=============Server configuration==================
+    private int clientAsyncInvokeSemaphore = 20480;
+    private boolean clientPooledBytebufAllocatorEnable = false;
+    private boolean clientCloseSocketIfTimeout = true;
+    private boolean clientShortConnectionEnable = false;
+    private long clientPublishServiceTimeout = 10000;
+    private long clientConsumerServiceTimeout = 10000;
+    private long clientInvokeServiceTimeout = 10000;
+    private int clientMaxRetryCount = 10;
+    private int clientSleepBeforeRetry = 100;
+    private int serverListenPort = 8888;
+    /**
+     * If server only listened 1 port,recommend to set the value to 1
+     */
+    private int serverAcceptorThreads = 1;
+    private int serverIoThreads = 16 + 
Runtime.getRuntime().availableProcessors() * 2;
+    private int serverWorkerThreads = 16 + 
Runtime.getRuntime().availableProcessors() * 2;
+    private int serverOnewayInvokeSemaphore = 256;
+    private int serverAsyncInvokeSemaphore = 6400;
+    private boolean serverNativeEpollEnable = false;
+    private int serverAsyncCallbackExecutorThreads = 
Runtime.getRuntime().availableProcessors() * 2;
+    private boolean serverPooledBytebufAllocatorEnable = true;
+    private boolean serverAuthOpenEnable = true;
+
+    @Override
+    public String toString() {
+        return ToStringBuilder.reflectionToString(this, 
ToStringStyle.MULTI_LINE_STYLE);
+    }
+
+    public int getConnectionMaxRetries() {
+        return connectionMaxRetries;
+    }
+
+    public void setConnectionMaxRetries(final int connectionMaxRetries) {
+        this.connectionMaxRetries = connectionMaxRetries;
+    }
+
+    public int getConnectionChannelReaderIdleSeconds() {
+        return connectionChannelReaderIdleSeconds;
+    }
+
+    public void setConnectionChannelReaderIdleSeconds(final int 
connectionChannelReaderIdleSeconds) {
+        this.connectionChannelReaderIdleSeconds = 
connectionChannelReaderIdleSeconds;
+    }
+
+    public int getConnectionChannelWriterIdleSeconds() {
+        return connectionChannelWriterIdleSeconds;
+    }
+
+    public void setConnectionChannelWriterIdleSeconds(final int 
connectionChannelWriterIdleSeconds) {
+        this.connectionChannelWriterIdleSeconds = 
connectionChannelWriterIdleSeconds;
+    }
+
+    public int getConnectionChannelIdleSeconds() {
+        return connectionChannelIdleSeconds;
+    }
+
+    public void setConnectionChannelIdleSeconds(final int 
connectionChannelIdleSeconds) {
+        this.connectionChannelIdleSeconds = connectionChannelIdleSeconds;
+    }
+
+    public int getWriteBufLowWaterMark() {
+        return writeBufLowWaterMark;
+    }
+
+    public void setWriteBufLowWaterMark(final int writeBufLowWaterMark) {
+        this.writeBufLowWaterMark = writeBufLowWaterMark;
+    }
+
+    public int getWriteBufHighWaterMark() {
+        return writeBufHighWaterMark;
+    }
+
+    public void setWriteBufHighWaterMark(final int writeBufHighWaterMark) {
+        this.writeBufHighWaterMark = writeBufHighWaterMark;
+    }
+
+    public int getThreadTaskLowWaterMark() {
+        return threadTaskLowWaterMark;
+    }
+
+    public void setThreadTaskLowWaterMark(final int threadTaskLowWaterMark) {
+        this.threadTaskLowWaterMark = threadTaskLowWaterMark;
+    }
+
+    public int getThreadTaskHighWaterMark() {
+        return threadTaskHighWaterMark;
+    }
+
+    public void setThreadTaskHighWaterMark(final int threadTaskHighWaterMark) {
+        this.threadTaskHighWaterMark = threadTaskHighWaterMark;
+    }
+
+    public int getConnectionRetryBackoffMillis() {
+        return connectionRetryBackoffMillis;
+    }
+
+    public void setConnectionRetryBackoffMillis(final int 
connectionRetryBackoffMillis) {
+        this.connectionRetryBackoffMillis = connectionRetryBackoffMillis;
+    }
+
+    public String getProtocolName() {
+        return protocolName;
+    }
+
+    public void setProtocolName(final String protocolName) {
+        this.protocolName = protocolName;
+    }
+
+    public String getSerializerName() {
+        return serializerName;
+    }
+
+    public void setSerializerName(final String serializerName) {
+        this.serializerName = serializerName;
+    }
+
+    public String getCompressorName() {
+        return compressorName;
+    }
+
+    public void setCompressorName(final String compressorName) {
+        this.compressorName = compressorName;
+    }
+
+    public int getServiceThreadBlockQueueSize() {
+        return serviceThreadBlockQueueSize;
+    }
+
+    public void setServiceThreadBlockQueueSize(final int 
serviceThreadBlockQueueSize) {
+        this.serviceThreadBlockQueueSize = serviceThreadBlockQueueSize;
+    }
+
+    public boolean isClientNativeEpollEnable() {
+        return clientNativeEpollEnable;
+    }
+
+    public void setClientNativeEpollEnable(final boolean 
clientNativeEpollEnable) {
+        this.clientNativeEpollEnable = clientNativeEpollEnable;
+    }
+
+    public int getClientWorkerThreads() {
+        return clientWorkerThreads;
+    }
+
+    public void setClientWorkerThreads(final int clientWorkerThreads) {
+        this.clientWorkerThreads = clientWorkerThreads;
+    }
+
+    public int getClientConnectionFutureAwaitTimeoutMillis() {
+        return clientConnectionFutureAwaitTimeoutMillis;
+    }
+
+    public void setClientConnectionFutureAwaitTimeoutMillis(final int 
clientConnectionFutureAwaitTimeoutMillis) {
+        this.clientConnectionFutureAwaitTimeoutMillis = 
clientConnectionFutureAwaitTimeoutMillis;
+    }
+
+    public int getClientAsyncCallbackExecutorThreads() {
+        return clientAsyncCallbackExecutorThreads;
+    }
+
+    public void setClientAsyncCallbackExecutorThreads(final int 
clientAsyncCallbackExecutorThreads) {
+        this.clientAsyncCallbackExecutorThreads = 
clientAsyncCallbackExecutorThreads;
+    }
+
+    public int getClientOnewayInvokeSemaphore() {
+        return clientOnewayInvokeSemaphore;
+    }
+
+    public void setClientOnewayInvokeSemaphore(final int 
clientOnewayInvokeSemaphore) {
+        this.clientOnewayInvokeSemaphore = clientOnewayInvokeSemaphore;
+    }
+
+    public int getClientAsyncInvokeSemaphore() {
+        return clientAsyncInvokeSemaphore;
+    }
+
+    public void setClientAsyncInvokeSemaphore(final int 
clientAsyncInvokeSemaphore) {
+        this.clientAsyncInvokeSemaphore = clientAsyncInvokeSemaphore;
+    }
+
+    public boolean isClientPooledBytebufAllocatorEnable() {
+        return clientPooledBytebufAllocatorEnable;
+    }
+
+    public void setClientPooledBytebufAllocatorEnable(final boolean 
clientPooledBytebufAllocatorEnable) {
+        this.clientPooledBytebufAllocatorEnable = 
clientPooledBytebufAllocatorEnable;
+    }
+
+    public boolean isClientCloseSocketIfTimeout() {
+        return clientCloseSocketIfTimeout;
+    }
+
+    public void setClientCloseSocketIfTimeout(final boolean 
clientCloseSocketIfTimeout) {
+        this.clientCloseSocketIfTimeout = clientCloseSocketIfTimeout;
+    }
+
+    public boolean isClientShortConnectionEnable() {
+        return clientShortConnectionEnable;
+    }
+
+    public void setClientShortConnectionEnable(final boolean 
clientShortConnectionEnable) {
+        this.clientShortConnectionEnable = clientShortConnectionEnable;
+    }
+
+    public long getClientPublishServiceTimeout() {
+        return clientPublishServiceTimeout;
+    }
+
+    public void setClientPublishServiceTimeout(final long 
clientPublishServiceTimeout) {
+        this.clientPublishServiceTimeout = clientPublishServiceTimeout;
+    }
+
+    public long getClientConsumerServiceTimeout() {
+        return clientConsumerServiceTimeout;
+    }
+
+    public void setClientConsumerServiceTimeout(final long 
clientConsumerServiceTimeout) {
+        this.clientConsumerServiceTimeout = clientConsumerServiceTimeout;
+    }
+
+    public long getClientInvokeServiceTimeout() {
+        return clientInvokeServiceTimeout;
+    }
+
+    public void setClientInvokeServiceTimeout(final long 
clientInvokeServiceTimeout) {
+        this.clientInvokeServiceTimeout = clientInvokeServiceTimeout;
+    }
+
+    public int getClientMaxRetryCount() {
+        return clientMaxRetryCount;
+    }
+
+    public void setClientMaxRetryCount(final int clientMaxRetryCount) {
+        this.clientMaxRetryCount = clientMaxRetryCount;
+    }
+
+    public int getClientSleepBeforeRetry() {
+        return clientSleepBeforeRetry;
+    }
+
+    public void setClientSleepBeforeRetry(final int clientSleepBeforeRetry) {
+        this.clientSleepBeforeRetry = clientSleepBeforeRetry;
+    }
+
+    public int getServerListenPort() {
+        return serverListenPort;
+    }
+
+    public void setServerListenPort(final int serverListenPort) {
+        this.serverListenPort = serverListenPort;
+    }
+
+    public int getServerAcceptorThreads() {
+        return serverAcceptorThreads;
+    }
+
+    public void setServerAcceptorThreads(final int serverAcceptorThreads) {
+        this.serverAcceptorThreads = serverAcceptorThreads;
+    }
+
+    public int getServerIoThreads() {
+        return serverIoThreads;
+    }
+
+    public void setServerIoThreads(final int serverIoThreads) {
+        this.serverIoThreads = serverIoThreads;
+    }
+
+    public int getServerWorkerThreads() {
+        return serverWorkerThreads;
+    }
+
+    public void setServerWorkerThreads(final int serverWorkerThreads) {
+        this.serverWorkerThreads = serverWorkerThreads;
+    }
+
+    public int getServerOnewayInvokeSemaphore() {
+        return serverOnewayInvokeSemaphore;
+    }
+
+    public void setServerOnewayInvokeSemaphore(final int 
serverOnewayInvokeSemaphore) {
+        this.serverOnewayInvokeSemaphore = serverOnewayInvokeSemaphore;
+    }
+
+    public int getServerAsyncInvokeSemaphore() {
+        return serverAsyncInvokeSemaphore;
+    }
+
+    public void setServerAsyncInvokeSemaphore(final int 
serverAsyncInvokeSemaphore) {
+        this.serverAsyncInvokeSemaphore = serverAsyncInvokeSemaphore;
+    }
+
+    public boolean isServerNativeEpollEnable() {
+        return serverNativeEpollEnable;
+    }
+
+    public void setServerNativeEpollEnable(final boolean 
serverNativeEpollEnable) {
+        this.serverNativeEpollEnable = serverNativeEpollEnable;
+    }
+
+    public int getServerAsyncCallbackExecutorThreads() {
+        return serverAsyncCallbackExecutorThreads;
+    }
+
+    public void setServerAsyncCallbackExecutorThreads(final int 
serverAsyncCallbackExecutorThreads) {
+        this.serverAsyncCallbackExecutorThreads = 
serverAsyncCallbackExecutorThreads;
+    }
+
+    public boolean isServerPooledBytebufAllocatorEnable() {
+        return serverPooledBytebufAllocatorEnable;
+    }
+
+    public void setServerPooledBytebufAllocatorEnable(final boolean 
serverPooledBytebufAllocatorEnable) {
+        this.serverPooledBytebufAllocatorEnable = 
serverPooledBytebufAllocatorEnable;
+    }
+
+    public boolean isServerAuthOpenEnable() {
+        return serverAuthOpenEnable;
+    }
+
+    public void setServerAuthOpenEnable(final boolean serverAuthOpenEnable) {
+        this.serverAuthOpenEnable = serverAuthOpenEnable;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/TcpSocketConfig.java
----------------------------------------------------------------------
diff --git 
a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/TcpSocketConfig.java
 
b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/TcpSocketConfig.java
new file mode 100755
index 0000000..4dfcde7
--- /dev/null
+++ 
b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/TcpSocketConfig.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.config;
+
+/**
+ * TCP socket configuration
+ *
+ * @see java.net.SocketOptions
+ */
+public class TcpSocketConfig {
+    private boolean tcpSoReuseAddress;
+    private boolean tcpSoKeepAlive;
+    private boolean tcpSoNoDelay;
+    private int tcpSoSndBufSize;  // see /proc/sys/net/ipv4/tcp_rmem
+    private int tcpSoRcvBufSize;  // see /proc/sys/net/ipv4/tcp_wmem
+    private int tcpSoBacklogSize;
+    private int tcpSoLinger;
+    private int tcpSoTimeout;
+
+    public boolean isTcpSoReuseAddress() {
+        return tcpSoReuseAddress;
+    }
+
+    public void setTcpSoReuseAddress(final boolean tcpSoReuseAddress) {
+        this.tcpSoReuseAddress = tcpSoReuseAddress;
+    }
+
+    public boolean isTcpSoKeepAlive() {
+        return tcpSoKeepAlive;
+    }
+
+    public void setTcpSoKeepAlive(final boolean tcpSoKeepAlive) {
+        this.tcpSoKeepAlive = tcpSoKeepAlive;
+    }
+
+    public boolean isTcpSoNoDelay() {
+        return tcpSoNoDelay;
+    }
+
+    public void setTcpSoNoDelay(final boolean tcpSoNoDelay) {
+        this.tcpSoNoDelay = tcpSoNoDelay;
+    }
+
+    public int getTcpSoSndBufSize() {
+        return tcpSoSndBufSize;
+    }
+
+    public void setTcpSoSndBufSize(final int tcpSoSndBufSize) {
+        this.tcpSoSndBufSize = tcpSoSndBufSize;
+    }
+
+    public int getTcpSoRcvBufSize() {
+        return tcpSoRcvBufSize;
+    }
+
+    public void setTcpSoRcvBufSize(final int tcpSoRcvBufSize) {
+        this.tcpSoRcvBufSize = tcpSoRcvBufSize;
+    }
+
+    public int getTcpSoBacklogSize() {
+        return tcpSoBacklogSize;
+    }
+
+    public void setTcpSoBacklogSize(final int tcpSoBacklogSize) {
+        this.tcpSoBacklogSize = tcpSoBacklogSize;
+    }
+
+    public int getTcpSoLinger() {
+        return tcpSoLinger;
+    }
+
+    public void setTcpSoLinger(final int tcpSoLinger) {
+        this.tcpSoLinger = tcpSoLinger;
+    }
+
+    public int getTcpSoTimeout() {
+        return tcpSoTimeout;
+    }
+
+    public void setTcpSoTimeout(final int tcpSoTimeout) {
+        this.tcpSoTimeout = tcpSoTimeout;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java
----------------------------------------------------------------------
diff --git 
a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java
 
b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java
new file mode 100644
index 0000000..1a80d20
--- /dev/null
+++ 
b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.external;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class ThreadUtils {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ThreadUtils.class);
+
+    /**
+     * A constructor to stop this class being constructed.
+     */
+    private ThreadUtils() {
+        // Unused
+
+    }
+
+    public static ExecutorService newThreadPoolExecutor(int corePoolSize,
+        int maximumPoolSize,
+        long keepAliveTime,
+        TimeUnit unit,
+        BlockingQueue<Runnable> workQueue, String processName, boolean 
isDaemon) {
+        return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 
keepAliveTime, unit, workQueue, newThreadFactory(processName, isDaemon));
+    }
+
+    public static ExecutorService newSingleThreadExecutor(String processName, 
boolean isDaemon) {
+        return Executors.newSingleThreadExecutor(newThreadFactory(processName, 
isDaemon));
+    }
+
+    public static ScheduledExecutorService 
newSingleThreadScheduledExecutor(String processName, boolean isDaemon) {
+        return 
Executors.newSingleThreadScheduledExecutor(newThreadFactory(processName, 
isDaemon));
+    }
+
+    public static ScheduledExecutorService newFixedThreadScheduledPool(int 
nThreads, String processName,
+        boolean isDaemon) {
+        return Executors.newScheduledThreadPool(nThreads, 
newThreadFactory(processName, isDaemon));
+    }
+
+    public static ThreadFactory newThreadFactory(String processName, boolean 
isDaemon) {
+        return newGenericThreadFactory("Remoting-" + processName, isDaemon);
+    }
+
+    public static ThreadFactory newGenericThreadFactory(String processName) {
+        return newGenericThreadFactory(processName, false);
+    }
+
+    public static ThreadFactory newGenericThreadFactory(String processName, 
int threads) {
+        return newGenericThreadFactory(processName, threads, false);
+    }
+
+    public static ThreadFactory newGenericThreadFactory(final String 
processName, final boolean isDaemon) {
+        return new ThreadFactory() {
+            private AtomicInteger threadIndex = new AtomicInteger(0);
+
+            @Override
+            public Thread newThread(Runnable r) {
+                Thread thread = new Thread(r, String.format("%s_%d", 
processName, this.threadIndex.incrementAndGet()));
+                thread.setDaemon(isDaemon);
+                return thread;
+            }
+        };
+    }
+
+    public static ThreadFactory newGenericThreadFactory(final String 
processName, final int threads,
+        final boolean isDaemon) {
+        return new ThreadFactory() {
+            private AtomicInteger threadIndex = new AtomicInteger(0);
+
+            @Override
+            public Thread newThread(Runnable r) {
+                Thread thread = new Thread(r, String.format("%s_%d_%d", 
processName, threads, this.threadIndex.incrementAndGet()));
+                thread.setDaemon(isDaemon);
+                return thread;
+            }
+        };
+    }
+
+    /**
+     * Create a new thread
+     *
+     * @param name The name of the thread
+     * @param runnable The work for the thread to do
+     * @param daemon Should the thread block JVM stop?
+     * @return The unstarted thread
+     */
+    public static Thread newThread(String name, Runnable runnable, boolean 
daemon) {
+        Thread thread = new Thread(runnable, name);
+        thread.setDaemon(daemon);
+        thread.setUncaughtExceptionHandler(new 
Thread.UncaughtExceptionHandler() {
+            public void uncaughtException(Thread t, Throwable e) {
+                LOG.error("Uncaught exception in thread '" + t.getName() + 
"':", e);
+            }
+        });
+        return thread;
+    }
+
+    /**
+     * Shutdown passed thread using isAlive and join.
+     *
+     * @param t Thread to stop
+     */
+    public static void shutdownGracefully(final Thread t) {
+        shutdownGracefully(t, 0);
+    }
+
+    /**
+     * Shutdown passed thread using isAlive and join.
+     *
+     * @param millis Pass 0 if we're to wait forever.
+     * @param t Thread to stop
+     */
+    public static void shutdownGracefully(final Thread t, final long millis) {
+        if (t == null)
+            return;
+        while (t.isAlive()) {
+            try {
+                t.interrupt();
+                t.join(millis);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * An implementation of the graceful stop sequence recommended by
+     * {@link ExecutorService}.
+     *
+     * @param executor executor
+     * @param timeout timeout
+     * @param timeUnit timeUnit
+     */
+    public static void shutdownGracefully(ExecutorService executor, long 
timeout, TimeUnit timeUnit) {
+        // Disable new tasks from being submitted.
+        executor.shutdown();
+        try {
+            // Wait a while for existing tasks to terminate.
+            if (!executor
+                .awaitTermination(timeout, timeUnit)) {
+                executor.shutdownNow();
+                // Wait a while for tasks to respond to being cancelled.
+                if (!executor.awaitTermination(timeout, timeUnit)) {
+                    LOG.warn(String.format("%s didn't terminate!", executor));
+                }
+            }
+        } catch (InterruptedException ie) {
+            // (Re-)Cancel if current thread also interrupted.
+            executor.shutdownNow();
+            // Preserve interrupt status.
+            Thread.currentThread().interrupt();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java
----------------------------------------------------------------------
diff --git 
a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java
 
b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java
new file mode 100644
index 0000000..e17bcfd
--- /dev/null
+++ 
b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.buffer;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import java.nio.ByteBuffer;
+import org.apache.rocketmq.remoting.api.buffer.ByteBufferWrapper;
+
+public class NettyByteBufferWrapper implements ByteBufferWrapper {
+    private final ByteBuf buffer;
+    private final Channel channel;
+
+    public NettyByteBufferWrapper(ByteBuf buffer) {
+        this(buffer, null);
+    }
+
+    public NettyByteBufferWrapper(ByteBuf buffer, Channel channel) {
+        this.channel = channel;
+        this.buffer = buffer;
+    }
+
+    public void writeByte(int index, byte data) {
+        buffer.writeByte(data);
+    }
+
+    public void writeByte(byte data) {
+        buffer.writeByte(data);
+    }
+
+    public byte readByte() {
+        return buffer.readByte();
+    }
+
+    public void writeInt(int data) {
+        buffer.writeInt(data);
+    }
+
+    public void writeBytes(byte[] data) {
+        buffer.writeBytes(data);
+    }
+
+    @Override
+    public void writeBytes(final ByteBuffer data) {
+        buffer.writeBytes(data);
+    }
+
+    public int readableBytes() {
+        return buffer.readableBytes();
+    }
+
+    public int readInt() {
+        return buffer.readInt();
+    }
+
+    public void readBytes(byte[] dst) {
+        buffer.readBytes(dst);
+    }
+
+    @Override
+    public void readBytes(final ByteBuffer dst) {
+        buffer.readBytes(dst);
+    }
+
+    public int readerIndex() {
+        return buffer.readerIndex();
+    }
+
+    public void setReaderIndex(int index) {
+        buffer.setIndex(index, buffer.writerIndex());
+    }
+
+    @Override
+    public void writeLong(long value) {
+        buffer.writeLong(value);
+    }
+
+    @Override
+    public long readLong() {
+        return buffer.readLong();
+    }
+
+    @Override
+    public void ensureCapacity(int capacity) {
+        buffer.capacity(capacity);
+    }
+
+    @Override
+    public short readShort() {
+        return buffer.readShort();
+    }
+
+    @Override
+    public void writeShort(final short value) {
+        buffer.writeShort(value);
+    }
+}
+
+


Reply via email to