Repository: incubator-rocketmq Updated Branches: refs/heads/master b85645996 -> e5892e164 (forced update)
[ROCKETMQ-13] Wrong log level for AcceptSocketService termination. Additionally, added code comments and did a cleanup. JIRA issue: https://issues.apache.org/jira/browse/ROCKETMQ-13 Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/fed09763 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/fed09763 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/fed09763 Branch: refs/heads/master Commit: fed09763bccb73696d1953937b8b1eb1ce88b131 Parents: 626990c Author: shtykh_roman <rsht...@yahoo.com> Authored: Mon Dec 26 12:17:53 2016 +0900 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Mon Dec 26 15:12:41 2016 +0800 ---------------------------------------------------------------------- .../alibaba/rocketmq/store/ha/HAService.java | 26 +++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/fed09763/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java index 075252c..2cf695c 100644 --- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java @@ -46,7 +46,7 @@ public class HAService { private final AtomicInteger connectionCount = new AtomicInteger(0); - private final List<HAConnection> connectionList = new LinkedList<HAConnection>(); + private final List<HAConnection> connectionList = new LinkedList<>(); private final AcceptSocketService acceptSocketService; @@ -170,17 +170,22 @@ public class HAService { return push2SlaveMaxOffset; } + /** + * Listens to slave connections to create {@link HAConnection}. + */ class AcceptSocketService extends ServiceThread { private ServerSocketChannel serverSocketChannel; private Selector selector; private final SocketAddress socketAddressListen; - public AcceptSocketService(final int port) { this.socketAddressListen = new InetSocketAddress(port); } - + /** + * Starts listening to slave connections. + * @throws Exception If fails. + */ public void beginAccept() throws Exception { this.serverSocketChannel = ServerSocketChannel.open(); this.selector = RemotingUtil.openSelector(); @@ -190,6 +195,7 @@ public class HAService { this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT); } + /** {@inheritDoc} */ @Override public void shutdown(final boolean interrupt) { super.shutdown(interrupt); @@ -202,6 +208,7 @@ public class HAService { } } + /** {@inheritDoc} */ @Override public void run() { log.info(this.getServiceName() + " service started"); @@ -210,10 +217,12 @@ public class HAService { try { this.selector.select(1000); Set<SelectionKey> selected = this.selector.selectedKeys(); + if (selected != null) { for (SelectionKey k : selected) { if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) { SocketChannel sc = ((ServerSocketChannel) k.channel()).accept(); + if (sc != null) { HAService.log.info("HAService receive new connection, " + sc.socket().getRemoteSocketAddress()); @@ -234,16 +243,15 @@ public class HAService { selected.clear(); } - } catch (Exception e) { log.error(this.getServiceName() + " service has exception.", e); } } - log.error(this.getServiceName() + " service end"); + log.info(this.getServiceName() + " service end"); } - + /** {@inheritDoc} */ @Override public String getServiceName() { return AcceptSocketService.class.getSimpleName(); @@ -256,8 +264,8 @@ public class HAService { class GroupTransferService extends ServiceThread { private final WaitNotifyObject notifyTransferObject = new WaitNotifyObject(); - private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>(); - private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>(); + private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<>(); + private volatile List<GroupCommitRequest> requestsRead = new ArrayList<>(); public void putRequest(final GroupCommitRequest request) { @@ -333,7 +341,7 @@ public class HAService { class HAClient extends ServiceThread { private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4; - private final AtomicReference<String> masterAddress = new AtomicReference<String>(); + private final AtomicReference<String> masterAddress = new AtomicReference<>(); private final ByteBuffer reportOffset = ByteBuffer.allocate(8); private SocketChannel socketChannel; private Selector selector;