ARTEMIS-1342: Support Netty Native KQueue on macOS Add support for KQueue for when server or client runs on macOS. This is inline with the epoll support for linux.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0bc55100 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0bc55100 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0bc55100 Branch: refs/heads/master Commit: 0bc55100592453cbfd2fe1359dcd8d96b347d58b Parents: 687e318 Author: Michael Andre Pearce <michael.andre.pea...@me.com> Authored: Wed Aug 9 17:43:40 2017 +0100 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Wed Aug 9 15:23:16 2017 -0400 ---------------------------------------------------------------------- .../org/apache/activemq/artemis/utils/Env.java | 5 ++ .../core/client/ActiveMQClientLogger.java | 5 ++ .../artemis/core/remoting/impl/netty/Epoll.java | 6 +-- .../core/remoting/impl/netty/KQueue.java | 51 ++++++++++++++++++++ .../remoting/impl/netty/NettyConnector.java | 14 ++++++ .../remoting/impl/netty/TransportConstants.java | 6 +++ .../src/main/resources/features.xml | 1 + .../core/remoting/impl/netty/NettyAcceptor.java | 17 +++++++ .../main/resources/servers/expire/broker.xml | 11 +++-- 9 files changed, 108 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bc55100/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/Env.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/Env.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/Env.java index 94f69d3..cd41bef 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/Env.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/Env.java @@ -61,6 +61,7 @@ public final class Env { private static final String OS = System.getProperty("os.name").toLowerCase(); private static final boolean IS_LINUX = OS.startsWith("linux"); + private static final boolean IS_MAC = OS.startsWith("mac"); private static final boolean IS_64BIT = checkIs64bit(); private Env() { @@ -87,6 +88,10 @@ public final class Env { return IS_LINUX == true; } + public static boolean isMacOs() { + return IS_MAC == true; + } + public static boolean is64BitJvm() { return IS_64BIT; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bc55100/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java index 6fbb911..9814d88 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java @@ -536,4 +536,9 @@ public interface ActiveMQClientLogger extends BasicLogger { @Message(id = 214033, value = "Cannot resolve host ", format = Message.Format.MESSAGE_FORMAT) void unableToResolveHost(@Cause UnknownHostException e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 214034, value = "Unable to check KQueue availability ", + format = Message.Format.MESSAGE_FORMAT) + void unableToCheckKQueueAvailability(@Cause Throwable e); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bc55100/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/Epoll.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/Epoll.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/Epoll.java index 8553d7f..8779a5d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/Epoll.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/Epoll.java @@ -25,9 +25,9 @@ import org.apache.activemq.artemis.utils.Env; */ public final class Epoll { - private static final boolean IS_AVAILABLE_EPOLL = isIsAvailableEpoll(); + private static final boolean IS_EPOLL_AVAILABLE = isEpollAvailable(); - private static boolean isIsAvailableEpoll() { + private static boolean isEpollAvailable() { try { if (Env.is64BitJvm() && Env.isLinuxOs()) { return io.netty.channel.epoll.Epoll.isAvailable(); @@ -46,6 +46,6 @@ public final class Epoll { } public static boolean isAvailable() { - return IS_AVAILABLE_EPOLL; + return IS_EPOLL_AVAILABLE; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bc55100/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/KQueue.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/KQueue.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/KQueue.java new file mode 100644 index 0000000..d2adae3 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/KQueue.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.remoting.impl.netty; + +import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; +import org.apache.activemq.artemis.utils.Env; + +/** + * Tells if <a href="http://netty.io/wiki/native-transports.html">{@code netty-transport-native-kqueue}</a> is supported. + */ +public final class KQueue { + + private static final boolean IS_KQUEUE_AVAILABLE = isKQueueAvailable(); + + private static boolean isKQueueAvailable() { + try { + if (Env.is64BitJvm() && Env.isMacOs()) { + return io.netty.channel.kqueue.KQueue.isAvailable(); + } else { + return false; + } + } catch (Throwable e) { + ActiveMQClientLogger.LOGGER.unableToCheckKQueueAvailability(e); + return false; + } + + } + + private KQueue() { + + } + + public static boolean isAvailable() { + return IS_KQUEUE_AVAILABLE; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bc55100/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java index 8e48cf9..aaf0b08 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java @@ -65,6 +65,8 @@ import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollSocketChannel; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.kqueue.KQueueEventLoopGroup; +import io.netty.channel.kqueue.KQueueSocketChannel; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.base64.Base64; @@ -232,6 +234,8 @@ public class NettyConnector extends AbstractConnector { private boolean useEpoll; + private boolean useKQueue; + private int remotingThreads; private boolean useGlobalWorkerPool; @@ -309,6 +313,7 @@ public class NettyConnector extends AbstractConnector { useGlobalWorkerPool = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_GLOBAL_WORKER_POOL_PROP_NAME, useGlobalWorkerPool, configuration); useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration); + useKQueue = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_KQUEUE_PROP_NAME, TransportConstants.DEFAULT_USE_KQUEUE, configuration); useServlet = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_SERVLET_PROP_NAME, TransportConstants.DEFAULT_USE_SERVLET, configuration); host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration); @@ -415,6 +420,15 @@ public class NettyConnector extends AbstractConnector { channelClazz = EpollSocketChannel.class; logger.debug("Connector " + this + " using native epoll"); + } else if (useKQueue && KQueue.isAvailable()) { + if (useGlobalWorkerPool) { + group = SharedEventLoopGroup.getInstance((threadFactory -> new KQueueEventLoopGroup(remotingThreads, threadFactory))); + } else { + group = new KQueueEventLoopGroup(remotingThreads); + } + + channelClazz = KQueueSocketChannel.class; + logger.debug("Connector " + this + " using native kqueue"); } else { if (useGlobalWorkerPool) { channelClazz = NioSocketChannel.class; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bc55100/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java index 5288f38..646de80 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java @@ -53,6 +53,8 @@ public class TransportConstants { public static final String USE_EPOLL_PROP_NAME = "useEpoll"; + public static final String USE_KQUEUE_PROP_NAME = "useKQueue"; + @Deprecated /** * @deprecated Use USE_GLOBAL_WORKER_POOL_PROP_NAME @@ -157,6 +159,8 @@ public class TransportConstants { public static final boolean DEFAULT_USE_EPOLL = true; + public static final boolean DEFAULT_USE_KQUEUE = true; + public static final boolean DEFAULT_USE_INVM = false; public static final boolean DEFAULT_USE_SERVLET = false; @@ -255,6 +259,7 @@ public class TransportConstants { allowableAcceptorKeys.add(TransportConstants.HTTP_UPGRADE_ENABLED_PROP_NAME); allowableAcceptorKeys.add(TransportConstants.USE_NIO_PROP_NAME); allowableAcceptorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME); + allowableAcceptorKeys.add(TransportConstants.USE_KQUEUE_PROP_NAME); allowableAcceptorKeys.add(TransportConstants.USE_INVM_PROP_NAME); //noinspection deprecation allowableAcceptorKeys.add(TransportConstants.PROTOCOL_PROP_NAME); @@ -309,6 +314,7 @@ public class TransportConstants { allowableConnectorKeys.add(TransportConstants.USE_NIO_PROP_NAME); allowableConnectorKeys.add(TransportConstants.USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME); allowableConnectorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME); + allowableConnectorKeys.add(TransportConstants.USE_KQUEUE_PROP_NAME); allowableConnectorKeys.add(TransportConstants.USE_GLOBAL_WORKER_POOL_PROP_NAME); allowableConnectorKeys.add(TransportConstants.HOST_PROP_NAME); allowableConnectorKeys.add(TransportConstants.PORT_PROP_NAME); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bc55100/artemis-features/src/main/resources/features.xml ---------------------------------------------------------------------- diff --git a/artemis-features/src/main/resources/features.xml b/artemis-features/src/main/resources/features.xml index cf5a2a9..bae4a4c 100644 --- a/artemis-features/src/main/resources/features.xml +++ b/artemis-features/src/main/resources/features.xml @@ -36,6 +36,7 @@ <bundle>mvn:io.netty/netty-codec/${netty.version}</bundle> <bundle>mvn:io.netty/netty-handler/${netty.version}</bundle> <bundle>mvn:io.netty/netty-transport-native-epoll/${netty.version}</bundle> + <bundle>mvn:io.netty/netty-transport-native-kqueue/${netty.version}</bundle> <bundle>mvn:io.netty/netty-transport-native-unix-common/${netty.version}</bundle> </feature> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bc55100/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java index b41fc70..2477bfc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java @@ -55,6 +55,8 @@ import io.netty.channel.epoll.EpollServerSocketChannel; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroupFuture; import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.kqueue.KQueueEventLoopGroup; +import io.netty.channel.kqueue.KQueueServerSocketChannel; import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalServerChannel; import io.netty.channel.nio.NioEventLoopGroup; @@ -96,6 +98,7 @@ public class NettyAcceptor extends AbstractAcceptor { public static String INVM_ACCEPTOR_TYPE = "IN-VM"; public static String NIO_ACCEPTOR_TYPE = "NIO"; public static String EPOLL_ACCEPTOR_TYPE = "EPOLL"; + public static String KQUEUE_ACCEPTOR_TYPE = "KQUEUE"; static { // Disable default Netty leak detection if the Netty leak detection level system properties are not in use @@ -130,6 +133,8 @@ public class NettyAcceptor extends AbstractAcceptor { private final boolean useEpoll; + private final boolean useKQueue; + private final ProtocolHandler protocolHandler; private final String host; @@ -228,6 +233,7 @@ public class NettyAcceptor extends AbstractAcceptor { remotingThreads = ConfigurationHelper.getIntProperty(TransportConstants.REMOTING_THREADS_PROPNAME, remotingThreads, configuration); useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration); + useKQueue = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_KQUEUE_PROP_NAME, TransportConstants.DEFAULT_USE_KQUEUE, configuration); backlog = ConfigurationHelper.getIntProperty(TransportConstants.BACKLOG_PROP_NAME, -1, configuration); useInvm = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_INVM_PROP_NAME, TransportConstants.DEFAULT_USE_INVM, configuration); @@ -318,6 +324,17 @@ public class NettyAcceptor extends AbstractAcceptor { acceptorType = EPOLL_ACCEPTOR_TYPE; logger.debug("Acceptor using native epoll"); + } else if (useKQueue && KQueue.isAvailable()) { + channelClazz = KQueueServerSocketChannel.class; + eventLoopGroup = new KQueueEventLoopGroup(remotingThreads, AccessController.doPrivileged(new PrivilegedAction<ActiveMQThreadFactory>() { + @Override + public ActiveMQThreadFactory run() { + return new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader()); + } + })); + acceptorType = KQUEUE_ACCEPTOR_TYPE; + + logger.debug("Acceptor using native kqueue"); } else { channelClazz = NioServerSocketChannel.class; eventLoopGroup = new NioEventLoopGroup(remotingThreads, AccessController.doPrivileged(new PrivilegedAction<ActiveMQThreadFactory>() { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bc55100/tests/smoke-tests/src/main/resources/servers/expire/broker.xml ---------------------------------------------------------------------- diff --git a/tests/smoke-tests/src/main/resources/servers/expire/broker.xml b/tests/smoke-tests/src/main/resources/servers/expire/broker.xml index a4176f8..0930296 100644 --- a/tests/smoke-tests/src/main/resources/servers/expire/broker.xml +++ b/tests/smoke-tests/src/main/resources/servers/expire/broker.xml @@ -98,23 +98,24 @@ under the License. <acceptors> <!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it --> + <!-- useKQueue means: it will use Netty kqueue if you are on a system (MacOS) that supports it --> <!-- amqpCredits: The number of credits sent to AMQP producers --> <!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark --> <!-- Acceptor for every supported protocol --> - <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor> + <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;useKQueue;amqpCredits=1000;amqpLowCredits=300</acceptor> <!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.--> - <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300</acceptor> + <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;useKQueue=true;amqpCredits=1000;amqpMinCredits=300</acceptor> <!-- STOMP Acceptor. --> - <acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor> + <acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true;useKQueue=true</acceptor> <!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. --> - <acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP;useEpoll=true</acceptor> + <acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP;useEpoll=true;useKQueue=true</acceptor> <!-- MQTT Acceptor --> - <acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor> + <acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true;useKQueue=true</acceptor> </acceptors>