[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246602226
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java
 ---
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.function.IntFunction;
+
+/**
+ * This collection is a concurrent append-only list that grows in 
chunks.
+ * It's safe to be used by many threads concurrently and has a max 
capacity of {@link Integer#MAX_VALUE}.
+ */
+public final class ConcurrentAppendOnlyChunkedList {
--- End diff --

Thanks to you guys and one day I will be better in naming things too I 
swear :D


---


[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246602031
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
+  if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
--- End diff --

Is not that easy here, this one is not a common collection (in theory, but 
we can get an agreement on the contract for sure): it is more an append only 
list that allows indexed queries like a map.
If I remember correctly it is similar to an [hashed array 
tree](https://en.wikipedia.org/wiki/Hashed_array_tree), where the top-level 
directory is a double linked list of "folders" (instead of an array, like the 
original implementation): indeed in the code there are chunkIndex (==key of top 
level directory) and offset (==key of leaf into a directory).


---


[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread clebertsuconic
Github user clebertsuconic commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246601217
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java
 ---
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.function.IntFunction;
+
+/**
+ * This collection is a concurrent append-only list that grows in 
chunks.
+ * It's safe to be used by many threads concurrently and has a max 
capacity of {@link Integer#MAX_VALUE}.
+ */
+public final class ConcurrentAppendOnlyChunkedList {
--- End diff --

This is a lot better!


---


[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread clebertsuconic
Github user clebertsuconic commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246601169
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
+  if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
+return null;
+ }
+ //publish it for others consumers
+ LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
+  }
+  final AtomicChunk buffer;
+  final int offset;
+  if (messageNumber >= chunkSize) {
+ offset = messageNumber & chunkMask;
+ //slow path is moved in a separate method
+ buffer = jump(messageNumber, size);
+  } else {
+ offset = messageNumber;
+ buffer = consumerBuffer;
+  }
+  //NOTE: producerIndex is being updated before setting a new value ie 
on consumer side need to spin until a not null value is set
+  PagedMessage msg;
+  while ((msg = buffer.get(offset)) == null) {
+ Thread.yield();
+  }
+  return msg;
+   }
+
+   /**
+* Implements a lock-free version of the optimization used on {@link 
java.util.LinkedList#get(int)} to speed up queries
+* ie backward search of a node if needed.
+*/
+   private AtomicChunk jump(final int messageNumber, final 
long size) {
+  //fast division by a power of 2
+  final int jumps = messageNumber >> chunkSizeLog2;
+  //size is never allowed to be > Integer.MAX_VALUE
+  final int lastChunkIndex = (int) size >> chunkSizeLog2;
+  int requiredJumps = jumps;
+  AtomicChunk jumpBuffer = null;
+  boolean jumpForward = true;
+  int distanceFromLastChunkIndex = lastChunkIndex - jumps;
+  //it's worth to go backward from lastChunkIndex?
+  //trying first to check against the value we already have: if it 
won't worth, won't make sense to load the producerBuffer
+  if (distanceFromLastChunkIndex < jumps) {
+ final AtomicChunk producer = producerBuffer;
+ //producer is a potential moving, always increasing, target ie 
better to re-check the distance
+ distanceFromLastChunkIndex = producer.index - jumps;
+ if (distanceFromLastChunkIndex < jumps) {
+//we're saving some jumps ie is fine to go backward from here
+jumpBuffer = producer;
+requiredJumps = distanceFromLastChunkIndex;
+jumpForward = false;
+ }
+  }
+  //start from the consumer buffer only is needed
+  if (jumpBuffer == null) {
+ jumpBuffer = consumerBuffer;
+  }
+  for (int i = 0; i < requiredJumps; i++) {
+ //next chunk is always set if below a read producerIndex value
+ //previous chunk is final and can be safely read
+ jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev;
+  }
+  return jumpBuffer;
}
 
@Override
-   public synchronized boolean isLive() {
+   public boolean isLive() {
   return isLive;
}
 
@Override
-   public synchronized void addLiveMessage(PagedMessage message) {
+   public void addLiveMessage(PagedMessage 

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r24662
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -28,15 +28,49 @@
 /**
  * This is the same as PageCache, however this is for the page that's 
being currently written.
  */
--- End diff --

@clebertsuconic @michaelandrepearce Good points guys: i have re-implemented 
the logic in a proper collection (but that won't implement canonical 
`Collection` types, because it is not canonical at all)


---


[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246599701
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
+  if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
+return null;
+ }
+ //publish it for others consumers
+ LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
+  }
+  final AtomicChunk buffer;
+  final int offset;
+  if (messageNumber >= chunkSize) {
+ offset = messageNumber & chunkMask;
+ //slow path is moved in a separate method
+ buffer = jump(messageNumber, size);
+  } else {
+ offset = messageNumber;
+ buffer = consumerBuffer;
+  }
+  //NOTE: producerIndex is being updated before setting a new value ie 
on consumer side need to spin until a not null value is set
+  PagedMessage msg;
+  while ((msg = buffer.get(offset)) == null) {
+ Thread.yield();
+  }
+  return msg;
+   }
+
+   /**
+* Implements a lock-free version of the optimization used on {@link 
java.util.LinkedList#get(int)} to speed up queries
+* ie backward search of a node if needed.
+*/
+   private AtomicChunk jump(final int messageNumber, final 
long size) {
+  //fast division by a power of 2
+  final int jumps = messageNumber >> chunkSizeLog2;
--- End diff --

eheh you're right :P, but the code of this collection was coming from a 
version where `chunkSize` was not a static final constant: in the new version 
is more clear why I have done it


---


[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246578304
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
+  if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
--- End diff --

if this was a collection, this would almost be throwing illegal 
arguement.


---


[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246577156
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
+  if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
+return null;
+ }
+ //publish it for others consumers
+ LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
+  }
+  final AtomicChunk buffer;
+  final int offset;
+  if (messageNumber >= chunkSize) {
+ offset = messageNumber & chunkMask;
+ //slow path is moved in a separate method
+ buffer = jump(messageNumber, size);
+  } else {
+ offset = messageNumber;
+ buffer = consumerBuffer;
+  }
+  //NOTE: producerIndex is being updated before setting a new value ie 
on consumer side need to spin until a not null value is set
+  PagedMessage msg;
+  while ((msg = buffer.get(offset)) == null) {
+ Thread.yield();
+  }
+  return msg;
+   }
+
+   /**
+* Implements a lock-free version of the optimization used on {@link 
java.util.LinkedList#get(int)} to speed up queries
+* ie backward search of a node if needed.
+*/
+   private AtomicChunk jump(final int messageNumber, final 
long size) {
+  //fast division by a power of 2
+  final int jumps = messageNumber >> chunkSizeLog2;
--- End diff --

cough ;) ... for the same comment you left me  :P


---


[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246574775
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
+  if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
+return null;
+ }
+ //publish it for others consumers
+ LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
+  }
+  final AtomicChunk buffer;
+  final int offset;
+  if (messageNumber >= chunkSize) {
+ offset = messageNumber & chunkMask;
+ //slow path is moved in a separate method
+ buffer = jump(messageNumber, size);
+  } else {
+ offset = messageNumber;
+ buffer = consumerBuffer;
+  }
+  //NOTE: producerIndex is being updated before setting a new value ie 
on consumer side need to spin until a not null value is set
+  PagedMessage msg;
+  while ((msg = buffer.get(offset)) == null) {
+ Thread.yield();
+  }
+  return msg;
+   }
+
+   /**
+* Implements a lock-free version of the optimization used on {@link 
java.util.LinkedList#get(int)} to speed up queries
+* ie backward search of a node if needed.
+*/
+   private AtomicChunk jump(final int messageNumber, final 
long size) {
+  //fast division by a power of 2
+  final int jumps = messageNumber >> chunkSizeLog2;
+  //size is never allowed to be > Integer.MAX_VALUE
+  final int lastChunkIndex = (int) size >> chunkSizeLog2;
+  int requiredJumps = jumps;
+  AtomicChunk jumpBuffer = null;
+  boolean jumpForward = true;
+  int distanceFromLastChunkIndex = lastChunkIndex - jumps;
+  //it's worth to go backward from lastChunkIndex?
+  //trying first to check against the value we already have: if it 
won't worth, won't make sense to load the producerBuffer
+  if (distanceFromLastChunkIndex < jumps) {
+ final AtomicChunk producer = producerBuffer;
+ //producer is a potential moving, always increasing, target ie 
better to re-check the distance
+ distanceFromLastChunkIndex = producer.index - jumps;
+ if (distanceFromLastChunkIndex < jumps) {
+//we're saving some jumps ie is fine to go backward from here
+jumpBuffer = producer;
+requiredJumps = distanceFromLastChunkIndex;
+jumpForward = false;
+ }
+  }
+  //start from the consumer buffer only is needed
+  if (jumpBuffer == null) {
+ jumpBuffer = consumerBuffer;
+  }
+  for (int i = 0; i < requiredJumps; i++) {
+ //next chunk is always set if below a read producerIndex value
+ //previous chunk is final and can be safely read
+ jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev;
+  }
+  return jumpBuffer;
}
 
@Override
-   public synchronized boolean isLive() {
+   public boolean isLive() {
   return isLive;
}
 
@Override
-   public synchronized void addLiveMessage(PagedMessage message) {
+   public void addLiveMessage(PagedMessage message) {

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246570552
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
 ---
@@ -864,21 +875,32 @@ private PageCursorInfo processACK(final PagePosition 
pos) {
   return info;
}
 
+   private void installTXCallback(final Transaction tx, final PagePosition 
position) {
+  installTXCallback(tx, position, -1);
+   }
+
/**
 * @param tx
 * @param position
+* @param persistentSize if negative it needs to be calculated on the 
fly
 */
-   private void installTXCallback(final Transaction tx, final PagePosition 
position) {
+   private void installTXCallback(final Transaction tx, final PagePosition 
position, final long persistentSize) {
   if (position.getRecordID() >= 0) {
  // It needs to persist, otherwise the cursor will return to the 
fist page position
  tx.setContainsPersistent();
   }
 
   PageCursorInfo info = getPageInfo(position);
   PageCache cache = info.getCache();
-  long size = 0;
   if (cache != null) {
- size = 
getPersistentSize(cache.getMessage(position.getMessageNr()));
+ final long size;
+ if (persistentSize < 0) {
--- End diff --

-1 is used as a reserved value in another point to trigger the cache lookup


---


[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246570293
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
 ---
@@ -864,21 +875,32 @@ private PageCursorInfo processACK(final PagePosition 
pos) {
   return info;
}
 
+   private void installTXCallback(final Transaction tx, final PagePosition 
position) {
+  installTXCallback(tx, position, -1);
+   }
+
/**
 * @param tx
 * @param position
+* @param persistentSize if negative it needs to be calculated on the 
fly
 */
-   private void installTXCallback(final Transaction tx, final PagePosition 
position) {
+   private void installTXCallback(final Transaction tx, final PagePosition 
position, final long persistentSize) {
   if (position.getRecordID() >= 0) {
  // It needs to persist, otherwise the cursor will return to the 
fist page position
  tx.setContainsPersistent();
   }
 
   PageCursorInfo info = getPageInfo(position);
   PageCache cache = info.getCache();
-  long size = 0;
   if (cache != null) {
- size = 
getPersistentSize(cache.getMessage(position.getMessageNr()));
+ final long size;
+ if (persistentSize < 0) {
--- End diff --

surely this is checking for something like if its -1? not just that its 
negative which would be worrying if so this should be more explicit to just 
be checking -1, and if anything else thats negative, means illegal argument


---


[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246569626
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
+  if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
--- End diff --

I know but in the original version it was handled in that way and it covers 
2 case: the collection is null and the collection hasn't enough element


---


[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246569523
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
+  if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
+return null;
+ }
+ //publish it for others consumers
+ LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
+  }
+  final AtomicChunk buffer;
+  final int offset;
+  if (messageNumber >= chunkSize) {
+ offset = messageNumber & chunkMask;
+ //slow path is moved in a separate method
+ buffer = jump(messageNumber, size);
+  } else {
+ offset = messageNumber;
+ buffer = consumerBuffer;
+  }
+  //NOTE: producerIndex is being updated before setting a new value ie 
on consumer side need to spin until a not null value is set
+  PagedMessage msg;
+  while ((msg = buffer.get(offset)) == null) {
+ Thread.yield();
+  }
+  return msg;
+   }
+
+   /**
+* Implements a lock-free version of the optimization used on {@link 
java.util.LinkedList#get(int)} to speed up queries
+* ie backward search of a node if needed.
+*/
+   private AtomicChunk jump(final int messageNumber, final 
long size) {
+  //fast division by a power of 2
+  final int jumps = messageNumber >> chunkSizeLog2;
+  //size is never allowed to be > Integer.MAX_VALUE
+  final int lastChunkIndex = (int) size >> chunkSizeLog2;
+  int requiredJumps = jumps;
+  AtomicChunk jumpBuffer = null;
+  boolean jumpForward = true;
+  int distanceFromLastChunkIndex = lastChunkIndex - jumps;
+  //it's worth to go backward from lastChunkIndex?
+  //trying first to check against the value we already have: if it 
won't worth, won't make sense to load the producerBuffer
+  if (distanceFromLastChunkIndex < jumps) {
+ final AtomicChunk producer = producerBuffer;
+ //producer is a potential moving, always increasing, target ie 
better to re-check the distance
+ distanceFromLastChunkIndex = producer.index - jumps;
+ if (distanceFromLastChunkIndex < jumps) {
+//we're saving some jumps ie is fine to go backward from here
+jumpBuffer = producer;
+requiredJumps = distanceFromLastChunkIndex;
+jumpForward = false;
+ }
+  }
+  //start from the consumer buffer only is needed
+  if (jumpBuffer == null) {
+ jumpBuffer = consumerBuffer;
+  }
+  for (int i = 0; i < requiredJumps; i++) {
+ //next chunk is always set if below a read producerIndex value
+ //previous chunk is final and can be safely read
+ jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev;
+  }
+  return jumpBuffer;
}
 
@Override
-   public synchronized boolean isLive() {
+   public boolean isLive() {
   return isLive;
}
 
@Override
-   public synchronized void addLiveMessage(PagedMessage message) {
+   public void addLiveMessage(PagedMessage 

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246568743
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
+  if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
--- End diff --

surely if this occurs, theres some issue..


---


[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246568579
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
--- End diff --

naming of fields is missaligned, some places its called size, some others 
you call it index, this is hard to follow.


---


[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246568552
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
+  if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
+return null;
+ }
+ //publish it for others consumers
+ LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
+  }
+  final AtomicChunk buffer;
+  final int offset;
+  if (messageNumber >= chunkSize) {
+ offset = messageNumber & chunkMask;
+ //slow path is moved in a separate method
+ buffer = jump(messageNumber, size);
+  } else {
+ offset = messageNumber;
+ buffer = consumerBuffer;
+  }
+  //NOTE: producerIndex is being updated before setting a new value ie 
on consumer side need to spin until a not null value is set
+  PagedMessage msg;
+  while ((msg = buffer.get(offset)) == null) {
+ Thread.yield();
+  }
+  return msg;
+   }
+
+   /**
+* Implements a lock-free version of the optimization used on {@link 
java.util.LinkedList#get(int)} to speed up queries
+* ie backward search of a node if needed.
+*/
+   private AtomicChunk jump(final int messageNumber, final 
long size) {
+  //fast division by a power of 2
+  final int jumps = messageNumber >> chunkSizeLog2;
+  //size is never allowed to be > Integer.MAX_VALUE
+  final int lastChunkIndex = (int) size >> chunkSizeLog2;
+  int requiredJumps = jumps;
+  AtomicChunk jumpBuffer = null;
+  boolean jumpForward = true;
+  int distanceFromLastChunkIndex = lastChunkIndex - jumps;
+  //it's worth to go backward from lastChunkIndex?
+  //trying first to check against the value we already have: if it 
won't worth, won't make sense to load the producerBuffer
+  if (distanceFromLastChunkIndex < jumps) {
+ final AtomicChunk producer = producerBuffer;
+ //producer is a potential moving, always increasing, target ie 
better to re-check the distance
+ distanceFromLastChunkIndex = producer.index - jumps;
+ if (distanceFromLastChunkIndex < jumps) {
+//we're saving some jumps ie is fine to go backward from here
+jumpBuffer = producer;
+requiredJumps = distanceFromLastChunkIndex;
+jumpForward = false;
+ }
+  }
+  //start from the consumer buffer only is needed
+  if (jumpBuffer == null) {
+ jumpBuffer = consumerBuffer;
+  }
+  for (int i = 0; i < requiredJumps; i++) {
+ //next chunk is always set if below a read producerIndex value
+ //previous chunk is final and can be safely read
+ jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev;
+  }
+  return jumpBuffer;
}
 
@Override
-   public synchronized boolean isLive() {
+   public boolean isLive() {
   return isLive;
}
 
@Override
-   public synchronized void addLiveMessage(PagedMessage message) {
+   public void addLiveMessage(PagedMessage message) {

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246568367
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
+  if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
+return null;
+ }
+ //publish it for others consumers
+ LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
+  }
+  final AtomicChunk buffer;
+  final int offset;
+  if (messageNumber >= chunkSize) {
+ offset = messageNumber & chunkMask;
+ //slow path is moved in a separate method
+ buffer = jump(messageNumber, size);
+  } else {
+ offset = messageNumber;
+ buffer = consumerBuffer;
+  }
+  //NOTE: producerIndex is being updated before setting a new value ie 
on consumer side need to spin until a not null value is set
+  PagedMessage msg;
+  while ((msg = buffer.get(offset)) == null) {
+ Thread.yield();
+  }
+  return msg;
+   }
+
+   /**
+* Implements a lock-free version of the optimization used on {@link 
java.util.LinkedList#get(int)} to speed up queries
+* ie backward search of a node if needed.
+*/
+   private AtomicChunk jump(final int messageNumber, final 
long size) {
+  //fast division by a power of 2
+  final int jumps = messageNumber >> chunkSizeLog2;
+  //size is never allowed to be > Integer.MAX_VALUE
+  final int lastChunkIndex = (int) size >> chunkSizeLog2;
+  int requiredJumps = jumps;
+  AtomicChunk jumpBuffer = null;
+  boolean jumpForward = true;
+  int distanceFromLastChunkIndex = lastChunkIndex - jumps;
+  //it's worth to go backward from lastChunkIndex?
+  //trying first to check against the value we already have: if it 
won't worth, won't make sense to load the producerBuffer
+  if (distanceFromLastChunkIndex < jumps) {
+ final AtomicChunk producer = producerBuffer;
+ //producer is a potential moving, always increasing, target ie 
better to re-check the distance
+ distanceFromLastChunkIndex = producer.index - jumps;
+ if (distanceFromLastChunkIndex < jumps) {
+//we're saving some jumps ie is fine to go backward from here
+jumpBuffer = producer;
+requiredJumps = distanceFromLastChunkIndex;
+jumpForward = false;
+ }
+  }
+  //start from the consumer buffer only is needed
+  if (jumpBuffer == null) {
+ jumpBuffer = consumerBuffer;
+  }
+  for (int i = 0; i < requiredJumps; i++) {
+ //next chunk is always set if below a read producerIndex value
+ //previous chunk is final and can be safely read
+ jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev;
+  }
+  return jumpBuffer;
}
 
@Override
-   public synchronized boolean isLive() {
+   public boolean isLive() {
   return isLive;
}
 
@Override
-   public synchronized void addLiveMessage(PagedMessage message) {
+   public void addLiveMessage(PagedMessage message) {

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246567995
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
+  if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
+return null;
+ }
+ //publish it for others consumers
+ LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
+  }
+  final AtomicChunk buffer;
+  final int offset;
+  if (messageNumber >= chunkSize) {
+ offset = messageNumber & chunkMask;
+ //slow path is moved in a separate method
+ buffer = jump(messageNumber, size);
+  } else {
+ offset = messageNumber;
+ buffer = consumerBuffer;
+  }
+  //NOTE: producerIndex is being updated before setting a new value ie 
on consumer side need to spin until a not null value is set
+  PagedMessage msg;
+  while ((msg = buffer.get(offset)) == null) {
+ Thread.yield();
+  }
+  return msg;
+   }
+
+   /**
+* Implements a lock-free version of the optimization used on {@link 
java.util.LinkedList#get(int)} to speed up queries
+* ie backward search of a node if needed.
+*/
+   private AtomicChunk jump(final int messageNumber, final 
long size) {
+  //fast division by a power of 2
+  final int jumps = messageNumber >> chunkSizeLog2;
+  //size is never allowed to be > Integer.MAX_VALUE
+  final int lastChunkIndex = (int) size >> chunkSizeLog2;
+  int requiredJumps = jumps;
+  AtomicChunk jumpBuffer = null;
+  boolean jumpForward = true;
+  int distanceFromLastChunkIndex = lastChunkIndex - jumps;
+  //it's worth to go backward from lastChunkIndex?
+  //trying first to check against the value we already have: if it 
won't worth, won't make sense to load the producerBuffer
+  if (distanceFromLastChunkIndex < jumps) {
+ final AtomicChunk producer = producerBuffer;
+ //producer is a potential moving, always increasing, target ie 
better to re-check the distance
+ distanceFromLastChunkIndex = producer.index - jumps;
+ if (distanceFromLastChunkIndex < jumps) {
+//we're saving some jumps ie is fine to go backward from here
+jumpBuffer = producer;
+requiredJumps = distanceFromLastChunkIndex;
+jumpForward = false;
+ }
+  }
+  //start from the consumer buffer only is needed
+  if (jumpBuffer == null) {
+ jumpBuffer = consumerBuffer;
+  }
+  for (int i = 0; i < requiredJumps; i++) {
+ //next chunk is always set if below a read producerIndex value
+ //previous chunk is final and can be safely read
+ jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev;
+  }
+  return jumpBuffer;
}
 
@Override
-   public synchronized boolean isLive() {
+   public boolean isLive() {
   return isLive;
}
 
@Override
-   public synchronized void addLiveMessage(PagedMessage message) {
+   public void addLiveMessage(PagedMessage 

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246567905
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -28,15 +28,49 @@
 /**
  * This is the same as PageCache, however this is for the page that's 
being currently written.
  */
--- End diff --

Makes sense :+1: 


---


[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246566042
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
+  if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
+return null;
+ }
+ //publish it for others consumers
+ LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
+  }
+  final AtomicChunk buffer;
+  final int offset;
+  if (messageNumber >= chunkSize) {
+ offset = messageNumber & chunkMask;
+ //slow path is moved in a separate method
+ buffer = jump(messageNumber, size);
+  } else {
+ offset = messageNumber;
+ buffer = consumerBuffer;
+  }
+  //NOTE: producerIndex is being updated before setting a new value ie 
on consumer side need to spin until a not null value is set
+  PagedMessage msg;
+  while ((msg = buffer.get(offset)) == null) {
+ Thread.yield();
+  }
+  return msg;
+   }
+
+   /**
+* Implements a lock-free version of the optimization used on {@link 
java.util.LinkedList#get(int)} to speed up queries
+* ie backward search of a node if needed.
+*/
+   private AtomicChunk jump(final int messageNumber, final 
long size) {
+  //fast division by a power of 2
+  final int jumps = messageNumber >> chunkSizeLog2;
+  //size is never allowed to be > Integer.MAX_VALUE
+  final int lastChunkIndex = (int) size >> chunkSizeLog2;
+  int requiredJumps = jumps;
+  AtomicChunk jumpBuffer = null;
+  boolean jumpForward = true;
+  int distanceFromLastChunkIndex = lastChunkIndex - jumps;
+  //it's worth to go backward from lastChunkIndex?
+  //trying first to check against the value we already have: if it 
won't worth, won't make sense to load the producerBuffer
+  if (distanceFromLastChunkIndex < jumps) {
+ final AtomicChunk producer = producerBuffer;
+ //producer is a potential moving, always increasing, target ie 
better to re-check the distance
+ distanceFromLastChunkIndex = producer.index - jumps;
+ if (distanceFromLastChunkIndex < jumps) {
+//we're saving some jumps ie is fine to go backward from here
+jumpBuffer = producer;
+requiredJumps = distanceFromLastChunkIndex;
+jumpForward = false;
+ }
+  }
+  //start from the consumer buffer only is needed
+  if (jumpBuffer == null) {
+ jumpBuffer = consumerBuffer;
+  }
+  for (int i = 0; i < requiredJumps; i++) {
+ //next chunk is always set if below a read producerIndex value
+ //previous chunk is final and can be safely read
+ jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev;
+  }
+  return jumpBuffer;
}
 
@Override
-   public synchronized boolean isLive() {
+   public boolean isLive() {
   return isLive;
}
 
@Override
-   public synchronized void addLiveMessage(PagedMessage message) {
+   public void addLiveMessage(PagedMessage 

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246563763
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -28,15 +28,49 @@
 /**
  * This is the same as PageCache, however this is for the page that's 
being currently written.
  */
--- End diff --

@franz1981 Id agree with @clebertsuconic here, a bit like what ive done 
with priority consumers, i ended up splitting out the collections logic, which 
has ended up making it cleaner, and easier to reason with. (and as youve marked 
out on its pr, more testable ;) )


---


[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread clebertsuconic
Github user clebertsuconic commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246558406
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -28,15 +28,49 @@
 /**
  * This is the same as PageCache, however this is for the page that's 
being currently written.
  */
--- End diff --

Isn't this mixing the collection implementation itself into the 
LivePageCache?

isn't there a way to implement this logic into its own structure? Like 
PageCache using a generic ChunkArray (a name I just came up here)?

I'm a bit concerned on maintaining the business side of this issue (that is 
the PageCache) with the speedy implementation of a collection.


---


[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread franz1981
GitHub user franz1981 opened a pull request:

https://github.com/apache/activemq-artemis/pull/2494

ARTEMIS-2224 Reduce contention on LivePageCacheImpl

It includes:

- **lock-free LivePageCache + tests**:
LivePageCacheImpl has been reimplemented to be
lock-free, multi-producer and multi-consumer
in any of its operations.
- **Avoid unnecessary page cache queries on ack TX**:
PageSubscriptionImpl::ackTx is already performing a counter update
using the message persistent size: the size can be reused on
PagePosition::setPersistentSize, avoiding to query the page cache just
to compute it.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/franz1981/activemq-artemis lock-free-paging

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/activemq-artemis/pull/2494.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2494






---