Repository: incubator-rocketmq
Updated Branches:
  refs/heads/rocketmq5 0b88e66fa -> 6593294f0


Minor polish


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/114b6ae0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/114b6ae0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/114b6ae0

Branch: refs/heads/rocketmq5
Commit: 114b6ae083ed5338e2b59a501ec08ec23c3e2ece
Parents: 0b88e66
Author: yukon <yu...@apache.org>
Authored: Wed Sep 20 17:06:04 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Wed Sep 20 17:06:04 2017 +0800

----------------------------------------------------------------------
 .../remoting/api/buffer/ByteBufferWrapper.java  | 28 ++++----
 .../impl/buffer/NettyByteBufferWrapper.java     | 73 ++++++++++----------
 .../impl/netty/NettyRemotingAbstract.java       | 16 +++--
 .../remoting/impl/netty/handler/Decoder.java    |  2 +-
 4 files changed, 64 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/114b6ae0/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/buffer/ByteBufferWrapper.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/buffer/ByteBufferWrapper.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/buffer/ByteBufferWrapper.java
index 7cae3ac..7360c88 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/buffer/ByteBufferWrapper.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/buffer/ByteBufferWrapper.java
@@ -20,37 +20,37 @@ package org.apache.rocketmq.remoting.api.buffer;
 import java.nio.ByteBuffer;
 
 public interface ByteBufferWrapper {
-    void writeByte(int index, byte data);
-
     void writeByte(byte data);
 
-    byte readByte();
-
-    void writeInt(int data);
+    void writeByte(int index, byte data);
 
     void writeBytes(byte[] data);
 
     void writeBytes(ByteBuffer data);
 
-    int readableBytes();
+    void writeInt(int data);
 
-    int readInt();
+    void writeShort(short value);
+
+    void writeLong(long id);
+
+    byte readByte();
 
     void readBytes(byte[] dst);
 
     void readBytes(ByteBuffer dst);
 
-    int readerIndex();
-
-    void setReaderIndex(int readerIndex);
+    short readShort();
 
-    void writeLong(long id);
+    int readInt();
 
     long readLong();
 
-    void ensureCapacity(int capacity);
+    int readableBytes();
 
-    short readShort();
+    int readerIndex();
 
-    void writeShort(short value);
+    void setReaderIndex(int readerIndex);
+
+    void ensureCapacity(int capacity);
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/114b6ae0/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java
index e17bcfd..5a71452 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java
@@ -18,39 +18,27 @@
 package org.apache.rocketmq.remoting.impl.buffer;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
 import java.nio.ByteBuffer;
 import org.apache.rocketmq.remoting.api.buffer.ByteBufferWrapper;
 
 public class NettyByteBufferWrapper implements ByteBufferWrapper {
     private final ByteBuf buffer;
-    private final Channel channel;
 
     public NettyByteBufferWrapper(ByteBuf buffer) {
-        this(buffer, null);
-    }
-
-    public NettyByteBufferWrapper(ByteBuf buffer, Channel channel) {
-        this.channel = channel;
         this.buffer = buffer;
     }
 
-    public void writeByte(int index, byte data) {
-        buffer.writeByte(data);
-    }
-
+    @Override
     public void writeByte(byte data) {
         buffer.writeByte(data);
     }
 
-    public byte readByte() {
-        return buffer.readByte();
-    }
-
-    public void writeInt(int data) {
-        buffer.writeInt(data);
+    @Override
+    public void writeByte(int index, byte data) {
+        buffer.writeByte(data);
     }
 
+    @Override
     public void writeBytes(byte[] data) {
         buffer.writeBytes(data);
     }
@@ -60,16 +48,24 @@ public class NettyByteBufferWrapper implements ByteBufferWrapper {
         buffer.writeBytes(data);
     }
 
-    public int readableBytes() {
-        return buffer.readableBytes();
+    @Override
+    public void writeShort(final short value) {
+        buffer.writeShort(value);
     }
 
-    public int readInt() {
-        return buffer.readInt();
+    @Override
+    public void writeInt(int data) {
+        buffer.writeInt(data);
     }
 
-    public void readBytes(byte[] dst) {
-        buffer.readBytes(dst);
+    @Override
+    public void writeLong(long value) {
+        buffer.writeLong(value);
+    }
+
+    @Override
+    public byte readByte() {
+        return buffer.readByte();
     }
 
     @Override
@@ -77,17 +73,19 @@ public class NettyByteBufferWrapper implements ByteBufferWrapper {
         buffer.readBytes(dst);
     }
 
-    public int readerIndex() {
-        return buffer.readerIndex();
+    @Override
+    public void readBytes(byte[] dst) {
+        buffer.readBytes(dst);
     }
 
-    public void setReaderIndex(int index) {
-        buffer.setIndex(index, buffer.writerIndex());
+    @Override
+    public short readShort() {
+        return buffer.readShort();
     }
 
     @Override
-    public void writeLong(long value) {
-        buffer.writeLong(value);
+    public int readInt() {
+        return buffer.readInt();
     }
 
     @Override
@@ -96,18 +94,23 @@ public class NettyByteBufferWrapper implements ByteBufferWrapper {
     }
 
     @Override
-    public void ensureCapacity(int capacity) {
-        buffer.capacity(capacity);
+    public int readableBytes() {
+        return buffer.readableBytes();
     }
 
     @Override
-    public short readShort() {
-        return buffer.readShort();
+    public int readerIndex() {
+        return buffer.readerIndex();
     }
 
     @Override
-    public void writeShort(final short value) {
-        buffer.writeShort(value);
+    public void setReaderIndex(int index) {
+        buffer.setIndex(index, buffer.writerIndex());
+    }
+
+    @Override
+    public void ensureCapacity(int capacity) {
+        buffer.capacity(capacity);
     }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/114b6ae0/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
index 1af62cb..4c22e7c 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
@@ -77,7 +77,7 @@ public abstract class NettyRemotingAbstract implements RemotingService {
     private final Semaphore semaphoreAsync;
     private final Map<Integer, ResponseResult> ackTables = new ConcurrentHashMap<Integer, ResponseResult>(256);
     private final Map<String, Pair<RequestProcessor, ExecutorService>> processorTables = new ConcurrentHashMap<String, Pair<RequestProcessor, ExecutorService>>();
-    private final AtomicLong count = new AtomicLong(0);
+    private final AtomicLong responseCounter = new AtomicLong(0);
     private final RemotingCommandFactory remotingCommandFactory;
     private final String remotingInstanceId = UIDGenerator.instance().createUID();
 
@@ -93,8 +93,13 @@ public abstract class NettyRemotingAbstract implements RemotingService {
     NettyRemotingAbstract(RemotingConfig clientConfig, RemotingCommandFactoryMeta remotingCommandFactoryMeta) {
         this.semaphoreOneway = new Semaphore(clientConfig.getClientOnewayInvokeSemaphore(), true);
         this.semaphoreAsync = new Semaphore(clientConfig.getClientAsyncInvokeSemaphore(), true);
-        this.publicExecutor = ThreadUtils.newThreadPoolExecutor(clientConfig.getClientAsyncCallbackExecutorThreads(), clientConfig.getClientAsyncCallbackExecutorThreads(), 60,
-            TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10000), "PublicExecutor", true);
+        this.publicExecutor = ThreadUtils.newThreadPoolExecutor(
+            clientConfig.getClientAsyncCallbackExecutorThreads(),
+            clientConfig.getClientAsyncCallbackExecutorThreads(),
+            60,
+            TimeUnit.SECONDS,
+            new ArrayBlockingQueue<Runnable>(10000),
+            "PublicExecutor", true);
         this.remotingCommandFactory = new RemotingCommandFactoryImpl(remotingCommandFactoryMeta);
     }
 
@@ -237,9 +242,10 @@ public abstract class NettyRemotingAbstract implements RemotingService {
 
             long time = System.currentTimeMillis();
             ackTables.remove(cmd.requestID());
-            if (count.incrementAndGet() % 5000 == 0)
-                LOG.warn("REQUEST ID:{}, cost time:{}, ackTables.size:{}", cmd.requestID(), time - responseResult.getBeginTimestamp(),
+            if (responseCounter.incrementAndGet() % 5000 == 0) {
+                LOG.info("REQUEST ID:{}, cost time:{}, ackTables.size:{}", cmd.requestID(), time - responseResult.getBeginTimestamp(),
                     ackTables.size());
+            }
             if (responseResult.getAsyncHandler() != null) {
                 boolean sameThread = false;
                 ExecutorService executor = this.getCallbackExecutor();

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/114b6ae0/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java
index 87a0912..ec1d69d 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java
@@ -44,7 +44,7 @@ public class Decoder extends ByteToMessageDecoder {
             return;
         }
 
-        NettyByteBufferWrapper wrapper = new NettyByteBufferWrapper(in, ctx.channel());
+        NettyByteBufferWrapper wrapper = new NettyByteBufferWrapper(in);
 
         Object msg = this.decode(ctx, wrapper);
         if (msg != null) {

Reply via email to