[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...

2019-01-08 Thread clebertsuconic
Github user clebertsuconic commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2467#discussion_r246058995
  
--- Diff: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
 ---
@@ -730,22 +793,29 @@ public int deliverMessage(MessageReference 
messageReference, int deliveryCount,
 
 if (preSettle) {
// Presettled means the client implicitly accepts any 
delivery we send it.
-   sessionSPI.ack(null, brokerConsumer, 
messageReference.getMessage());
+   try {
+  sessionSPI.ack(null, brokerConsumer, 
messageReference.getMessage());
+   } catch (Exception e) {
+  log.debug(e.getMessage(), e);
+   }
delivery.settle();
 } else {
sender.advance();
 }
 
 connection.flush();
  } finally {
-connection.unlock();
+synchronized (creditsLock) {
+   pending.decrementAndGet();
+}
+if (releaseRequired) {
+   ((NettyReadable) sendBuffer).getByteBuf().release();
+}
  }
+  } catch (Exception e) {
+ log.warn(e.getMessage(), e);
 
- return size;
-  } finally {
- if (releaseRequired) {
-((NettyReadable) sendBuffer).getByteBuf().release();
- }
+ // important todo: Error treatment
--- End diff --

I'm working on it. I'm out today at a meeting; it will be done tomorrow (Wed).


---


[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2467#discussion_r245960681
  
--- Diff: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
 ---
@@ -730,22 +793,29 @@ public int deliverMessage(MessageReference 
messageReference, int deliveryCount,
 
 if (preSettle) {
// Presettled means the client implicitly accepts any 
delivery we send it.
-   sessionSPI.ack(null, brokerConsumer, 
messageReference.getMessage());
+   try {
+  sessionSPI.ack(null, brokerConsumer, 
messageReference.getMessage());
+   } catch (Exception e) {
+  log.debug(e.getMessage(), e);
+   }
delivery.settle();
 } else {
sender.advance();
 }
 
 connection.flush();
  } finally {
-connection.unlock();
+synchronized (creditsLock) {
+   pending.decrementAndGet();
+}
+if (releaseRequired) {
+   ((NettyReadable) sendBuffer).getByteBuf().release();
+}
  }
+  } catch (Exception e) {
+ log.warn(e.getMessage(), e);
 
- return size;
-  } finally {
- if (releaseRequired) {
-((NettyReadable) sendBuffer).getByteBuf().release();
- }
+ // important todo: Error treatment
--- End diff --

Did you look over this? 


---


[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...

2019-01-07 Thread clebertsuconic
Github user clebertsuconic commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2467#discussion_r245777806
  
--- Diff: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
 ---
@@ -359,59 +371,77 @@ public boolean flowControl(ReadyListener 
readyListener) {
 
@Override
public void onRemoteOpen(Connection connection) throws Exception {
-  lock();
+  handler.requireHandler();
   try {
- try {
-initInternal();
- } catch (Exception e) {
-log.error("Error init connection", e);
- }
- if (!validateConnection(connection)) {
-connection.close();
- } else {
-connection.setContext(AMQPConnectionContext.this);
-connection.setContainer(containerId);
-connection.setProperties(connectionProperties);
-
connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
-connection.open();
- }
-  } finally {
- unlock();
+ initInternal();
+  } catch (Exception e) {
+ log.error("Error init connection", e);
+  }
+  if (!validateConnection(connection)) {
+ connection.close();
+  } else {
+ connection.setContext(AMQPConnectionContext.this);
+ connection.setContainer(containerId);
+ connection.setProperties(connectionProperties);
+ 
connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
+ connection.open();
   }
   initialise();
 
- /*
- * This can be null which is in effect an empty map, also we 
really don't need to check this for in bound connections
- * but its here in case we add support for outbound connections.
- * */
+  /*
+  * This can be null which is in effect an empty map, also we really 
don't need to check this for in bound connections
+  * but its here in case we add support for outbound connections.
+  * */
   if (connection.getRemoteProperties() == null || 
!connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) {
  long nextKeepAliveTime = handler.tick(true);
  if (nextKeepAliveTime != 0 && scheduledPool != null) {
-scheduledPool.schedule(new Runnable() {
-   @Override
-   public void run() {
-  Long rescheduleAt = handler.tick(false);
-  if (rescheduleAt == null) {
- // this mean tick could not acquire a lock, we will 
just retry in 10 milliseconds.
- scheduledPool.schedule(this, 10, 
TimeUnit.MILLISECONDS);
-  } else if (rescheduleAt != 0) {
- scheduledPool.schedule(this, rescheduleAt - 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), TimeUnit.MILLISECONDS);
-  }
-   }
-}, (nextKeepAliveTime - 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS);
+scheduledPool.schedule(new ScheduleRunnable(), 
(nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), 
TimeUnit.MILLISECONDS);
  }
   }
}
 
+   class TickerRunnable implements Runnable {
+
+  final ScheduleRunnable scheduleRunnable;
+
+  TickerRunnable(ScheduleRunnable scheduleRunnable) {
+ this.scheduleRunnable = scheduleRunnable;
+  }
+
+  @Override
+  public void run() {
+ try {
+Long rescheduleAt = handler.tick(false);
+if (rescheduleAt == null) {
+   // this mean tick could not acquire a lock, we will just 
retry in 10 milliseconds.
+   scheduledPool.schedule(scheduleRunnable, 10, 
TimeUnit.MILLISECONDS);
+} else if (rescheduleAt != 0) {
+   scheduledPool.schedule(scheduleRunnable, rescheduleAt - 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), TimeUnit.MILLISECONDS);
+}
+ } catch (Exception e) {
+log.warn(e.getMessage(), e);
+ }
--- End diff --

I'm removing the catch here. It was only a generic handler, and I don't think 
we need to use specific loggers on generic handlers like this. Either way, it's 
being removed.


---


[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...

2018-12-19 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2467#discussion_r242970329
  
--- Diff: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExecutorNettyAdapter.java
 ---
@@ -0,0 +1,221 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.proton.handler;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoop;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.ProgressivePromise;
+import io.netty.util.concurrent.Promise;
+import io.netty.util.concurrent.ScheduledFuture;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+
+/** Test cases may supply a simple executor instead of the real Netty 
Executor
+ *  On that case this is a simple adapter for what's needed from these 
tests.
+ *  Not intended to be used in production.
+ *
+ *  TODO: This could be refactored out of the main codebase but at a high 
cost.
+ *We may do it some day if we find an easy way that won't clutter 
the code too much.
+ *  */
+public class ExecutorNettyAdapter implements EventLoop {
--- End diff --

If you added a comment on the code path where it is being used, it would 
help me (I have a bad memory) to remember it, mate!


---


[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...

2018-12-19 Thread clebertsuconic
Github user clebertsuconic commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2467#discussion_r242969136
  
--- Diff: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExecutorNettyAdapter.java
 ---
@@ -0,0 +1,221 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.proton.handler;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoop;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.ProgressivePromise;
+import io.netty.util.concurrent.Promise;
+import io.netty.util.concurrent.ScheduledFuture;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+
+/** Test cases may supply a simple executor instead of the real Netty 
Executor
+ *  On that case this is a simple adapter for what's needed from these 
tests.
+ *  Not intended to be used in production.
+ *
+ *  TODO: This could be refactored out of the main codebase but at a high 
cost.
+ *We may do it some day if we find an easy way that won't clutter 
the code too much.
+ *  */
+public class ExecutorNettyAdapter implements EventLoop {
--- End diff --

@franz1981 that's correct.

Placing it only in the test suite would require some work that I do not want 
to go through.


---


[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...

2018-12-19 Thread clebertsuconic
Github user clebertsuconic commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2467#discussion_r242968808
  
--- Diff: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
 ---
@@ -730,22 +793,29 @@ public int deliverMessage(MessageReference 
messageReference, int deliveryCount,
 
 if (preSettle) {
// Presettled means the client implicitly accepts any 
delivery we send it.
-   sessionSPI.ack(null, brokerConsumer, 
messageReference.getMessage());
+   try {
+  sessionSPI.ack(null, brokerConsumer, 
messageReference.getMessage());
+   } catch (Exception e) {
+  log.debug(e.getMessage(), e);
+   }
delivery.settle();
 } else {
sender.advance();
 }
 
 connection.flush();
  } finally {
-connection.unlock();
+synchronized (creditsLock) {
+   pending.decrementAndGet();
+}
+if (releaseRequired) {
+   ((NettyReadable) sendBuffer).getByteBuf().release();
+}
  }
+  } catch (Exception e) {
+ log.warn(e.getMessage(), e);
 
- return size;
-  } finally {
- if (releaseRequired) {
-((NettyReadable) sendBuffer).getByteBuf().release();
- }
+ // important todo: Error treatment
--- End diff --

ouch... let me see


---


[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...

2018-12-19 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2467#discussion_r242903263
  
--- Diff: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExecutorNettyAdapter.java
 ---
@@ -0,0 +1,221 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.proton.handler;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoop;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.ProgressivePromise;
+import io.netty.util.concurrent.Promise;
+import io.netty.util.concurrent.ScheduledFuture;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+
+/** Test cases may supply a simple executor instead of the real Netty 
Executor
+ *  On that case this is a simple adapter for what's needed from these 
tests.
+ *  Not intended to be used in production.
+ *
+ *  TODO: This could be refactored out of the main codebase but at a high 
cost.
+ *We may do it some day if we find an easy way that won't clutter 
the code too much.
+ *  */
+public class ExecutorNettyAdapter implements EventLoop {
--- End diff --

I see that ExecutorNettyAdapter is used in AMQPConnectionContext: do you 
mean that only tests trigger the creation of ExecutorNettyAdapter?


---


[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...

2018-12-18 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2467#discussion_r242788099
  
--- Diff: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
 ---
@@ -469,20 +530,17 @@ public void close(ErrorCondition condition) throws 
ActiveMQAMQPException {
  sender.setCondition(condition);
   }
   protonSession.removeSender(sender);
-  connection.lock();
-  try {
- sender.close();
-  } finally {
- connection.unlock();
-  }
-  connection.flush();
 
-  try {
- sessionSPI.closeSender(brokerConsumer);
-  } catch (Exception e) {
- log.warn(e.getMessage(), e);
- throw new ActiveMQAMQPInternalErrorException(e.getMessage());
-  }
+  connection.runLater(() -> {
+ sender.close();
+ try {
+sessionSPI.closeSender(brokerConsumer);
+ } catch (Exception e) {
+log.warn(e.getMessage(), e);
--- End diff --

Static logger method needed


---


[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...

2018-12-18 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2467#discussion_r242788013
  
--- Diff: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
 ---
@@ -730,22 +793,29 @@ public int deliverMessage(MessageReference 
messageReference, int deliveryCount,
 
 if (preSettle) {
// Presettled means the client implicitly accepts any 
delivery we send it.
-   sessionSPI.ack(null, brokerConsumer, 
messageReference.getMessage());
+   try {
+  sessionSPI.ack(null, brokerConsumer, 
messageReference.getMessage());
+   } catch (Exception e) {
+  log.debug(e.getMessage(), e);
+   }
delivery.settle();
 } else {
sender.advance();
 }
 
 connection.flush();
  } finally {
-connection.unlock();
+synchronized (creditsLock) {
+   pending.decrementAndGet();
+}
+if (releaseRequired) {
+   ((NettyReadable) sendBuffer).getByteBuf().release();
+}
  }
+  } catch (Exception e) {
+ log.warn(e.getMessage(), e);
--- End diff --

Static logger method needed


---


[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...

2018-12-18 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2467#discussion_r242787638
  
--- Diff: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
 ---
@@ -359,59 +371,77 @@ public boolean flowControl(ReadyListener 
readyListener) {
 
@Override
public void onRemoteOpen(Connection connection) throws Exception {
-  lock();
+  handler.requireHandler();
   try {
- try {
-initInternal();
- } catch (Exception e) {
-log.error("Error init connection", e);
- }
- if (!validateConnection(connection)) {
-connection.close();
- } else {
-connection.setContext(AMQPConnectionContext.this);
-connection.setContainer(containerId);
-connection.setProperties(connectionProperties);
-
connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
-connection.open();
- }
-  } finally {
- unlock();
+ initInternal();
+  } catch (Exception e) {
+ log.error("Error init connection", e);
+  }
+  if (!validateConnection(connection)) {
+ connection.close();
+  } else {
+ connection.setContext(AMQPConnectionContext.this);
+ connection.setContainer(containerId);
+ connection.setProperties(connectionProperties);
+ 
connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
+ connection.open();
   }
   initialise();
 
- /*
- * This can be null which is in effect an empty map, also we 
really don't need to check this for in bound connections
- * but its here in case we add support for outbound connections.
- * */
+  /*
+  * This can be null which is in effect an empty map, also we really 
don't need to check this for in bound connections
+  * but its here in case we add support for outbound connections.
+  * */
   if (connection.getRemoteProperties() == null || 
!connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) {
  long nextKeepAliveTime = handler.tick(true);
  if (nextKeepAliveTime != 0 && scheduledPool != null) {
-scheduledPool.schedule(new Runnable() {
-   @Override
-   public void run() {
-  Long rescheduleAt = handler.tick(false);
-  if (rescheduleAt == null) {
- // this mean tick could not acquire a lock, we will 
just retry in 10 milliseconds.
- scheduledPool.schedule(this, 10, 
TimeUnit.MILLISECONDS);
-  } else if (rescheduleAt != 0) {
- scheduledPool.schedule(this, rescheduleAt - 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), TimeUnit.MILLISECONDS);
-  }
-   }
-}, (nextKeepAliveTime - 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS);
+scheduledPool.schedule(new ScheduleRunnable(), 
(nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), 
TimeUnit.MILLISECONDS);
  }
   }
}
 
+   class TickerRunnable implements Runnable {
+
+  final ScheduleRunnable scheduleRunnable;
+
+  TickerRunnable(ScheduleRunnable scheduleRunnable) {
+ this.scheduleRunnable = scheduleRunnable;
+  }
+
+  @Override
+  public void run() {
+ try {
+Long rescheduleAt = handler.tick(false);
+if (rescheduleAt == null) {
+   // this mean tick could not acquire a lock, we will just 
retry in 10 milliseconds.
+   scheduledPool.schedule(scheduleRunnable, 10, 
TimeUnit.MILLISECONDS);
+} else if (rescheduleAt != 0) {
+   scheduledPool.schedule(scheduleRunnable, rescheduleAt - 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), TimeUnit.MILLISECONDS);
+}
+ } catch (Exception e) {
+log.warn(e.getMessage(), e);
+ }
--- End diff --

This should have a static logger method with a code
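
For readers unfamiliar with this request, here is a minimal sketch of what a "static logger method with a code" might look like. It is written against plain `java.util.logging` so that it runs on its own. The class name `AmqpSketchLogger`, the `SKETCH` prefix, the ID `999001` and the message text are all hypothetical and are not taken from the Artemis codebase, which has its own logger classes for this purpose.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for a project logger class; names and IDs are illustrative only.
public final class AmqpSketchLogger {

   private static final Logger LOGGER = Logger.getLogger(AmqpSketchLogger.class.getName());

   // Assumed prefix: every message gets a stable code so it can be searched for.
   static final String PROJECT_CODE = "SKETCH";

   private AmqpSketchLogger() {
   }

   // Builds the coded text from the prefix, the numeric ID, the fixed text and the cause.
   public static String format(int id, String text, Throwable cause) {
      return PROJECT_CODE + id + ": " + text + ": " + cause.getMessage();
   }

   // The "static logger method": call sites no longer build ad-hoc messages.
   public static void errorOnTick(Throwable cause) {
      LOGGER.log(Level.WARNING, format(999001, "Error while ticking connection", cause), cause);
   }

   public static void main(String[] args) {
      errorOnTick(new IllegalStateException("boom"));
   }
}
```

Under these assumptions, a call site would replace `log.warn(e.getMessage(), e)` with something like `AmqpSketchLogger.errorOnTick(e)`, so that the message text and its code live in one place.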


---


[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...

2018-12-18 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2467#discussion_r242787783
  
--- Diff: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
 ---
@@ -122,7 +136,53 @@ public Object getBrokerConsumer() {
 
@Override
public void onFlow(int currentCredits, boolean drain) {
-  sessionSPI.onFlowConsumer(brokerConsumer, currentCredits, drain);
+  connection.requireInHandler();
+
+  setupCredit();
+
+  ServerConsumerImpl serverConsumer = (ServerConsumerImpl) 
brokerConsumer;
+  if (drain) {
+ // If the draining is already running, then don't do anything
+ if (draining.compareAndSet(false, true)) {
+final ProtonServerSenderContext plugSender = 
(ProtonServerSenderContext) serverConsumer.getProtocolContext();
+serverConsumer.forceDelivery(1, new Runnable() {
+   @Override
+   public void run() {
+  try {
+ connection.runNow(() -> {
+plugSender.reportDrained();
+setupCredit();
+ });
+  } finally {
+ draining.set(false);
+  }
+   }
+});
+ }
+  } else {
+ serverConsumer.receiveCredits(-1);
+  }
+   }
+
+   public boolean hasCredits() {
+  if (!connection.flowControl(onflowControlReady)) {
+ return false;
+  }
+
+  //return true;
+  //return getSender().getCredit() > 0;
--- End diff --

Remove commented out code


---


[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...

2018-12-18 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2467#discussion_r242788241
  
--- Diff: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
 ---
@@ -730,22 +793,29 @@ public int deliverMessage(MessageReference 
messageReference, int deliveryCount,
 
 if (preSettle) {
// Presettled means the client implicitly accepts any 
delivery we send it.
-   sessionSPI.ack(null, brokerConsumer, 
messageReference.getMessage());
+   try {
+  sessionSPI.ack(null, brokerConsumer, 
messageReference.getMessage());
+   } catch (Exception e) {
+  log.debug(e.getMessage(), e);
+   }
delivery.settle();
 } else {
sender.advance();
 }
 
 connection.flush();
  } finally {
-connection.unlock();
+synchronized (creditsLock) {
+   pending.decrementAndGet();
+}
+if (releaseRequired) {
+   ((NettyReadable) sendBuffer).getByteBuf().release();
+}
  }
+  } catch (Exception e) {
+ log.warn(e.getMessage(), e);
 
- return size;
-  } finally {
- if (releaseRequired) {
-((NettyReadable) sendBuffer).getByteBuf().release();
- }
+ // important todo: Error treatment
--- End diff --

Looks like there is some work left to do here.


---


[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...

2018-12-18 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2467#discussion_r242787574
  
--- Diff: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
 ---
@@ -471,40 +494,42 @@ public void onFlow(Link link) throws Exception {
 
@Override
public void onRemoteClose(Link link) throws Exception {
-  lock();
-  try {
+  handler.requireHandler();
+
+  // We scheduled it for later, as that will work through anything 
that's pending on the current deliveries.
+  runLater(() -> {
  link.close();
  link.free();
-  } finally {
- unlock();
-  }
 
-  ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) 
link.getContext();
-  if (linkContext != null) {
- linkContext.close(true);
-  }
+ ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) 
link.getContext();
+ if (linkContext != null) {
+try {
+   linkContext.close(true);
+} catch (Exception e) {
+   log.error(e.getMessage(), e);
--- End diff --

This should have a static logger method.


---


[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...

2018-12-18 Thread clebertsuconic
Github user clebertsuconic commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2467#discussion_r242745363
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
 ---
@@ -159,7 +160,7 @@
 
private final SimpleString managementAddress;
 
-   protected final RoutingContext routingContext = new 
RoutingContextImpl(null);
+   protected final RoutingContext _routingContext;
--- End diff --

@franz1981 I reverted the change that's not needed any longer. 


all amended.


---


[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...

2018-12-18 Thread clebertsuconic
Github user clebertsuconic commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2467#discussion_r242727332
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
 ---
@@ -159,7 +160,7 @@
 
private final SimpleString managementAddress;
 
-   protected final RoutingContext routingContext = new 
RoutingContextImpl(null);
+   protected final RoutingContext _routingContext;
--- End diff --

@franz1981 I don't follow you


---


[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...

2018-12-18 Thread clebertsuconic
Github user clebertsuconic commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2467#discussion_r242727059
  
--- Diff: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExecutorNettyAdapter.java
 ---
@@ -0,0 +1,221 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.proton.handler;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoop;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.ProgressivePromise;
+import io.netty.util.concurrent.Promise;
+import io.netty.util.concurrent.ScheduledFuture;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+
+/** Test cases may supply a simple executor instead of the real Netty 
Executor
+ *  On that case this is a simple adapter for what's needed from these 
tests.
+ *  Not intended to be used in production.
+ *
+ *  TODO: This could be refactored out of the main codebase but at a high 
cost.
+ *We may do it some day if we find an easy way that won't clutter 
the code too much.
+ *  */
+public class ExecutorNettyAdapter implements EventLoop {
--- End diff --

@franz1981 this is basically for unit testing; I did not want to invest a 
lot of code in it.


---


[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...

2018-12-18 Thread clebertsuconic
Github user clebertsuconic commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2467#discussion_r242725348
  
--- Diff: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
 ---
@@ -119,10 +119,15 @@ public boolean isDestroyed() {
 
@Override
public void disconnect(boolean criticalError) {
-  ErrorCondition errorCondition = new ErrorCondition();
-  errorCondition.setCondition(AmqpSupport.CONNECTION_FORCED);
-  amqpConnection.close(errorCondition);
-  getTransportConnection().close();
+  amqpConnection.runLater(() -> {
+ ErrorCondition errorCondition = new ErrorCondition();
+ errorCondition.setCondition(AmqpSupport.CONNECTION_FORCED);
+ amqpConnection.close(errorCondition);
+ amqpConnection.flush();
+  });
+  amqpConnection.runLater(() -> {
--- End diff --

@franz1981 During my work, the first one was runNow and the second runLater. 
(I would accept better names, BTW.)

Later I changed the first to runLater and forgot the other one :) 


Fixing it in my next amend.
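
To make the runNow/runLater distinction concrete, here is a small sketch under stated assumptions. It is not the Artemis implementation: `HandlerSketch`, its single daemon worker thread and the `demo()` method are hypothetical, and the assumed semantics are that `runLater` always queues the task, while `runNow` executes inline when the caller is already on the handler thread and queues otherwise.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the two scheduling styles; not the Artemis code.
public class HandlerSketch {

   private volatile Thread handlerThread;

   // Single worker thread, so queued tasks run one at a time in submission order.
   private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
      Thread t = new Thread(r, "handler-sketch");
      t.setDaemon(true);
      handlerThread = t;
      return t;
   });

   public boolean inHandler() {
      return Thread.currentThread() == handlerThread;
   }

   // Always queues the task behind whatever is already pending.
   public void runLater(Runnable task) {
      executor.execute(task);
   }

   // Runs inline when already on the handler thread, otherwise queues.
   public void runNow(Runnable task) {
      if (inHandler()) {
         task.run();
      } else {
         runLater(task);
      }
   }

   // Records the order in which three pieces of work execute.
   public static List<String> demo() {
      HandlerSketch handler = new HandlerSketch();
      List<String> order = Collections.synchronizedList(new ArrayList<>());
      CountDownLatch done = new CountDownLatch(1);
      handler.runLater(() -> {
         handler.runLater(() -> {
            order.add("later");
            done.countDown();
         });
         handler.runNow(() -> order.add("now"));
         order.add("end of current task");
      });
      try {
         done.await(5, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
      }
      return new ArrayList<>(order);
   }

   public static void main(String[] args) {
      System.out.println(demo());
   }
}
```

In this sketch, two consecutive `runLater` calls behave like one larger queued task as long as nothing else is submitted in between, which is presumably why the two blocks in the diff can be merged.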


---


[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...

2018-12-18 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2467#discussion_r242700956
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
 ---
@@ -159,7 +160,7 @@
 
private final SimpleString managementAddress;
 
-   protected final RoutingContext routingContext = new 
RoutingContextImpl(null);
+   protected final RoutingContext _routingContext;
--- End diff --

we can use just `routingContext` name, dropping `_`


---


[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...

2018-12-18 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2467#discussion_r242702260
  
--- Diff: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExecutorNettyAdapter.java
 ---
@@ -0,0 +1,221 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.proton.handler;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoop;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.ProgressivePromise;
+import io.netty.util.concurrent.Promise;
+import io.netty.util.concurrent.ScheduledFuture;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+
+/** Test cases may supply a simple executor instead of the real Netty Executor
+ *  On that case this is a simple adapter for what's needed from these tests.
+ *  Not intended to be used in production.
+ *
+ *  TODO: This could be refactored out of the main codebase but at a high cost.
+ *        We may do it some day if we find an easy way that won't clutter the code too much.
+ *  */
+public class ExecutorNettyAdapter implements EventLoop {
--- End diff --

Instead of using `ExecutorNettyAdapter`, couldn't we reuse the 
`OrderedExecutor` interface and add `inEventLoop()` to it?
`OrderedExecutor::inEventLoop()` could simply return the same value as 
`inHandler()`.
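
A minimal sketch of that idea, under stated assumptions: `SketchOrderedExecutor` is a hypothetical stand-in written for this comment, not the Artemis `OrderedExecutor`, and its caller-runs draining is only there to keep the example self-contained. Only the method names `inHandler()` and `inEventLoop()` come from the suggestion above.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

/** Hypothetical ordered executor: tasks run one at a time, in submission order. */
final class SketchOrderedExecutor implements Executor {

   private final Queue<Runnable> tasks = new ArrayDeque<>();
   private Thread handler; // thread currently draining the queue, or null

   @Override
   public void execute(Runnable task) {
      synchronized (this) {
         tasks.add(task);
         if (handler != null) {
            return; // a drain is already in progress; it will run this task in order
         }
         handler = Thread.currentThread();
      }
      while (true) {
         Runnable next;
         synchronized (this) {
            next = tasks.poll();
            if (next == null) {
               handler = null;
               return;
            }
         }
         try {
            next.run();
         } catch (RuntimeException e) {
            // a real executor would log this; the sketch just keeps draining
         }
      }
   }

   /** True when the calling thread is the one currently running tasks. */
   public synchronized boolean inHandler() {
      return handler == Thread.currentThread();
   }

   /** The Netty-style question, answered by the existing check. */
   public boolean inEventLoop() {
      return inHandler();
   }
}
```

With something like this, code that asks "am I already on the connection's thread?" would not need a separate Netty adapter in tests.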


---


[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...

2018-12-18 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2467#discussion_r242686500
  
--- Diff: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
 ---
@@ -119,10 +119,15 @@ public boolean isDestroyed() {
 
@Override
public void disconnect(boolean criticalError) {
-  ErrorCondition errorCondition = new ErrorCondition();
-  errorCondition.setCondition(AmqpSupport.CONNECTION_FORCED);
-  amqpConnection.close(errorCondition);
-  getTransportConnection().close();
+  amqpConnection.runLater(() -> {
+ ErrorCondition errorCondition = new ErrorCondition();
+ errorCondition.setCondition(AmqpSupport.CONNECTION_FORCED);
+ amqpConnection.close(errorCondition);
+ amqpConnection.flush();
+  });
+  amqpConnection.runLater(() -> {
--- End diff --

This is probably a naive question, but why two separate runLater calls 
instead of one?
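
To illustrate the question, here is a small sketch assuming runLater hands its task to a single-threaded FIFO executor (an assumption; the implementation is not shown in the quoted diff). `RunLaterSketch` and the log labels are hypothetical, and `"second-task"` is a placeholder because the body of the second call is cut off in the diff.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class RunLaterSketch {

   /** Submits the work as two consecutive tasks, like the two calls in the diff. */
   static List<String> asTwoTasks() {
      List<String> log = new CopyOnWriteArrayList<>();
      ExecutorService loop = Executors.newSingleThreadExecutor();
      loop.execute(() -> {
         log.add("close");
         log.add("flush");
      });
      loop.execute(() -> log.add("second-task"));
      awaitQuietly(loop);
      return log;
   }

   /** Submits the same work as a single task. */
   static List<String> asOneTask() {
      List<String> log = new CopyOnWriteArrayList<>();
      ExecutorService loop = Executors.newSingleThreadExecutor();
      loop.execute(() -> {
         log.add("close");
         log.add("flush");
         log.add("second-task");
      });
      awaitQuietly(loop);
      return log;
   }

   /** Waits for the queued tasks to finish without exposing a checked exception. */
   private static void awaitQuietly(ExecutorService loop) {
      loop.shutdown();
      try {
         if (!loop.awaitTermination(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("tasks did not finish in time");
         }
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new IllegalStateException(e);
      }
   }
}
```

Under that assumption both variants record the same order. The only difference is that work submitted from another thread could be scheduled between the two tasks in the first variant.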


---


[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...

2018-12-17 Thread clebertsuconic
GitHub user clebertsuconic opened a pull request:

https://github.com/apache/activemq-artemis/pull/2467

ARTEMIS-2205 Performance improvements on AMQP and other parts



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/clebertsuconic/activemq-artemis amqp-PR

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/activemq-artemis/pull/2467.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2467


commit 3199e32d2c98e7cdfd9ef157b9ea0cb19ca85e75
Author: Clebert Suconic 
Date:   2018-12-17T14:11:54Z

ARTEMIS-2205 Refactor AMQP Processing into Single Threaded (per connection)

https://issues.apache.org/jira/browse/ARTEMIS-2205

commit a72c73fad3b8b4ca03737c6733d018dbd385bfbd
Author: Clebert Suconic 
Date:   2018-12-17T14:12:07Z

ARTEMIS-2205 Broker Improvement by caching Routing for most common cases

During AMQP Perf tests this became more relevant on profiling.
It is a general improvement done as part of this AMQP performance task.

commit 6d93d0aff908c23e68a135041d663c3c39701c72
Author: Clebert Suconic 
Date:   2018-12-17T14:12:14Z

ARTEMIS-2205 Avoid new Runnable for every message sent

As we now use the Netty executor, creating a new Runnable
for every message received became a relevant cost in CPU and memory, and 
put extra pressure on the GC.

This change alone improved performance by roughly 5%.

https://issues.apache.org/jira/browse/ARTEMIS-2205
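
One common way to avoid a per-message Runnable, sketched here as an illustration only (the commit's actual change is not shown in this message): keep the messages in a queue and submit a single, preallocated drain task whenever the queue goes from idle to busy. `ReusableDrain` and all of its members are hypothetical names.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Hypothetical helper: one Runnable instance is reused for every submission. */
final class ReusableDrain<T> {

   private final Queue<T> pending = new ConcurrentLinkedQueue<>();
   private final AtomicBoolean scheduled = new AtomicBoolean();
   private final Runnable drainTask = this::drain; // allocated once per instance
   private final Executor executor;
   private final Consumer<T> handler;

   ReusableDrain(Executor executor, Consumer<T> handler) {
      this.executor = executor;
      this.handler = handler;
   }

   /** Enqueues a message; schedules the shared task only if none is pending. */
   void offer(T message) {
      pending.add(message);
      if (scheduled.compareAndSet(false, true)) {
         executor.execute(drainTask);
      }
   }

   private void drain() {
      try {
         T message;
         while ((message = pending.poll()) != null) {
            handler.accept(message);
         }
      } finally {
         scheduled.set(false);
         // a message may have arrived after the last poll; reschedule if so
         if (!pending.isEmpty() && scheduled.compareAndSet(false, true)) {
            executor.execute(drainTask);
         }
      }
   }
}
```

The queue still allocates a node per message, so this trades one allocation for another; the saving is the lambda or Runnable object and the extra executor submission per message.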

commit c11605e00737f570ee4eddc62d9f2bacebcdfb2b
Author: Francesco Nigro 
Date:   2018-12-17T14:12:19Z

ARTEMIS-2205 Optimizing some Lambda usages

https://issues.apache.org/jira/browse/ARTEMIS-2205




---