[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2467#discussion_r246058995 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java --- @@ -730,22 +793,29 @@ public int deliverMessage(MessageReference messageReference, int deliveryCount, if (preSettle) { // Presettled means the client implicitly accepts any delivery we send it. - sessionSPI.ack(null, brokerConsumer, messageReference.getMessage()); + try { + sessionSPI.ack(null, brokerConsumer, messageReference.getMessage()); + } catch (Exception e) { + log.debug(e.getMessage(), e); + } delivery.settle(); } else { sender.advance(); } connection.flush(); } finally { -connection.unlock(); +synchronized (creditsLock) { + pending.decrementAndGet(); +} +if (releaseRequired) { + ((NettyReadable) sendBuffer).getByteBuf().release(); +} } + } catch (Exception e) { + log.warn(e.getMessage(), e); - return size; - } finally { - if (releaseRequired) { -((NettyReadable) sendBuffer).getByteBuf().release(); - } + // important todo: Error treatment --- End diff -- I'm working on it. I'm out today on a meeting... will be done tomorrow (Wed) ---
[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2467#discussion_r245960681 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java --- @@ -730,22 +793,29 @@ public int deliverMessage(MessageReference messageReference, int deliveryCount, if (preSettle) { // Presettled means the client implicitly accepts any delivery we send it. - sessionSPI.ack(null, brokerConsumer, messageReference.getMessage()); + try { + sessionSPI.ack(null, brokerConsumer, messageReference.getMessage()); + } catch (Exception e) { + log.debug(e.getMessage(), e); + } delivery.settle(); } else { sender.advance(); } connection.flush(); } finally { -connection.unlock(); +synchronized (creditsLock) { + pending.decrementAndGet(); +} +if (releaseRequired) { + ((NettyReadable) sendBuffer).getByteBuf().release(); +} } + } catch (Exception e) { + log.warn(e.getMessage(), e); - return size; - } finally { - if (releaseRequired) { -((NettyReadable) sendBuffer).getByteBuf().release(); - } + // important todo: Error treatment --- End diff -- Did you look over this? ---
[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2467#discussion_r245777806 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java --- @@ -359,59 +371,77 @@ public boolean flowControl(ReadyListener readyListener) { @Override public void onRemoteOpen(Connection connection) throws Exception { - lock(); + handler.requireHandler(); try { - try { -initInternal(); - } catch (Exception e) { -log.error("Error init connection", e); - } - if (!validateConnection(connection)) { -connection.close(); - } else { -connection.setContext(AMQPConnectionContext.this); -connection.setContainer(containerId); -connection.setProperties(connectionProperties); - connection.setOfferedCapabilities(getConnectionCapabilitiesOffered()); -connection.open(); - } - } finally { - unlock(); + initInternal(); + } catch (Exception e) { + log.error("Error init connection", e); + } + if (!validateConnection(connection)) { + connection.close(); + } else { + connection.setContext(AMQPConnectionContext.this); + connection.setContainer(containerId); + connection.setProperties(connectionProperties); + connection.setOfferedCapabilities(getConnectionCapabilitiesOffered()); + connection.open(); } initialise(); - /* - * This can be null which is in effect an empty map, also we really don't need to check this for in bound connections - * but its here in case we add support for outbound connections. - * */ + /* + * This can be null which is in effect an empty map, also we really don't need to check this for in bound connections + * but its here in case we add support for outbound connections. + * */ if (connection.getRemoteProperties() == null || !connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) { long nextKeepAliveTime = handler.tick(true); if (nextKeepAliveTime != 0 && scheduledPool != null) { -scheduledPool.schedule(new Runnable() { - @Override - public void run() { - Long rescheduleAt = handler.tick(false); - if (rescheduleAt == null) { - // this mean tick could not acquire a lock, we will just retry in 10 milliseconds. - scheduledPool.schedule(this, 10, TimeUnit.MILLISECONDS); - } else if (rescheduleAt != 0) { - scheduledPool.schedule(this, rescheduleAt - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), TimeUnit.MILLISECONDS); - } - } -}, (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS); +scheduledPool.schedule(new ScheduleRunnable(), (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS); } } } + class TickerRunnable implements Runnable { + + final ScheduleRunnable scheduleRunnable; + + TickerRunnable(ScheduleRunnable scheduleRunnable) { + this.scheduleRunnable = scheduleRunnable; + } + + @Override + public void run() { + try { +Long rescheduleAt = handler.tick(false); +if (rescheduleAt == null) { + // this mean tick could not acquire a lock, we will just retry in 10 milliseconds. + scheduledPool.schedule(scheduleRunnable, 10, TimeUnit.MILLISECONDS); +} else if (rescheduleAt != 0) { + scheduledPool.schedule(scheduleRunnable, rescheduleAt - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), TimeUnit.MILLISECONDS); +} + } catch (Exception e) { +log.warn(e.getMessage(), e); + } --- End diff -- I'm removing the catch here. I don't think we need to use specific loggers on generic handlers like this though. it was a generic handler.. but it's being removed. ---
[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2467#discussion_r242970329 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExecutorNettyAdapter.java --- @@ -0,0 +1,221 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq.artemis.protocol.amqp.proton.handler; + +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.ProgressivePromise; +import io.netty.util.concurrent.Promise; +import io.netty.util.concurrent.ScheduledFuture; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; + +/** Test cases may supply a simple executor instead of the real Netty Executor + * On that case this is a simple adapter for what's needed from these tests. + * Not intended to be used in production. + * + * TODO: This could be refactored out of the main codebase but at a high cost. + *We may do it some day if we find an easy way that won't clutter the code too much. + * */ +public class ExecutorNettyAdapter implements EventLoop { --- End diff -- If you would add a comment on the path where is being used it would help (me that I have a bad memory) to remember it mate! ---
[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2467#discussion_r242969136 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExecutorNettyAdapter.java --- @@ -0,0 +1,221 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq.artemis.protocol.amqp.proton.handler; + +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.ProgressivePromise; +import io.netty.util.concurrent.Promise; +import io.netty.util.concurrent.ScheduledFuture; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; + +/** Test cases may supply a simple executor instead of the real Netty Executor + * On that case this is a simple adapter for what's needed from these tests. + * Not intended to be used in production. + * + * TODO: This could be refactored out of the main codebase but at a high cost. + *We may do it some day if we find an easy way that won't clutter the code too much. + * */ +public class ExecutorNettyAdapter implements EventLoop { --- End diff -- @franz1981 that's correct. Placing it on the testsuite only would require some work that I do not want to go through. ---
[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2467#discussion_r242968808 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java --- @@ -730,22 +793,29 @@ public int deliverMessage(MessageReference messageReference, int deliveryCount, if (preSettle) { // Presettled means the client implicitly accepts any delivery we send it. - sessionSPI.ack(null, brokerConsumer, messageReference.getMessage()); + try { + sessionSPI.ack(null, brokerConsumer, messageReference.getMessage()); + } catch (Exception e) { + log.debug(e.getMessage(), e); + } delivery.settle(); } else { sender.advance(); } connection.flush(); } finally { -connection.unlock(); +synchronized (creditsLock) { + pending.decrementAndGet(); +} +if (releaseRequired) { + ((NettyReadable) sendBuffer).getByteBuf().release(); +} } + } catch (Exception e) { + log.warn(e.getMessage(), e); - return size; - } finally { - if (releaseRequired) { -((NettyReadable) sendBuffer).getByteBuf().release(); - } + // important todo: Error treatment --- End diff -- ouch... let me see ---
[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2467#discussion_r242903263 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExecutorNettyAdapter.java --- @@ -0,0 +1,221 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq.artemis.protocol.amqp.proton.handler; + +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.ProgressivePromise; +import io.netty.util.concurrent.Promise; +import io.netty.util.concurrent.ScheduledFuture; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; + +/** Test cases may supply a simple executor instead of the real Netty Executor + * On that case this is a simple adapter for what's needed from these tests. + * Not intended to be used in production. + * + * TODO: This could be refactored out of the main codebase but at a high cost. + *We may do it some day if we find an easy way that won't clutter the code too much. + * */ +public class ExecutorNettyAdapter implements EventLoop { --- End diff -- I see that ExecutorNettyAdapter is used on AMQPConnectionContext: do you mean that only tests trigger ExecutorNettyAdapter to be created? ---
[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2467#discussion_r242788099 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java --- @@ -469,20 +530,17 @@ public void close(ErrorCondition condition) throws ActiveMQAMQPException { sender.setCondition(condition); } protonSession.removeSender(sender); - connection.lock(); - try { - sender.close(); - } finally { - connection.unlock(); - } - connection.flush(); - try { - sessionSPI.closeSender(brokerConsumer); - } catch (Exception e) { - log.warn(e.getMessage(), e); - throw new ActiveMQAMQPInternalErrorException(e.getMessage()); - } + connection.runLater(() -> { + sender.close(); + try { +sessionSPI.closeSender(brokerConsumer); + } catch (Exception e) { +log.warn(e.getMessage(), e); --- End diff -- Static logger method needed ---
[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2467#discussion_r242788013 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java --- @@ -730,22 +793,29 @@ public int deliverMessage(MessageReference messageReference, int deliveryCount, if (preSettle) { // Presettled means the client implicitly accepts any delivery we send it. - sessionSPI.ack(null, brokerConsumer, messageReference.getMessage()); + try { + sessionSPI.ack(null, brokerConsumer, messageReference.getMessage()); + } catch (Exception e) { + log.debug(e.getMessage(), e); + } delivery.settle(); } else { sender.advance(); } connection.flush(); } finally { -connection.unlock(); +synchronized (creditsLock) { + pending.decrementAndGet(); +} +if (releaseRequired) { + ((NettyReadable) sendBuffer).getByteBuf().release(); +} } + } catch (Exception e) { + log.warn(e.getMessage(), e); --- End diff -- Static logger method needed ---
[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2467#discussion_r242787638 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java --- @@ -359,59 +371,77 @@ public boolean flowControl(ReadyListener readyListener) { @Override public void onRemoteOpen(Connection connection) throws Exception { - lock(); + handler.requireHandler(); try { - try { -initInternal(); - } catch (Exception e) { -log.error("Error init connection", e); - } - if (!validateConnection(connection)) { -connection.close(); - } else { -connection.setContext(AMQPConnectionContext.this); -connection.setContainer(containerId); -connection.setProperties(connectionProperties); - connection.setOfferedCapabilities(getConnectionCapabilitiesOffered()); -connection.open(); - } - } finally { - unlock(); + initInternal(); + } catch (Exception e) { + log.error("Error init connection", e); + } + if (!validateConnection(connection)) { + connection.close(); + } else { + connection.setContext(AMQPConnectionContext.this); + connection.setContainer(containerId); + connection.setProperties(connectionProperties); + connection.setOfferedCapabilities(getConnectionCapabilitiesOffered()); + connection.open(); } initialise(); - /* - * This can be null which is in effect an empty map, also we really don't need to check this for in bound connections - * but its here in case we add support for outbound connections. - * */ + /* + * This can be null which is in effect an empty map, also we really don't need to check this for in bound connections + * but its here in case we add support for outbound connections. + * */ if (connection.getRemoteProperties() == null || !connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) { long nextKeepAliveTime = handler.tick(true); if (nextKeepAliveTime != 0 && scheduledPool != null) { -scheduledPool.schedule(new Runnable() { - @Override - public void run() { - Long rescheduleAt = handler.tick(false); - if (rescheduleAt == null) { - // this mean tick could not acquire a lock, we will just retry in 10 milliseconds. - scheduledPool.schedule(this, 10, TimeUnit.MILLISECONDS); - } else if (rescheduleAt != 0) { - scheduledPool.schedule(this, rescheduleAt - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), TimeUnit.MILLISECONDS); - } - } -}, (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS); +scheduledPool.schedule(new ScheduleRunnable(), (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS); } } } + class TickerRunnable implements Runnable { + + final ScheduleRunnable scheduleRunnable; + + TickerRunnable(ScheduleRunnable scheduleRunnable) { + this.scheduleRunnable = scheduleRunnable; + } + + @Override + public void run() { + try { +Long rescheduleAt = handler.tick(false); +if (rescheduleAt == null) { + // this mean tick could not acquire a lock, we will just retry in 10 milliseconds. + scheduledPool.schedule(scheduleRunnable, 10, TimeUnit.MILLISECONDS); +} else if (rescheduleAt != 0) { + scheduledPool.schedule(scheduleRunnable, rescheduleAt - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), TimeUnit.MILLISECONDS); +} + } catch (Exception e) { +log.warn(e.getMessage(), e); + } --- End diff -- This should have a static logger method with a code ---
[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2467#discussion_r242787783 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java --- @@ -122,7 +136,53 @@ public Object getBrokerConsumer() { @Override public void onFlow(int currentCredits, boolean drain) { - sessionSPI.onFlowConsumer(brokerConsumer, currentCredits, drain); + connection.requireInHandler(); + + setupCredit(); + + ServerConsumerImpl serverConsumer = (ServerConsumerImpl) brokerConsumer; + if (drain) { + // If the draining is already running, then don't do anything + if (draining.compareAndSet(false, true)) { +final ProtonServerSenderContext plugSender = (ProtonServerSenderContext) serverConsumer.getProtocolContext(); +serverConsumer.forceDelivery(1, new Runnable() { + @Override + public void run() { + try { + connection.runNow(() -> { +plugSender.reportDrained(); +setupCredit(); + }); + } finally { + draining.set(false); + } + } +}); + } + } else { + serverConsumer.receiveCredits(-1); + } + } + + public boolean hasCredits() { + if (!connection.flowControl(onflowControlReady)) { + return false; + } + + //return true; + //return getSender().getCredit() > 0; --- End diff -- Remove commented out code ---
[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2467#discussion_r242788241 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java --- @@ -730,22 +793,29 @@ public int deliverMessage(MessageReference messageReference, int deliveryCount, if (preSettle) { // Presettled means the client implicitly accepts any delivery we send it. - sessionSPI.ack(null, brokerConsumer, messageReference.getMessage()); + try { + sessionSPI.ack(null, brokerConsumer, messageReference.getMessage()); + } catch (Exception e) { + log.debug(e.getMessage(), e); + } delivery.settle(); } else { sender.advance(); } connection.flush(); } finally { -connection.unlock(); +synchronized (creditsLock) { + pending.decrementAndGet(); +} +if (releaseRequired) { + ((NettyReadable) sendBuffer).getByteBuf().release(); +} } + } catch (Exception e) { + log.warn(e.getMessage(), e); - return size; - } finally { - if (releaseRequired) { -((NettyReadable) sendBuffer).getByteBuf().release(); - } + // important todo: Error treatment --- End diff -- Looks like some work left todo here ---
[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2467#discussion_r242787574 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java --- @@ -471,40 +494,42 @@ public void onFlow(Link link) throws Exception { @Override public void onRemoteClose(Link link) throws Exception { - lock(); - try { + handler.requireHandler(); + + // We scheduled it for later, as that will work through anything that's pending on the current deliveries. + runLater(() -> { link.close(); link.free(); - } finally { - unlock(); - } - ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext(); - if (linkContext != null) { - linkContext.close(true); - } + ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext(); + if (linkContext != null) { +try { + linkContext.close(true); +} catch (Exception e) { + log.error(e.getMessage(), e); --- End diff -- this should have a static logger method ---
[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2467#discussion_r242745363 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java --- @@ -159,7 +160,7 @@ private final SimpleString managementAddress; - protected final RoutingContext routingContext = new RoutingContextImpl(null); + protected final RoutingContext _routingContext; --- End diff -- @franz1981 I reverted the change that's not needed any longer. all amended. ---
[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2467#discussion_r242727332 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java --- @@ -159,7 +160,7 @@ private final SimpleString managementAddress; - protected final RoutingContext routingContext = new RoutingContextImpl(null); + protected final RoutingContext _routingContext; --- End diff -- @franz1981 I don't follow you ---
[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2467#discussion_r242727059 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExecutorNettyAdapter.java --- @@ -0,0 +1,221 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq.artemis.protocol.amqp.proton.handler; + +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.ProgressivePromise; +import io.netty.util.concurrent.Promise; +import io.netty.util.concurrent.ScheduledFuture; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; + +/** Test cases may supply a simple executor instead of the real Netty Executor + * On that case this is a simple adapter for what's needed from these tests. + * Not intended to be used in production. + * + * TODO: This could be refactored out of the main codebase but at a high cost. + *We may do it some day if we find an easy way that won't clutter the code too much. + * */ +public class ExecutorNettyAdapter implements EventLoop { --- End diff -- @franz1981 this is basically for unit testing, I did not want to invest a lot of code on it. ---
[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2467#discussion_r242725348 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java --- @@ -119,10 +119,15 @@ public boolean isDestroyed() { @Override public void disconnect(boolean criticalError) { - ErrorCondition errorCondition = new ErrorCondition(); - errorCondition.setCondition(AmqpSupport.CONNECTION_FORCED); - amqpConnection.close(errorCondition); - getTransportConnection().close(); + amqpConnection.runLater(() -> { + ErrorCondition errorCondition = new ErrorCondition(); + errorCondition.setCondition(AmqpSupport.CONNECTION_FORCED); + amqpConnection.close(errorCondition); + amqpConnection.flush(); + }); + amqpConnection.runLater(() -> { --- End diff -- @franz1981 During my work, the first one was runNow, the second Later. (I would accept better names BTW) later I changed the first to later and forgot the other one :) fixing it on my next ammend ---
[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2467#discussion_r242700956 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java --- @@ -159,7 +160,7 @@ private final SimpleString managementAddress; - protected final RoutingContext routingContext = new RoutingContextImpl(null); + protected final RoutingContext _routingContext; --- End diff -- we can use just `routingContext` name, dropping `_` ---
[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2467#discussion_r242702260 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExecutorNettyAdapter.java --- @@ -0,0 +1,221 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq.artemis.protocol.amqp.proton.handler; + +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.ProgressivePromise; +import io.netty.util.concurrent.Promise; +import io.netty.util.concurrent.ScheduledFuture; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; + +/** Test cases may supply a simple executor instead of the real Netty Executor + * On that case this is a simple adapter for what's needed from these tests. + * Not intended to be used in production. + * + * TODO: This could be refactored out of the main codebase but at a high cost. + *We may do it some day if we find an easy way that won't clutter the code too much. + * */ +public class ExecutorNettyAdapter implements EventLoop { --- End diff -- Instead of using `ExecutorNettyAdapter` we couldn't reuse `OrderedExecutor` interface adding `inEventLoop()` on it? `OrderedExecutor::inEventLoop()` could just be `== inHandler()` ---
[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2467#discussion_r242686500 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java --- @@ -119,10 +119,15 @@ public boolean isDestroyed() { @Override public void disconnect(boolean criticalError) { - ErrorCondition errorCondition = new ErrorCondition(); - errorCondition.setCondition(AmqpSupport.CONNECTION_FORCED); - amqpConnection.close(errorCondition); - getTransportConnection().close(); + amqpConnection.runLater(() -> { + ErrorCondition errorCondition = new ErrorCondition(); + errorCondition.setCondition(AmqpSupport.CONNECTION_FORCED); + amqpConnection.close(errorCondition); + amqpConnection.flush(); + }); + amqpConnection.runLater(() -> { --- End diff -- Probably it is a naive question, but why 2 separate runLater calls instead of one? ---
[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...
GitHub user clebertsuconic opened a pull request: https://github.com/apache/activemq-artemis/pull/2467 ARTEMIS-2205 Performance improvements on AMQP and other parts You can merge this pull request into a Git repository by running: $ git pull https://github.com/clebertsuconic/activemq-artemis amqp-PR Alternatively you can review and apply these changes as the patch at: https://github.com/apache/activemq-artemis/pull/2467.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2467 commit 3199e32d2c98e7cdfd9ef157b9ea0cb19ca85e75 Author: Clebert Suconic Date: 2018-12-17T14:11:54Z ARTEMIS-2205 Refactor AMQP Processing into Single Threaded (per connection) https://issues.apache.org/jira/browse/ARTEMIS-2205 commit a72c73fad3b8b4ca03737c6733d018dbd385bfbd Author: Clebert Suconic Date: 2018-12-17T14:12:07Z ARTEMIS-2205 Broker Improvement by caching Routing for most common cases During AMQP Perf tests this became more relevant on profiling. It is a general improvement done as part of this AMQP performance task. commit 6d93d0aff908c23e68a135041d663c3c39701c72 Author: Clebert Suconic Date: 2018-12-17T14:12:14Z ARTEMIS-2205 Avoid new Runnable for every message sent As we now use the netty executor, creating a new Runnable for every message received became a relevant CPU, memory and putting extra GC pressure. By doing this change alone I was able to improve performance by 5% more or less. https://issues.apache.org/jira/browse/ARTEMIS-2205 commit c11605e00737f570ee4eddc62d9f2bacebcdfb2b Author: Francesco Nigro Date: 2018-12-17T14:12:19Z ARTEMIS-2205 Optimizing some Lambda usages https://issues.apache.org/jira/browse/ARTEMIS-2205 ---