Repository: incubator-rocketmq
Updated Branches:
  refs/heads/master b85645996 -> e5892e164 (forced update)


[ROCKETMQ-13] Wrong log level for AcceptSocketService termination.

Additionally, added code comments and did a cleanup.

JIRA issue: https://issues.apache.org/jira/browse/ROCKETMQ-13


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/fed09763
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/fed09763
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/fed09763

Branch: refs/heads/master
Commit: fed09763bccb73696d1953937b8b1eb1ce88b131
Parents: 626990c
Author: shtykh_roman <rsht...@yahoo.com>
Authored: Mon Dec 26 12:17:53 2016 +0900
Committer: Willem Jiang <willem.ji...@gmail.com>
Committed: Mon Dec 26 15:12:41 2016 +0800

----------------------------------------------------------------------
 .../alibaba/rocketmq/store/ha/HAService.java    | 26 +++++++++++++-------
 1 file changed, 17 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/fed09763/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java 
b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java
index 075252c..2cf695c 100644
--- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java
@@ -46,7 +46,7 @@ public class HAService {
 
     private final AtomicInteger connectionCount = new AtomicInteger(0);
 
-    private final List<HAConnection> connectionList = new 
LinkedList<HAConnection>();
+    private final List<HAConnection> connectionList = new LinkedList<>();
 
     private final AcceptSocketService acceptSocketService;
 
@@ -170,17 +170,22 @@ public class HAService {
         return push2SlaveMaxOffset;
     }
 
+    /**
+     * Listens to slave connections to create {@link HAConnection}.
+     */
     class AcceptSocketService extends ServiceThread {
         private ServerSocketChannel serverSocketChannel;
         private Selector selector;
         private final SocketAddress socketAddressListen;
 
-
         public AcceptSocketService(final int port) {
             this.socketAddressListen = new InetSocketAddress(port);
         }
 
-
+        /**
+         * Starts listening to slave connections.
+         * @throws Exception If fails.
+         */
         public void beginAccept() throws Exception {
             this.serverSocketChannel = ServerSocketChannel.open();
             this.selector = RemotingUtil.openSelector();
@@ -190,6 +195,7 @@ public class HAService {
             this.serverSocketChannel.register(this.selector, 
SelectionKey.OP_ACCEPT);
         }
 
+        /** {@inheritDoc} */
         @Override
         public void shutdown(final boolean interrupt) {
             super.shutdown(interrupt);
@@ -202,6 +208,7 @@ public class HAService {
             }
         }
 
+        /** {@inheritDoc} */
         @Override
         public void run() {
             log.info(this.getServiceName() + " service started");
@@ -210,10 +217,12 @@ public class HAService {
                 try {
                     this.selector.select(1000);
                     Set<SelectionKey> selected = this.selector.selectedKeys();
+
                     if (selected != null) {
                         for (SelectionKey k : selected) {
                             if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                                 SocketChannel sc = ((ServerSocketChannel) 
k.channel()).accept();
+
                                 if (sc != null) {
                                     HAService.log.info("HAService receive new 
connection, "
                                             + 
sc.socket().getRemoteSocketAddress());
@@ -234,16 +243,15 @@ public class HAService {
 
                         selected.clear();
                     }
-
                 } catch (Exception e) {
                     log.error(this.getServiceName() + " service has 
exception.", e);
                 }
             }
 
-            log.error(this.getServiceName() + " service end");
+            log.info(this.getServiceName() + " service end");
         }
 
-
+        /** {@inheritDoc} */
         @Override
         public String getServiceName() {
             return AcceptSocketService.class.getSimpleName();
@@ -256,8 +264,8 @@ public class HAService {
     class GroupTransferService extends ServiceThread {
 
         private final WaitNotifyObject notifyTransferObject = new 
WaitNotifyObject();
-        private volatile List<GroupCommitRequest> requestsWrite = new 
ArrayList<GroupCommitRequest>();
-        private volatile List<GroupCommitRequest> requestsRead = new 
ArrayList<GroupCommitRequest>();
+        private volatile List<GroupCommitRequest> requestsWrite = new 
ArrayList<>();
+        private volatile List<GroupCommitRequest> requestsRead = new 
ArrayList<>();
 
 
         public void putRequest(final GroupCommitRequest request) {
@@ -333,7 +341,7 @@ public class HAService {
 
     class HAClient extends ServiceThread {
         private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
-        private final AtomicReference<String> masterAddress = new 
AtomicReference<String>();
+        private final AtomicReference<String> masterAddress = new 
AtomicReference<>();
         private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
         private SocketChannel socketChannel;
         private Selector selector;

Reply via email to