http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java index c82cbdf..9cccaaf 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java @@ -6,20 +6,19 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.remoting.common; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * Base class for background thread * @@ -32,20 +31,16 @@ public abstract class ServiceThread implements Runnable { protected volatile boolean hasNotified = false; protected volatile boolean stopped = false; - public ServiceThread() { this.thread = new Thread(this, this.getServiceName()); } - public abstract String getServiceName(); - public void start() { this.thread.start(); } - public void shutdown() { this.shutdown(false); } @@ -69,7 +64,7 @@ public abstract class ServiceThread implements Runnable { this.thread.join(this.getJointime()); long eclipseTime = System.currentTimeMillis() - beginTime; STLOG.info("join thread " + this.getServiceName() + " eclipse time(ms) " + eclipseTime + " " - + this.getJointime()); + + this.getJointime()); } catch (InterruptedException e) { e.printStackTrace(); }
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingCommandException.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingCommandException.java b/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingCommandException.java index 72c5287..62a10a6 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingCommandException.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingCommandException.java @@ -6,25 +6,23 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.remoting.exception; public class RemotingCommandException extends RemotingException { private static final long serialVersionUID = -6061365915274953096L; - public RemotingCommandException(String message) { super(message, null); } - public RemotingCommandException(String message, Throwable cause) { super(message, cause); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingConnectException.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingConnectException.java b/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingConnectException.java index 2fa4d69..c3e4777 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingConnectException.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingConnectException.java @@ -6,25 +6,23 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.remoting.exception; public class RemotingConnectException extends RemotingException { private static final long serialVersionUID = -5565366231695911316L; - public RemotingConnectException(String addr) { this(addr, null); } - public RemotingConnectException(String addr, Throwable cause) { super("connect to <" + addr + "> failed", cause); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingException.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingException.java b/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingException.java index f4a79ea..cbc363b 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingException.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingException.java @@ -6,25 +6,23 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.remoting.exception; public class RemotingException extends Exception { private static final long serialVersionUID = -5690687334570505110L; - public RemotingException(String message) { super(message); } - public RemotingException(String message, Throwable cause) { super(message, cause); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingSendRequestException.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingSendRequestException.java b/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingSendRequestException.java index 720ec1f..4eb1b63 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingSendRequestException.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingSendRequestException.java @@ -6,25 +6,23 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.remoting.exception; public class RemotingSendRequestException extends RemotingException { private static final long serialVersionUID = 5391285827332471674L; - public RemotingSendRequestException(String addr) { this(addr, null); } - public RemotingSendRequestException(String addr, Throwable cause) { super("send request to <" + addr + "> failed", cause); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingTimeoutException.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingTimeoutException.java b/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingTimeoutException.java index 1190b49..e4cc69e 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingTimeoutException.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingTimeoutException.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.remoting.exception; @@ -20,17 +20,14 @@ public class RemotingTimeoutException extends RemotingException { private static final long serialVersionUID = 4106899185095245979L; - public RemotingTimeoutException(String message) { super(message); } - public RemotingTimeoutException(String addr, long timeoutMillis) { this(addr, timeoutMillis, null); } - public RemotingTimeoutException(String addr, long timeoutMillis, Throwable cause) { super("wait response on the channel <" + addr + "> timeout, " + timeoutMillis + "(ms)", cause); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingTooMuchRequestException.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingTooMuchRequestException.java b/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingTooMuchRequestException.java index 80d4418..8ec5cf6 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingTooMuchRequestException.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingTooMuchRequestException.java @@ -6,20 +6,19 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.remoting.exception; public class RemotingTooMuchRequestException extends RemotingException { private static final long serialVersionUID = 4326919581254519654L; - public RemotingTooMuchRequestException(String message) { super(message); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java index b797272..7c017eb 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.remoting.netty; @@ -53,97 +53,78 @@ public class NettyClientConfig { return clientWorkerThreads; } - public void setClientWorkerThreads(int clientWorkerThreads) { this.clientWorkerThreads = clientWorkerThreads; } - public int getClientOnewaySemaphoreValue() { return clientOnewaySemaphoreValue; } - public void setClientOnewaySemaphoreValue(int clientOnewaySemaphoreValue) { this.clientOnewaySemaphoreValue = clientOnewaySemaphoreValue; } - public int getConnectTimeoutMillis() { return connectTimeoutMillis; } - public void setConnectTimeoutMillis(int connectTimeoutMillis) { this.connectTimeoutMillis = connectTimeoutMillis; } - public int getClientCallbackExecutorThreads() { return clientCallbackExecutorThreads; } - public void setClientCallbackExecutorThreads(int clientCallbackExecutorThreads) { this.clientCallbackExecutorThreads = clientCallbackExecutorThreads; } - public long getChannelNotActiveInterval() { return channelNotActiveInterval; } - public void setChannelNotActiveInterval(long channelNotActiveInterval) { this.channelNotActiveInterval = channelNotActiveInterval; } - public int getClientAsyncSemaphoreValue() { return clientAsyncSemaphoreValue; } - public void setClientAsyncSemaphoreValue(int clientAsyncSemaphoreValue) { this.clientAsyncSemaphoreValue = clientAsyncSemaphoreValue; } - public int getClientChannelMaxIdleTimeSeconds() { return clientChannelMaxIdleTimeSeconds; } - public void setClientChannelMaxIdleTimeSeconds(int clientChannelMaxIdleTimeSeconds) { this.clientChannelMaxIdleTimeSeconds = clientChannelMaxIdleTimeSeconds; } - public int getClientSocketSndBufSize() { return clientSocketSndBufSize; } - public void setClientSocketSndBufSize(int clientSocketSndBufSize) { this.clientSocketSndBufSize = clientSocketSndBufSize; } - public int getClientSocketRcvBufSize() { return clientSocketRcvBufSize; } - public void setClientSocketRcvBufSize(int clientSocketRcvBufSize) { this.clientSocketRcvBufSize = clientSocketRcvBufSize; } - public boolean isClientPooledByteBufAllocatorEnable() { return clientPooledByteBufAllocatorEnable; } - public void setClientPooledByteBufAllocatorEnable(boolean clientPooledByteBufAllocatorEnable) { this.clientPooledByteBufAllocatorEnable = clientPooledByteBufAllocatorEnable; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java index 0a8ba97..73d7f2b 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java @@ -6,47 +6,43 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.remoting.netty; -import org.apache.rocketmq.remoting.common.RemotingHelper; -import org.apache.rocketmq.remoting.common.RemotingUtil; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import java.nio.ByteBuffer; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.common.RemotingUtil; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.ByteBuffer; - - /** * */ public class NettyDecoder extends LengthFieldBasedFrameDecoder { private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); private static final int FRAME_MAX_LENGTH = // - Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216")); - + Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216")); public NettyDecoder() { super(FRAME_MAX_LENGTH, 0, 4, 0, 4); } - @Override public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { ByteBuf frame = null; try { - frame = (ByteBuf) super.decode(ctx, in); + frame = (ByteBuf)super.decode(ctx, in); if (null == frame) { return null; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java index 35adcf2..fdebcdc 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java @@ -6,28 +6,26 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.remoting.netty; -import org.apache.rocketmq.remoting.common.RemotingHelper; -import org.apache.rocketmq.remoting.common.RemotingUtil; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; +import java.nio.ByteBuffer; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.common.RemotingUtil; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.ByteBuffer; - - /** * */ @@ -36,7 +34,7 @@ public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> { @Override public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out) - throws Exception { + throws Exception { try { ByteBuffer header = remotingCommand.encodeHeader(); out.writeBytes(header); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEvent.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEvent.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEvent.java index e086409..825d1da 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEvent.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEvent.java @@ -6,47 +6,41 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.remoting.netty; import io.netty.channel.Channel; - public class NettyEvent { private final NettyEventType type; private final String remoteAddr; private final Channel channel; - public NettyEvent(NettyEventType type, String remoteAddr, Channel channel) { this.type = type; this.remoteAddr = remoteAddr; this.channel = channel; } - public NettyEventType getType() { return type; } - public String getRemoteAddr() { return remoteAddr; } - public Channel getChannel() { return channel; } - @Override public String toString() { return "NettyEvent [type=" + type + ", remoteAddr=" + remoteAddr + ", channel=" + channel + "]"; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java index ae4b647..b2135da 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.remoting.netty; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index 1034dd8..c0136d3 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -16,25 +16,10 @@ */ package org.apache.rocketmq.remoting.netty; -import org.apache.rocketmq.remoting.ChannelEventListener; -import org.apache.rocketmq.remoting.InvokeCallback; -import org.apache.rocketmq.remoting.RPCHook; -import org.apache.rocketmq.remoting.common.Pair; -import org.apache.rocketmq.remoting.common.RemotingHelper; -import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce; -import org.apache.rocketmq.remoting.common.ServiceThread; -import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; -import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; -import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.net.SocketAddress; import java.util.HashMap; import java.util.Iterator; @@ -47,28 +32,37 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; - +import org.apache.rocketmq.remoting.ChannelEventListener; +import org.apache.rocketmq.remoting.InvokeCallback; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.common.Pair; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce; +import org.apache.rocketmq.remoting.common.ServiceThread; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class NettyRemotingAbstract { private static final Logger PLOG = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); - protected final Semaphore semaphoreOneway; - protected final Semaphore semaphoreAsync; - protected final ConcurrentHashMap<Integer /* opaque */, ResponseFuture> responseTable = - new ConcurrentHashMap<Integer, ResponseFuture>(256); + new ConcurrentHashMap<Integer, ResponseFuture>(256); protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable = - new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64); + new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64); protected final NettyEventExecuter nettyEventExecuter = new NettyEventExecuter(); protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor; - public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) { this.semaphoreOneway = new Semaphore(permitsOneway, true); this.semaphoreAsync = new Semaphore(permitsAsync, true); @@ -133,14 +127,14 @@ public abstract class NettyRemotingAbstract { } } catch (Throwable e) { if (!"com.aliyun.openservices.ons.api.impl.authority.exception.AuthenticationException" - .equals(e.getClass().getCanonicalName())) { + .equals(e.getClass().getCanonicalName())) { PLOG.error("process request exception", e); PLOG.error(cmd.toString()); } if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, // - RemotingHelper.exceptionSimpleDesc(e)); + RemotingHelper.exceptionSimpleDesc(e)); response.setOpaque(opaque); ctx.writeAndFlush(response); } @@ -150,7 +144,7 @@ public abstract class NettyRemotingAbstract { if (pair.getObject1().rejectRequest()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, - "[REJECTREQUEST]system busy, start flow control for a while"); + "[REJECTREQUEST]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); return; @@ -162,14 +156,14 @@ public abstract class NettyRemotingAbstract { } catch (RejectedExecutionException e) { if ((System.currentTimeMillis() % 10000) == 0) { PLOG.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) // - + ", too many requests and system thread pool busy, RejectedExecutionException " // - + pair.getObject2().toString() // - + " request code: " + cmd.getCode()); + + ", too many requests and system thread pool busy, RejectedExecutionException " // + + pair.getObject2().toString() // + + " request code: " + cmd.getCode()); } if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, - "[OVERLOAD]system busy, start flow control for a while"); + "[OVERLOAD]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); } @@ -177,7 +171,7 @@ public abstract class NettyRemotingAbstract { } else { String error = " request type " + cmd.getCode() + " not supported"; final RemotingCommand response = - RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error); + RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error); response.setOpaque(opaque); ctx.writeAndFlush(response); PLOG.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error); @@ -267,7 +261,7 @@ public abstract class NettyRemotingAbstract { } public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) - throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { + throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { final int opaque = request.getOpaque(); try { @@ -295,7 +289,7 @@ public abstract class NettyRemotingAbstract { if (null == responseCommand) { if (responseFuture.isSendRequestOK()) { throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, - responseFuture.getCause()); + responseFuture.getCause()); } else { throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause()); } @@ -308,8 +302,8 @@ public abstract class NettyRemotingAbstract { } public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis, - final InvokeCallback invokeCallback) - throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { + final InvokeCallback invokeCallback) + throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { final int opaque = request.getOpaque(); boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); if (acquired) { @@ -348,18 +342,18 @@ public abstract class NettyRemotingAbstract { } } else { String info = - String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", // - timeoutMillis, // - this.semaphoreAsync.getQueueLength(), // - this.semaphoreAsync.availablePermits()// - ); + String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", // + timeoutMillis, // + this.semaphoreAsync.getQueueLength(), // + this.semaphoreAsync.availablePermits()// + ); PLOG.warn(info); throw new RemotingTooMuchRequestException(info); } } public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) - throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { + throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { request.markOnewayRPC(); boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); if (acquired) { @@ -384,10 +378,10 @@ public abstract class NettyRemotingAbstract { throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast"); } else { String info = String.format( - "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", // - timeoutMillis, // - this.semaphoreOneway.getQueueLength(), // - this.semaphoreOneway.availablePermits()// + "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", // + timeoutMillis, // + this.semaphoreOneway.getQueueLength(), // + this.semaphoreOneway.availablePermits()// ); PLOG.warn(info); throw new RemotingTimeoutException(info); @@ -399,7 +393,6 @@ public abstract class NettyRemotingAbstract { private final LinkedBlockingQueue<NettyEvent> eventQueue = new LinkedBlockingQueue<NettyEvent>(); private final int maxSize = 10000; - public void putNettyEvent(final NettyEvent event) { if (this.eventQueue.size() <= maxSize) { this.eventQueue.add(event); @@ -408,7 +401,6 @@ public abstract class NettyRemotingAbstract { } } - @Override public void run() { PLOG.info(this.getServiceName() + " service started"); @@ -445,7 +437,6 @@ public abstract class NettyRemotingAbstract { PLOG.info(this.getServiceName() + " service end"); } - @Override public String getServiceName() { return NettyEventExecuter.class.getSimpleName(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index 3b7013a..db7815a 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -16,18 +16,6 @@ */ package org.apache.rocketmq.remoting.netty; -import org.apache.rocketmq.remoting.ChannelEventListener; -import org.apache.rocketmq.remoting.InvokeCallback; -import org.apache.rocketmq.remoting.RPCHook; -import org.apache.rocketmq.remoting.RemotingClient; -import org.apache.rocketmq.remoting.common.Pair; -import org.apache.rocketmq.remoting.common.RemotingHelper; -import org.apache.rocketmq.remoting.common.RemotingUtil; -import org.apache.rocketmq.remoting.exception.RemotingConnectException; -import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; -import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; -import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; @@ -45,9 +33,6 @@ import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.DefaultEventExecutorGroup; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.net.SocketAddress; import java.util.Collections; import java.util.List; @@ -64,7 +49,20 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; - +import org.apache.rocketmq.remoting.ChannelEventListener; +import org.apache.rocketmq.remoting.InvokeCallback; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.RemotingClient; +import org.apache.rocketmq.remoting.common.Pair; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.common.RemotingUtil; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient { private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); @@ -94,7 +92,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } public NettyRemotingClient(final NettyClientConfig nettyClientConfig, // - final ChannelEventListener channelEventListener) { + final ChannelEventListener channelEventListener) { super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue()); this.nettyClientConfig = nettyClientConfig; this.channelEventListener = channelEventListener; @@ -107,7 +105,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); - @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet()); @@ -117,7 +114,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); - @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet())); @@ -134,36 +130,35 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti @Override public void start() { this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(// - nettyClientConfig.getClientWorkerThreads(), // - new ThreadFactory() { + nettyClientConfig.getClientWorkerThreads(), // + new ThreadFactory() { - private AtomicInteger threadIndex = new AtomicInteger(0); + private AtomicInteger threadIndex = new AtomicInteger(0); - - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet()); - } - }); + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet()); + } + }); Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)// - .option(ChannelOption.TCP_NODELAY, true) - .option(ChannelOption.SO_KEEPALIVE, false) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()) - .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()) - .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()) - .handler(new ChannelInitializer<SocketChannel>() { - @Override - public void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast( - defaultEventExecutorGroup, - new NettyEncoder(), - new NettyDecoder(), - new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()), - new NettyConnetManageHandler(), - new NettyClientHandler()); - } - }); + .option(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.SO_KEEPALIVE, false) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()) + .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()) + .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()) + .handler(new ChannelInitializer<SocketChannel>() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast( + defaultEventExecutorGroup, + new NettyEncoder(), + new NettyDecoder(), + new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()), + new NettyConnetManageHandler(), + new NettyClientHandler()); + } + }); this.timer.scheduleAtFixedRate(new TimerTask() { @Override @@ -233,7 +228,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti removeItemFromTable = false; } else if (prevCW.getChannel() != channel) { log.info("closeChannel: the channel[{}] has been closed before, and has been created again, nothing to do.", - addrRemote); + addrRemote); removeItemFromTable = false; } @@ -338,7 +333,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti @Override public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis) - throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException { + throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException { final Channel channel = this.getAndCreateChannel(addr); if (channel != null && channel.isActive()) { try { @@ -431,7 +426,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti return cw.getChannel(); } - if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { boolean createNewConnection = false; @@ -476,7 +470,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } } else { log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(), - channelFuture.toString()); + channelFuture.toString()); } } @@ -485,8 +479,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti @Override public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback) - throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, - RemotingSendRequestException { + throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, + RemotingSendRequestException { final Channel channel = this.getAndCreateChannel(addr); if (channel != null && channel.isActive()) { try { @@ -507,7 +501,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti @Override public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException, - RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { + RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { final Channel channel = this.getAndCreateChannel(addr); if (channel != null && channel.isActive()) { try { @@ -572,27 +566,22 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti static class ChannelWrapper { private final ChannelFuture channelFuture; - public ChannelWrapper(ChannelFuture channelFuture) { this.channelFuture = channelFuture; } - public boolean isOK() { return this.channelFuture.channel() != null && this.channelFuture.channel().isActive(); } - public boolean isWriteable() { return this.channelFuture.channel().isWritable(); } - private Channel getChannel() { return this.channelFuture.channel(); } - public ChannelFuture getChannelFuture() { return channelFuture; } @@ -610,7 +599,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti class NettyConnetManageHandler extends ChannelDuplexHandler { @Override public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) - throws Exception { + throws Exception { final String local = localAddress == null ? "UNKNOW" : localAddress.toString(); final String remote = remoteAddress == null ? "UNKNOW" : remoteAddress.toString(); log.info("NETTY CLIENT PIPELINE: CONNECT {} => {}", local, remote); @@ -621,7 +610,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } } - @Override public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); @@ -634,7 +622,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } } - @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); @@ -650,14 +637,14 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { - IdleStateEvent evnet = (IdleStateEvent) evt; + IdleStateEvent evnet = (IdleStateEvent)evt; if (evnet.state().equals(IdleState.ALL_IDLE)) { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); log.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", remoteAddress); closeChannel(ctx.channel()); if (NettyRemotingClient.this.channelEventListener != null) { NettyRemotingClient.this - .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress.toString(), ctx.channel())); + .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress.toString(), ctx.channel())); } } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java index c6e2eda..f109086 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java @@ -6,27 +6,16 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.remoting.netty; -import org.apache.rocketmq.remoting.ChannelEventListener; -import org.apache.rocketmq.remoting.InvokeCallback; -import org.apache.rocketmq.remoting.RPCHook; -import org.apache.rocketmq.remoting.RemotingServer; -import org.apache.rocketmq.remoting.common.Pair; -import org.apache.rocketmq.remoting.common.RemotingHelper; -import org.apache.rocketmq.remoting.common.RemotingUtil; -import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; -import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; -import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.Channel; @@ -45,9 +34,6 @@ import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.DefaultEventExecutorGroup; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.net.InetSocketAddress; import java.util.Timer; import java.util.TimerTask; @@ -55,7 +41,19 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; - +import org.apache.rocketmq.remoting.ChannelEventListener; +import org.apache.rocketmq.remoting.InvokeCallback; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.RemotingServer; +import org.apache.rocketmq.remoting.common.Pair; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.common.RemotingUtil; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer { private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); @@ -72,15 +70,12 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti private RPCHook rpcHook; - private int port = 0; - public NettyRemotingServer(final NettyServerConfig nettyServerConfig) { this(nettyServerConfig, null); } - public NettyRemotingServer(final NettyServerConfig nettyServerConfig, final ChannelEventListener channelEventListener) { super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue()); this.serverBootstrap = new ServerBootstrap(); @@ -95,7 +90,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); - @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet()); @@ -105,7 +99,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); - @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet())); @@ -113,12 +106,11 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti }); if (RemotingUtil.isLinuxPlatform() // - && nettyServerConfig.isUseEpollNativeSelector()) { + && nettyServerConfig.isUseEpollNativeSelector()) { this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); private int threadTotal = nettyServerConfig.getServerSelectorThreads(); - @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet())); @@ -129,7 +121,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti private AtomicInteger threadIndex = new AtomicInteger(0); private int threadTotal = nettyServerConfig.getServerSelectorThreads(); - @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet())); @@ -138,51 +129,49 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti } } - @Override public void start() { this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(// - nettyServerConfig.getServerWorkerThreads(), // - new ThreadFactory() { + nettyServerConfig.getServerWorkerThreads(), // + new ThreadFactory() { - private AtomicInteger threadIndex = new AtomicInteger(0); + private AtomicInteger threadIndex = new AtomicInteger(0); + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet()); + } + }); + ServerBootstrap childHandler = + this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, 1024) + .option(ChannelOption.SO_REUSEADDR, true) + .option(ChannelOption.SO_KEEPALIVE, false) + .childOption(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()) + .option(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()) + .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) + .childHandler(new ChannelInitializer<SocketChannel>() { @Override - public Thread newThread(Runnable r) { - return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet()); + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast( + defaultEventExecutorGroup, + new NettyEncoder(), + new NettyDecoder(), + new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), + new NettyConnetManageHandler(), + new NettyServerHandler()); } }); - ServerBootstrap childHandler = - this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(NioServerSocketChannel.class) - .option(ChannelOption.SO_BACKLOG, 1024) - .option(ChannelOption.SO_REUSEADDR, true) - .option(ChannelOption.SO_KEEPALIVE, false) - .childOption(ChannelOption.TCP_NODELAY, true) - .option(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()) - .option(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()) - .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) - .childHandler(new ChannelInitializer<SocketChannel>() { - @Override - public void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast( - defaultEventExecutorGroup, - new NettyEncoder(), - new NettyDecoder(), - new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), - new NettyConnetManageHandler(), - new NettyServerHandler()); - } - }); - if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) { childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); } try { ChannelFuture sync = this.serverBootstrap.bind().sync(); - InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress(); + InetSocketAddress addr = (InetSocketAddress)sync.channel().localAddress(); this.port = addr.getPort(); } catch (InterruptedException e1) { throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1); @@ -269,19 +258,19 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti @Override public RemotingCommand invokeSync(final Channel channel, final RemotingCommand request, final long timeoutMillis) - throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { + throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { return this.invokeSyncImpl(channel, request, timeoutMillis); } @Override public void invokeAsync(Channel channel, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback) - throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { + throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback); } @Override public void invokeOneway(Channel channel, RemotingCommand request, long timeoutMillis) throws InterruptedException, - RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { + RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { this.invokeOnewayImpl(channel, request, timeoutMillis); } @@ -316,7 +305,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti super.channelRegistered(ctx); } - @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); @@ -324,7 +312,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti super.channelUnregistered(ctx); } - @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); @@ -336,7 +323,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti } } - @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); @@ -348,18 +334,17 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti } } - @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { - IdleStateEvent evnet = (IdleStateEvent) evt; + IdleStateEvent evnet = (IdleStateEvent)evt; if (evnet.state().equals(IdleState.ALL_IDLE)) { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress); RemotingUtil.closeChannel(ctx.channel()); if (NettyRemotingServer.this.channelEventListener != null) { NettyRemotingServer.this - .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress.toString(), ctx.channel())); + .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress.toString(), ctx.channel())); } } } @@ -367,7 +352,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti ctx.fireUserEventTriggered(evt); } - @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java index b8d2052..c6251e9 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java @@ -6,19 +6,18 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.remoting.netty; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; import io.netty.channel.ChannelHandlerContext; - +import org.apache.rocketmq.remoting.protocol.RemotingCommand; /** * Common remoting command processor @@ -27,6 +26,7 @@ import io.netty.channel.ChannelHandlerContext; */ public interface NettyRequestProcessor { RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) - throws Exception; + throws Exception; + boolean rejectRequest(); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java index 0a53240..f69fded 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.remoting.netty; @@ -43,118 +43,96 @@ public class NettyServerConfig implements Cloneable { */ private boolean useEpollNativeSelector = false; - public int getListenPort() { return listenPort; } - public void setListenPort(int listenPort) { this.listenPort = listenPort; } - public int getServerWorkerThreads() { return serverWorkerThreads; } - public void setServerWorkerThreads(int serverWorkerThreads) { this.serverWorkerThreads = serverWorkerThreads; } - public int getServerSelectorThreads() { return serverSelectorThreads; } - public void setServerSelectorThreads(int serverSelectorThreads) { this.serverSelectorThreads = serverSelectorThreads; } - public int getServerOnewaySemaphoreValue() { return serverOnewaySemaphoreValue; } - public void setServerOnewaySemaphoreValue(int serverOnewaySemaphoreValue) { this.serverOnewaySemaphoreValue = serverOnewaySemaphoreValue; } - public int getServerCallbackExecutorThreads() { return serverCallbackExecutorThreads; } - public void setServerCallbackExecutorThreads(int serverCallbackExecutorThreads) { this.serverCallbackExecutorThreads = serverCallbackExecutorThreads; } - public int getServerAsyncSemaphoreValue() { return serverAsyncSemaphoreValue; } - public void setServerAsyncSemaphoreValue(int serverAsyncSemaphoreValue) { this.serverAsyncSemaphoreValue = serverAsyncSemaphoreValue; } - public int getServerChannelMaxIdleTimeSeconds() { return serverChannelMaxIdleTimeSeconds; } - public void setServerChannelMaxIdleTimeSeconds(int serverChannelMaxIdleTimeSeconds) { this.serverChannelMaxIdleTimeSeconds = serverChannelMaxIdleTimeSeconds; } - public int getServerSocketSndBufSize() { return serverSocketSndBufSize; } - public void setServerSocketSndBufSize(int serverSocketSndBufSize) { this.serverSocketSndBufSize = serverSocketSndBufSize; } - public int getServerSocketRcvBufSize() { return serverSocketRcvBufSize; } - public void setServerSocketRcvBufSize(int serverSocketRcvBufSize) { this.serverSocketRcvBufSize = serverSocketRcvBufSize; } - public boolean isServerPooledByteBufAllocatorEnable() { return serverPooledByteBufAllocatorEnable; } - public void setServerPooledByteBufAllocatorEnable(boolean serverPooledByteBufAllocatorEnable) { this.serverPooledByteBufAllocatorEnable = serverPooledByteBufAllocatorEnable; } - public boolean isUseEpollNativeSelector() { return useEpollNativeSelector; } - public void setUseEpollNativeSelector(boolean useEpollNativeSelector) { this.useEpollNativeSelector = useEpollNativeSelector; } @Override public Object clone() throws CloneNotSupportedException { - return (NettyServerConfig) super.clone(); + return (NettyServerConfig)super.clone(); } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java index ca22df1..9409f92 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java @@ -19,24 +19,24 @@ package org.apache.rocketmq.remoting.netty; public class NettySystemConfig { public static final String COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE = - "com.rocketmq.remoting.nettyPooledByteBufAllocatorEnable"; + "com.rocketmq.remoting.nettyPooledByteBufAllocatorEnable"; public static final String COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE = // - "com.rocketmq.remoting.socket.sndbuf.size"; + "com.rocketmq.remoting.socket.sndbuf.size"; public static final String COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE = // - "com.rocketmq.remoting.socket.rcvbuf.size"; + "com.rocketmq.remoting.socket.rcvbuf.size"; public static final String COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE = // - "com.rocketmq.remoting.clientAsyncSemaphoreValue"; + "com.rocketmq.remoting.clientAsyncSemaphoreValue"; public static final String COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE = // - "com.rocketmq.remoting.clientOnewaySemaphoreValue"; + "com.rocketmq.remoting.clientOnewaySemaphoreValue"; public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE = // - Boolean - .parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE, "false")); - public static int socketSndbufSize = // - Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE, "65535")); - public static int socketRcvbufSize = // - Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE, "65535")); + Boolean + .parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE, "false")); public static final int CLIENT_ASYNC_SEMAPHORE_VALUE = // - Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE, "65535")); + Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE, "65535")); public static final int CLIENT_ONEWAY_SEMAPHORE_VALUE = // - Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE, "65535")); + Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE, "65535")); + public static int socketSndbufSize = // + Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE, "65535")); + public static int socketRcvbufSize = // + Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE, "65535")); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RequestTask.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RequestTask.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RequestTask.java index e1317a0..0443b43 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RequestTask.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RequestTask.java @@ -17,9 +17,8 @@ package org.apache.rocketmq.remoting.netty; - -import org.apache.rocketmq.remoting.protocol.RemotingCommand; import io.netty.channel.Channel; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; public class RequestTask implements Runnable { private final Runnable runnable; @@ -37,7 +36,7 @@ public class RequestTask implements Runnable { @Override public int hashCode() { int result = runnable != null ? runnable.hashCode() : 0; - result = 31 * result + (int) (getCreateTimestamp() ^ (getCreateTimestamp() >>> 32)); + result = 31 * result + (int)(getCreateTimestamp() ^ (getCreateTimestamp() >>> 32)); result = 31 * result + (channel != null ? channel.hashCode() : 0); result = 31 * result + (request != null ? request.hashCode() : 0); result = 31 * result + (isStopRun() ? 1 : 0); @@ -46,14 +45,19 @@ public class RequestTask implements Runnable { @Override public boolean equals(final Object o) { - if (this == o) return true; - if (!(o instanceof RequestTask)) return false; + if (this == o) + return true; + if (!(o instanceof RequestTask)) + return false; - final RequestTask that = (RequestTask) o; + final RequestTask that = (RequestTask)o; - if (getCreateTimestamp() != that.getCreateTimestamp()) return false; - if (isStopRun() != that.isStopRun()) return false; - if (channel != null ? !channel.equals(that.channel) : that.channel != null) return false; + if (getCreateTimestamp() != that.getCreateTimestamp()) + return false; + if (isStopRun() != that.isStopRun()) + return false; + if (channel != null ? !channel.equals(that.channel) : that.channel != null) + return false; return request != null ? request.getOpaque() == that.request.getOpaque() : that.request == null; } @@ -72,7 +76,8 @@ public class RequestTask implements Runnable { @Override public void run() { - if (!this.stopRun) this.runnable.run(); + if (!this.stopRun) + this.runnable.run(); } public void returnResponse(int code, String remark) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java index d564a3a..fa792b2 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java @@ -6,24 +6,22 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.remoting.netty; -import org.apache.rocketmq.remoting.InvokeCallback; -import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; - import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; - +import org.apache.rocketmq.remoting.InvokeCallback; +import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; public class ResponseFuture { private final int opaque; @@ -39,16 +37,14 @@ public class ResponseFuture { private volatile boolean sendRequestOK = true; private volatile Throwable cause; - public ResponseFuture(int opaque, long timeoutMillis, InvokeCallback invokeCallback, - SemaphoreReleaseOnlyOnce once) { + SemaphoreReleaseOnlyOnce once) { this.opaque = opaque; this.timeoutMillis = timeoutMillis; this.invokeCallback = invokeCallback; this.once = once; } - public void executeInvokeCallback() { if (invokeCallback != null) { if (this.executeCallbackOnlyOnce.compareAndSet(false, true)) { @@ -57,87 +53,72 @@ public class ResponseFuture { } } - public void release() { if (this.once != null) { this.once.release(); } } - public boolean isTimeout() { long diff = System.currentTimeMillis() - this.beginTimestamp; return diff > this.timeoutMillis; } - public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException { this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS); return this.responseCommand; } - public void putResponse(final RemotingCommand responseCommand) { this.responseCommand = responseCommand; this.countDownLatch.countDown(); } - public long getBeginTimestamp() { return beginTimestamp; } - public boolean isSendRequestOK() { return sendRequestOK; } - public void setSendRequestOK(boolean sendRequestOK) { this.sendRequestOK = sendRequestOK; } - public long getTimeoutMillis() { return timeoutMillis; } - public InvokeCallback getInvokeCallback() { return invokeCallback; } - public Throwable getCause() { return cause; } - public void setCause(Throwable cause) { this.cause = cause; } - public RemotingCommand getResponseCommand() { return responseCommand; } - public void setResponseCommand(RemotingCommand responseCommand) { this.responseCommand = responseCommand; } - public int getOpaque() { return opaque; } - @Override public String toString() { return "ResponseFuture [responseCommand=" + responseCommand + ", sendRequestOK=" + sendRequestOK - + ", cause=" + cause + ", opaque=" + opaque + ", timeoutMillis=" + timeoutMillis - + ", invokeCallback=" + invokeCallback + ", beginTimestamp=" + beginTimestamp - + ", countDownLatch=" + countDownLatch + "]"; + + ", cause=" + cause + ", opaque=" + opaque + ", timeoutMillis=" + timeoutMillis + + ", invokeCallback=" + invokeCallback + ", beginTimestamp=" + beginTimestamp + + ", countDownLatch=" + countDownLatch + "]"; } }