[1/3] incubator-rocketmq git commit: Minor polish

2017-09-20 Thread yukon
Repository: incubator-rocketmq
Updated Branches:
  refs/heads/rocketmq5 0b88e66fa -> 6593294f0


Minor polish


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/114b6ae0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/114b6ae0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/114b6ae0

Branch: refs/heads/rocketmq5
Commit: 114b6ae083ed5338e2b59a501ec08ec23c3e2ece
Parents: 0b88e66
Author: yukon 
Authored: Wed Sep 20 17:06:04 2017 +0800
Committer: yukon 
Committed: Wed Sep 20 17:06:04 2017 +0800

--
 .../remoting/api/buffer/ByteBufferWrapper.java  | 28 
 .../impl/buffer/NettyByteBufferWrapper.java | 73 ++--
 .../impl/netty/NettyRemotingAbstract.java   | 16 +++--
 .../remoting/impl/netty/handler/Decoder.java|  2 +-
 4 files changed, 64 insertions(+), 55 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/114b6ae0/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/buffer/ByteBufferWrapper.java
--
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/buffer/ByteBufferWrapper.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/buffer/ByteBufferWrapper.java
index 7cae3ac..7360c88 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/buffer/ByteBufferWrapper.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/buffer/ByteBufferWrapper.java
@@ -20,37 +20,37 @@ package org.apache.rocketmq.remoting.api.buffer;
 import java.nio.ByteBuffer;
 
 public interface ByteBufferWrapper {
-void writeByte(int index, byte data);
-
 void writeByte(byte data);
 
-byte readByte();
-
-void writeInt(int data);
+void writeByte(int index, byte data);
 
 void writeBytes(byte[] data);
 
 void writeBytes(ByteBuffer data);
 
-int readableBytes();
+void writeInt(int data);
 
-int readInt();
+void writeShort(short value);
+
+void writeLong(long id);
+
+byte readByte();
 
 void readBytes(byte[] dst);
 
 void readBytes(ByteBuffer dst);
 
-int readerIndex();
-
-void setReaderIndex(int readerIndex);
+short readShort();
 
-void writeLong(long id);
+int readInt();
 
 long readLong();
 
-void ensureCapacity(int capacity);
+int readableBytes();
 
-short readShort();
+int readerIndex();
 
-void writeShort(short value);
+void setReaderIndex(int readerIndex);
+
+void ensureCapacity(int capacity);
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/114b6ae0/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java
--
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java
index e17bcfd..5a71452 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java
@@ -18,39 +18,27 @@
 package org.apache.rocketmq.remoting.impl.buffer;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
 import java.nio.ByteBuffer;
 import org.apache.rocketmq.remoting.api.buffer.ByteBufferWrapper;
 
 public class NettyByteBufferWrapper implements ByteBufferWrapper {
 private final ByteBuf buffer;
-private final Channel channel;
 
 public NettyByteBufferWrapper(ByteBuf buffer) {
-this(buffer, null);
-}
-
-public NettyByteBufferWrapper(ByteBuf buffer, Channel channel) {
-this.channel = channel;
 this.buffer = buffer;
 }
 
-public void writeByte(int index, byte data) {
-buffer.writeByte(data);
-}
-
+@Override
 public void writeByte(byte data) {
 buffer.writeByte(data);
 }
 
-public byte readByte() {
-return buffer.readByte();
-}
-
-public void writeInt(int data) {
-buffer.writeInt(data);
+@Override
+public void writeByte(int index, byte data) {
+buffer.writeByte(data);
 }
 
+@Override
 public void writeBytes(byte[] data) {
 buffer.writeBytes(data);
 }
@@ -60,16 +48,24 @@ public class NettyByteBufferWrapper implements ByteBufferWrapper {
 buffer.writeBytes(data);
 }
 
-public int readableBytes() {
-return 

[3/3] incubator-rocketmq git commit: Polish thread pool create method

2017-09-20 Thread yukon
Polish thread pool create method


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/6593294f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/6593294f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/6593294f

Branch: refs/heads/rocketmq5
Commit: 6593294f07dec7039668e7f5807f530433c3bbda
Parents: 489b1d8
Author: yukon 
Authored: Wed Sep 20 17:39:46 2017 +0800
Committer: yukon 
Committed: Wed Sep 20 20:37:15 2017 +0800

--
 remoting-core/pom.xml   |  2 +-
 .../rocketmq/remoting/external/ThreadUtils.java | 26 +---
 .../impl/netty/NettyRemotingAbstract.java   |  8 ++
 .../rpc/impl/client/SimpleClientImpl.java   | 10 +++-
 .../rpc/impl/server/SimpleServerImpl.java   | 10 +++-
 .../rpc/impl/service/RpcInstanceAbstract.java   | 18 --
 .../rpc/impl/service/RpcProxyCommon.java| 19 +-
 7 files changed, 37 insertions(+), 56 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6593294f/remoting-core/pom.xml
--
diff --git a/remoting-core/pom.xml b/remoting-core/pom.xml
index 702b826..997011b 100644
--- a/remoting-core/pom.xml
+++ b/remoting-core/pom.xml
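@@ -77,7 +77,7 @@
 <dependency>
 <groupId>io.netty</groupId>
 <artifactId>netty-all</artifactId>
-<version>4.1.6.Final</version>
+<version>4.1.15.Final</version>
 </dependency>
 <dependency>
 <groupId>com.alibaba</groupId>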

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6593294f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java
--
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java
index 1a80d20..5a50089 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.remoting.external;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -43,25 +44,32 @@ public final class ThreadUtils {
 int maximumPoolSize,
 long keepAliveTime,
 TimeUnit unit,
-BlockingQueue<Runnable> workQueue, String processName, boolean isDaemon) {
-return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, newThreadFactory(processName, isDaemon));
+BlockingQueue<Runnable> workQueue,
+String processName, boolean isDaemon) {
+return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, newGenericThreadFactory(processName, isDaemon));
+}
+
+public static ExecutorService newFixedThreadPool(int nThreads, int workQueueCapacity, String processName, boolean isDaemon) {
+return new ThreadPoolExecutor(
+nThreads,
+nThreads,
+0,
+TimeUnit.MILLISECONDS,
+new LinkedBlockingQueue<Runnable>(workQueueCapacity),
+newGenericThreadFactory(processName, isDaemon));
 }
 
 public static ExecutorService newSingleThreadExecutor(String processName, boolean isDaemon) {
-return Executors.newSingleThreadExecutor(newThreadFactory(processName, isDaemon));
+return Executors.newSingleThreadExecutor(newGenericThreadFactory(processName, isDaemon));
 }
 
 public static ScheduledExecutorService newSingleThreadScheduledExecutor(String processName, boolean isDaemon) {
-return Executors.newSingleThreadScheduledExecutor(newThreadFactory(processName, isDaemon));
+return Executors.newSingleThreadScheduledExecutor(newGenericThreadFactory(processName, isDaemon));
 }
 
 public static ScheduledExecutorService newFixedThreadScheduledPool(int 
nThreads, String processName,
 boolean isDaemon) {
-return Executors.newScheduledThreadPool(nThreads, 
newThreadFactory(processName, isDaemon));
-}
-
-public static ThreadFactory newThreadFactory(String processName, boolean 
isDaemon) {
-return newGenericThreadFactory("Remoting-" + processName, isDaemon);
+return Executors.newScheduledThreadPool(nThreads, 
newGenericThreadFactory(processName, 

[2/3] incubator-rocketmq git commit: Use LinkedBlockingQueue for better performance

2017-09-20 Thread yukon
Use LinkedBlockingQueue for better performance


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/489b1d8b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/489b1d8b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/489b1d8b

Branch: refs/heads/rocketmq5
Commit: 489b1d8b7829990bd5b6ecef1ab5aeae685a1ec4
Parents: 114b6ae
Author: yukon 
Authored: Wed Sep 20 17:18:43 2017 +0800
Committer: yukon 
Committed: Wed Sep 20 17:18:43 2017 +0800

--
 .../impl/netty/NettyRemotingAbstract.java   |  3 +--
 .../rpc/impl/client/SimpleClientImpl.java   | 12 +++
 .../rpc/impl/server/SimpleServerImpl.java   | 11 ++
 .../rpc/impl/service/RpcInstanceAbstract.java   | 12 +++
 .../rpc/impl/service/RpcProxyCommon.java| 22 +---
 5 files changed, 39 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/489b1d8b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
--
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
index 4c22e7c..a5c2118 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
@@ -25,7 +25,6 @@ import io.netty.channel.SimpleChannelInboundHandler;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -98,7 +97,7 @@ public abstract class NettyRemotingAbstract implements RemotingService {
 clientConfig.getClientAsyncCallbackExecutorThreads(),
 60,
 TimeUnit.SECONDS,
-new ArrayBlockingQueue(1),
+new LinkedBlockingQueue(1),
 "PublicExecutor", true);
 this.remotingCommandFactory = new RemotingCommandFactoryImpl(remotingCommandFactoryMeta);
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/489b1d8b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java
--
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java
index 787e8c1..4483ca3 100644
--- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java
@@ -18,8 +18,8 @@
 package org.apache.rocketmq.rpc.impl.client;
 
 import java.util.Properties;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.remoting.api.RemotingClient;
 import org.apache.rocketmq.remoting.api.RemotingService;
@@ -48,9 +48,13 @@ public class SimpleClientImpl extends RpcInstanceAbstract implements SimpleClien
 super(rpcCommonConfig);
 this.remotingClient = remotingClient;
 this.rpcCommonConfig = rpcCommonConfig;
-this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
-rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getServiceThreadKeepAliveTime(),
-TimeUnit.MILLISECONDS, new ArrayBlockingQueue(rpcCommonConfig.getServiceThreadBlockQueueSize()), "clientCallServiceThread", true);
+this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(
+rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
+rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
+rpcCommonConfig.getServiceThreadKeepAliveTime(),
+TimeUnit.MILLISECONDS,
+new LinkedBlockingQueue(rpcCommonConfig.getServiceThreadBlockQueueSize()),
+"clientCallServiceThread", true);
 }
 
 public void initialize() {