[1/3] incubator-rocketmq git commit: Minor polish
Repository: incubator-rocketmq Updated Branches: refs/heads/rocketmq5 0b88e66fa -> 6593294f0 Minor polish Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/114b6ae0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/114b6ae0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/114b6ae0 Branch: refs/heads/rocketmq5 Commit: 114b6ae083ed5338e2b59a501ec08ec23c3e2ece Parents: 0b88e66 Author: yukonAuthored: Wed Sep 20 17:06:04 2017 +0800 Committer: yukon Committed: Wed Sep 20 17:06:04 2017 +0800 -- .../remoting/api/buffer/ByteBufferWrapper.java | 28 .../impl/buffer/NettyByteBufferWrapper.java | 73 ++-- .../impl/netty/NettyRemotingAbstract.java | 16 +++-- .../remoting/impl/netty/handler/Decoder.java| 2 +- 4 files changed, 64 insertions(+), 55 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/114b6ae0/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/buffer/ByteBufferWrapper.java -- diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/buffer/ByteBufferWrapper.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/buffer/ByteBufferWrapper.java index 7cae3ac..7360c88 100644 --- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/buffer/ByteBufferWrapper.java +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/buffer/ByteBufferWrapper.java @@ -20,37 +20,37 @@ package org.apache.rocketmq.remoting.api.buffer; import java.nio.ByteBuffer; public interface ByteBufferWrapper { -void writeByte(int index, byte data); - void writeByte(byte data); -byte readByte(); - -void writeInt(int data); +void writeByte(int index, byte data); void writeBytes(byte[] data); void writeBytes(ByteBuffer data); -int readableBytes(); +void writeInt(int data); -int readInt(); +void writeShort(short value); + +void 
writeLong(long id); + +byte readByte(); void readBytes(byte[] dst); void readBytes(ByteBuffer dst); -int readerIndex(); - -void setReaderIndex(int readerIndex); +short readShort(); -void writeLong(long id); +int readInt(); long readLong(); -void ensureCapacity(int capacity); +int readableBytes(); -short readShort(); +int readerIndex(); -void writeShort(short value); +void setReaderIndex(int readerIndex); + +void ensureCapacity(int capacity); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/114b6ae0/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java -- diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java index e17bcfd..5a71452 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java @@ -18,39 +18,27 @@ package org.apache.rocketmq.remoting.impl.buffer; import io.netty.buffer.ByteBuf; -import io.netty.channel.Channel; import java.nio.ByteBuffer; import org.apache.rocketmq.remoting.api.buffer.ByteBufferWrapper; public class NettyByteBufferWrapper implements ByteBufferWrapper { private final ByteBuf buffer; -private final Channel channel; public NettyByteBufferWrapper(ByteBuf buffer) { -this(buffer, null); -} - -public NettyByteBufferWrapper(ByteBuf buffer, Channel channel) { -this.channel = channel; this.buffer = buffer; } -public void writeByte(int index, byte data) { -buffer.writeByte(data); -} - +@Override public void writeByte(byte data) { buffer.writeByte(data); } -public byte readByte() { -return buffer.readByte(); -} - -public void writeInt(int data) { -buffer.writeInt(data); +@Override +public void writeByte(int index, byte 
data) { +buffer.writeByte(data); } +@Override public void writeBytes(byte[] data) { buffer.writeBytes(data); } @@ -60,16 +48,24 @@ public class NettyByteBufferWrapper implements ByteBufferWrapper { buffer.writeBytes(data); } -public int readableBytes() { -return
[3/3] incubator-rocketmq git commit: Polish thread pool create method
Polish thread pool create method Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/6593294f Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/6593294f Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/6593294f Branch: refs/heads/rocketmq5 Commit: 6593294f07dec7039668e7f5807f530433c3bbda Parents: 489b1d8 Author: yukon Authored: Wed Sep 20 17:39:46 2017 +0800 Committer: yukon Committed: Wed Sep 20 20:37:15 2017 +0800 -- remoting-core/pom.xml | 2 +- .../rocketmq/remoting/external/ThreadUtils.java | 26 +--- .../impl/netty/NettyRemotingAbstract.java | 8 ++ .../rpc/impl/client/SimpleClientImpl.java | 10 +++- .../rpc/impl/server/SimpleServerImpl.java | 10 +++- .../rpc/impl/service/RpcInstanceAbstract.java | 18 -- .../rpc/impl/service/RpcProxyCommon.java| 19 +- 7 files changed, 37 insertions(+), 56 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6593294f/remoting-core/pom.xml -- diff --git a/remoting-core/pom.xml b/remoting-core/pom.xml index 702b826..997011b 100644 --- a/remoting-core/pom.xml +++ b/remoting-core/pom.xml @@ -77,7 +77,7 @@ io.netty netty-all -4.1.6.Final +4.1.15.Final com.alibaba http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6593294f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java -- diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java index 1a80d20..5a50089 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java @@ -20,6 +20,7 @@ package org.apache.rocketmq.remoting.external; import java.util.concurrent.BlockingQueue; import
java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -43,25 +44,32 @@ public final class ThreadUtils { int maximumPoolSize, long keepAliveTime, TimeUnit unit, -BlockingQueue workQueue, String processName, boolean isDaemon) { -return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, newThreadFactory(processName, isDaemon)); +BlockingQueue workQueue, +String processName, boolean isDaemon) { +return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, newGenericThreadFactory(processName, isDaemon)); +} + +public static ExecutorService newFixedThreadPool(int nThreads, int workQueueCapacity, String processName, boolean isDaemon) { +return new ThreadPoolExecutor( +nThreads, +nThreads, +0, +TimeUnit.MILLISECONDS, +new LinkedBlockingQueue(workQueueCapacity), +newGenericThreadFactory(processName, isDaemon)); } public static ExecutorService newSingleThreadExecutor(String processName, boolean isDaemon) { -return Executors.newSingleThreadExecutor(newThreadFactory(processName, isDaemon)); +return Executors.newSingleThreadExecutor(newGenericThreadFactory(processName, isDaemon)); } public static ScheduledExecutorService newSingleThreadScheduledExecutor(String processName, boolean isDaemon) { -return Executors.newSingleThreadScheduledExecutor(newThreadFactory(processName, isDaemon)); +return Executors.newSingleThreadScheduledExecutor(newGenericThreadFactory(processName, isDaemon)); } public static ScheduledExecutorService newFixedThreadScheduledPool(int nThreads, String processName, boolean isDaemon) { -return Executors.newScheduledThreadPool(nThreads, newThreadFactory(processName, isDaemon)); -} - -public static ThreadFactory newThreadFactory(String processName, boolean isDaemon) { -return 
newGenericThreadFactory("Remoting-" + processName, isDaemon); +return Executors.newScheduledThreadPool(nThreads, newGenericThreadFactory(processName,
[2/3] incubator-rocketmq git commit: Use LinkedBlockingQueue for better performance
Use LinkedBlockingQueue for better performance Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/489b1d8b Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/489b1d8b Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/489b1d8b Branch: refs/heads/rocketmq5 Commit: 489b1d8b7829990bd5b6ecef1ab5aeae685a1ec4 Parents: 114b6ae Author: yukonAuthored: Wed Sep 20 17:18:43 2017 +0800 Committer: yukon Committed: Wed Sep 20 17:18:43 2017 +0800 -- .../impl/netty/NettyRemotingAbstract.java | 3 +-- .../rpc/impl/client/SimpleClientImpl.java | 12 +++ .../rpc/impl/server/SimpleServerImpl.java | 11 ++ .../rpc/impl/service/RpcInstanceAbstract.java | 12 +++ .../rpc/impl/service/RpcProxyCommon.java| 22 +--- 5 files changed, 39 insertions(+), 21 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/489b1d8b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java -- diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java index 4c22e7c..a5c2118 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java @@ -25,7 +25,6 @@ import io.netty.channel.SimpleChannelInboundHandler; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; @@ -98,7 +97,7 @@ public abstract class NettyRemotingAbstract implements RemotingService { 
clientConfig.getClientAsyncCallbackExecutorThreads(), 60, TimeUnit.SECONDS, -new ArrayBlockingQueue(1), +new LinkedBlockingQueue(1), "PublicExecutor", true); this.remotingCommandFactory = new RemotingCommandFactoryImpl(remotingCommandFactoryMeta); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/489b1d8b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java -- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java index 787e8c1..4483ca3 100644 --- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java @@ -18,8 +18,8 @@ package org.apache.rocketmq.rpc.impl.client; import java.util.Properties; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.remoting.api.RemotingClient; import org.apache.rocketmq.remoting.api.RemotingService; @@ -48,9 +48,13 @@ public class SimpleClientImpl extends RpcInstanceAbstract implements SimpleClien super(rpcCommonConfig); this.remotingClient = remotingClient; this.rpcCommonConfig = rpcCommonConfig; -this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), -rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getServiceThreadKeepAliveTime(), -TimeUnit.MILLISECONDS, new ArrayBlockingQueue(rpcCommonConfig.getServiceThreadBlockQueueSize()), "clientCallServiceThread", true); +this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor( +rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), +rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), 
+rpcCommonConfig.getServiceThreadKeepAliveTime(), +TimeUnit.MILLISECONDS, +new LinkedBlockingQueue(rpcCommonConfig.getServiceThreadBlockQueueSize()), +"clientCallServiceThread", true); } public void initialize() {