[GitHub] [activemq-artemis] brusdev commented on a change in pull request #2851: ARTEMIS-2503 Improve wildcards for the roles access match

2019-09-25 Thread GitBox
brusdev commented on a change in pull request #2851: ARTEMIS-2503 Improve 
wildcards for the roles access match
URL: https://github.com/apache/activemq-artemis/pull/2851#discussion_r328431483
 
 

 ##
 File path: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/JMXAccessControlList.java
 ##
 @@ -25,12 +25,22 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.activemq.artemis.core.config.WildcardConfiguration;
+import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
+import 
org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
+
 public class JMXAccessControlList {
 
private Access defaultAccess = new Access("*");
-   private Map domainAccess = new HashMap<>();
+   private HierarchicalRepository domainAccess;
private ConcurrentHashMap> whitelist = new 
ConcurrentHashMap<>();
 
+   public JMXAccessControlList() {
+  WildcardConfiguration domainAccessWildcardConfiguration = new 
WildcardConfiguration();
 
 Review comment:
   The previous wildcard implementation for the key attribute (documented at 
https://github.com/apache/activemq-artemis/blob/master/docs/user-manual/en/management.md)
 isn't compatible with a custom wildcard configuration defined in broker.xml. 
If the wildcard configuration used here were the same as the one in broker.xml, 
it could break compatibility with the previous implementation.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [activemq-artemis] wy96f commented on issue #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)

2019-09-25 Thread GitBox
wy96f commented on issue #2845: ARTEMIS-2336 Use zero copy to replicate 
journal/page/large message file (AGAIN)
URL: https://github.com/apache/activemq-artemis/pull/2845#issuecomment-535323295
 
 
   @franz1981 I found a new problem with -Dio.netty.file.region=false. I 
generated 48GB of files with the load generator. With 
-Dio.netty.file.region=true, and with master, the log (lines like 
`2019-09-26 11:02:49,348 DEBUG 
[org.apache.activemq.artemis.core.replication.ReplicationManager] sending 
1048576 bytes on file `) showed that it took about 7 minutes to transfer the 
files, after which the synchronization done message was sent. With 
-Dio.netty.file.region=false, however, the log showed that it took only about 
40 seconds to "transfer" the files before the sync done message was sent. In 
fact, flow control didn't work: those 40 seconds were spent building up 
PendingWrite entries in the queue rather than transferring files. This leads 
to the sync done message timing out.
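
   The effect described above can be illustrated with a minimal, self-contained 
sketch. It does not use Netty or Artemis code: `PendingWrites`, its high water 
mark, and the chunk counts are hypothetical stand-ins for a channel's outbound 
buffer. A sender that never checks writability queues every chunk almost 
instantly, while a sender that waits for the buffer to become writable keeps 
the backlog bounded.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for a channel's outbound buffer; not Netty or Artemis code.
class FlowControlSketch {

   static final class PendingWrites {
      private final Deque<Integer> queue = new ArrayDeque<>();
      private final long highWaterMark;
      private long pendingBytes;
      private long maxPendingBytes;

      PendingWrites(long highWaterMark) {
         this.highWaterMark = highWaterMark;
      }

      // Writable while the backlog is below the high water mark.
      boolean isWritable() {
         return pendingBytes < highWaterMark;
      }

      void enqueue(int chunkSize) {
         queue.add(chunkSize);
         pendingBytes += chunkSize;
         maxPendingBytes = Math.max(maxPendingBytes, pendingBytes);
      }

      // Simulates the network draining one queued chunk.
      void drainOne() {
         Integer chunk = queue.poll();
         if (chunk != null) {
            pendingBytes -= chunk;
         }
      }

      long maxPendingBytes() {
         return maxPendingBytes;
      }
   }

   // Queues every chunk immediately and never checks writability.
   static long sendWithoutFlowControl(int chunks, int chunkSize, long highWaterMark) {
      PendingWrites pending = new PendingWrites(highWaterMark);
      for (int i = 0; i < chunks; i++) {
         pending.enqueue(chunkSize);
      }
      return pending.maxPendingBytes();
   }

   // Lets the simulated network drain until the buffer is writable again.
   static long sendWithFlowControl(int chunks, int chunkSize, long highWaterMark) {
      PendingWrites pending = new PendingWrites(highWaterMark);
      for (int i = 0; i < chunks; i++) {
         while (!pending.isWritable()) {
            pending.drainOne();
         }
         pending.enqueue(chunkSize);
      }
      return pending.maxPendingBytes();
   }

   public static void main(String[] args) {
      int chunkSize = 1024 * 1024; // 1 MiB, as in the log line quoted above
      System.out.println("peak backlog without flow control: "
            + sendWithoutFlowControl(100, chunkSize, 4L * chunkSize));
      System.out.println("peak backlog with flow control: "
            + sendWithFlowControl(100, chunkSize, 4L * chunkSize));
   }
}
```

   In the first case the "transfer" finishes as soon as everything is queued, 
which is why a short elapsed time in the log does not mean the bytes reached 
the backup.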




[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)

2019-09-25 Thread GitBox
michaelandrepearce commented on a change in pull request #2845: ARTEMIS-2336 
Use zero copy to replicate journal/page/large message file (AGAIN)
URL: https://github.com/apache/activemq-artemis/pull/2845#discussion_r328410083
 
 

 ##
 File path: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/AbsoluteChunkedNioFile.java
 ##
 @@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.remoting.impl.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.FileRegion;
+import io.netty.handler.stream.ChunkedInput;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.FileChannel;
+
+/**
+ * A {@link ChunkedInput} that fetches data from a file chunk by chunk using
+ * NIO {@link FileChannel}.
+ * <p>
+ * If your operating system supports
+ * <a href="http://en.wikipedia.org/wiki/Zero-copy">zero-copy file transfer</a>
+ * such as {@code sendfile()}, you might want to use {@link FileRegion} instead.
+ */
+class AbsoluteChunkedNioFile implements ChunkedInput {
 
 Review comment:
   I see your upstream fix to Netty was merged. As I understand it, this class 
is a shadow of that fix, as you noted in the PR comment. Since it's merged and 
they release fairly frequently, let's wait for their release and then remove 
this class before merging.




[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #2851: ARTEMIS-2503 Improve wildcards for the roles access match

2019-09-25 Thread GitBox
michaelandrepearce commented on a change in pull request #2851: ARTEMIS-2503 
Improve wildcards for the roles access match
URL: https://github.com/apache/activemq-artemis/pull/2851#discussion_r328408327
 
 

 ##
 File path: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/JMXAccessControlList.java
 ##
 @@ -25,12 +25,22 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.activemq.artemis.core.config.WildcardConfiguration;
+import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
+import 
org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
+
 public class JMXAccessControlList {
 
private Access defaultAccess = new Access("*");
-   private Map domainAccess = new HashMap<>();
+   private HierarchicalRepository domainAccess;
private ConcurrentHashMap> whitelist = new 
ConcurrentHashMap<>();
 
+   public JMXAccessControlList() {
+  WildcardConfiguration domainAccessWildcardConfiguration = new 
WildcardConfiguration();
 
 Review comment:
   The wildcard configuration should be the same as in broker.xml, i.e. if the 
broker has a custom wildcard config, the same customisation should be 
supported here. For example, someone may use different separators in broker.xml.
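
   To make the concern concrete, here is a minimal, self-contained sketch of 
word-based wildcard matching with configurable characters. It is not the 
Artemis matching code: `WildcardSketch` and its methods are hypothetical, and 
the characters '.', '*' and '#' are assumed here to play the roles of 
delimiter, single-word and any-words wildcard. It only shows that a pattern 
written for one delimiter stops matching when the matcher is built with another.

```java
import java.util.regex.Pattern;

// Hypothetical illustration only; not the Artemis wildcard implementation.
class WildcardSketch {
   private final String delimiter;
   private final String singleWord;
   private final String anyWords;

   WildcardSketch(char delimiter, char singleWord, char anyWords) {
      this.delimiter = String.valueOf(delimiter);
      this.singleWord = String.valueOf(singleWord);
      this.anyWords = String.valueOf(anyWords);
   }

   // Splits pattern and name into words on the delimiter and matches word by word.
   boolean matches(String pattern, String name) {
      String[] p = pattern.split(Pattern.quote(delimiter));
      String[] n = name.split(Pattern.quote(delimiter));
      return matches(p, 0, n, 0);
   }

   private boolean matches(String[] p, int pi, String[] n, int ni) {
      if (pi == p.length) {
         return ni == n.length;
      }
      if (p[pi].equals(anyWords)) {
         // The any-words wildcard consumes zero or more words.
         for (int k = ni; k <= n.length; k++) {
            if (matches(p, pi + 1, n, k)) {
               return true;
            }
         }
         return false;
      }
      if (ni == n.length) {
         return false;
      }
      // The single-word wildcard consumes exactly one word.
      if (p[pi].equals(singleWord) || p[pi].equals(n[ni])) {
         return matches(p, pi + 1, n, ni + 1);
      }
      return false;
   }

   public static void main(String[] args) {
      WildcardSketch dotted = new WildcardSketch('.', '*', '#');
      WildcardSketch underscored = new WildcardSketch('_', '*', '#');
      System.out.println(dotted.matches("org.apache.#", "org.apache.activemq.artemis"));
      System.out.println(underscored.matches("org_apache_#", "org_apache_activemq"));
      System.out.println(dotted.matches("org_apache_#", "org_apache_activemq"));
   }
}
```

   If the access list always used one fixed set of characters, entries written 
with the broker's custom separators would behave like the last call.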




[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #2851: ARTEMIS-2503 Improve wildcards for the roles access match

2019-09-25 Thread GitBox
michaelandrepearce commented on a change in pull request #2851: ARTEMIS-2503 
Improve wildcards for the roles access match
URL: https://github.com/apache/activemq-artemis/pull/2851#discussion_r328408327
 
 

 ##
 File path: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/JMXAccessControlList.java
 ##
 @@ -25,12 +25,22 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.activemq.artemis.core.config.WildcardConfiguration;
+import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
+import 
org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
+
 public class JMXAccessControlList {
 
private Access defaultAccess = new Access("*");
-   private Map domainAccess = new HashMap<>();
+   private HierarchicalRepository domainAccess;
private ConcurrentHashMap> whitelist = new 
ConcurrentHashMap<>();
 
+   public JMXAccessControlList() {
+  WildcardConfiguration domainAccessWildcardConfiguration = new 
WildcardConfiguration();
 
 Review comment:
   The wildcard configuration should be the same as in broker.xml, i.e. if the 
broker has a custom wildcard config, the same customisation should be 
supported here.




[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #2851: ARTEMIS-2503 Improve wildcards for the roles access match

2019-09-25 Thread GitBox
michaelandrepearce commented on a change in pull request #2851: ARTEMIS-2503 
Improve wildcards for the roles access match
URL: https://github.com/apache/activemq-artemis/pull/2851#discussion_r328408327
 
 

 ##
 File path: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/JMXAccessControlList.java
 ##
 @@ -25,12 +25,22 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.activemq.artemis.core.config.WildcardConfiguration;
+import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
+import 
org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
+
 public class JMXAccessControlList {
 
private Access defaultAccess = new Access("*");
-   private Map domainAccess = new HashMap<>();
+   private HierarchicalRepository domainAccess;
private ConcurrentHashMap> whitelist = new 
ConcurrentHashMap<>();
 
+   public JMXAccessControlList() {
+  WildcardConfiguration domainAccessWildcardConfiguration = new 
WildcardConfiguration();
 
 Review comment:
   The wildcard configuration should be the same as in broker.xml.




[GitHub] [activemq-artemis] michaelandrepearce commented on issue #2852: ARTEMIS-2505: Fix wiring of the max-size-bytes-reject-threshold address-setting

2019-09-25 Thread GitBox
michaelandrepearce commented on issue #2852: ARTEMIS-2505: Fix wiring of the 
max-size-bytes-reject-threshold address-setting
URL: https://github.com/apache/activemq-artemis/pull/2852#issuecomment-535296921
 
 
   Can you squash the commits?




[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses

2019-09-25 Thread GitBox
michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 
implement retroactive addresses
URL: https://github.com/apache/activemq-artemis/pull/2850#discussion_r328407373
 
 

 ##
 File path: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
 ##
 @@ -467,6 +478,83 @@ private boolean internalAddressInfo(AddressInfo 
addressInfo, boolean reload) thr
   }
}
 
+   private void registerRepositoryListenerForRetroactiveAddress(SimpleString 
name) {
+  HierarchicalRepositoryChangeListener repositoryChangeListener = () -> {
+ String prefix = server.getConfiguration().getInternalNamingPrefix();
+ String address = 
ResourceNames.decomposeRetroactiveResourceName(prefix, name.toString(), 
ResourceNames.ADDRESS);
+ AddressSettings settings = 
addressSettingsRepository.getMatch(address);
+ Queue internalQueue = 
server.locateQueue(ResourceNames.getRetroactiveResourceName(prefix, 
SimpleString.toSimpleString(address), ResourceNames.QUEUE));
+ if (internalQueue != null && internalQueue.getRingSize() != 
settings.getRetroactiveMessageCount()) {
+internalQueue.setRingSize(settings.getRetroactiveMessageCount());
+ }
+  };
+  addressSettingsRepository.registerListener(repositoryChangeListener);
+  
server.getAddressInfo(name).setRepositoryChangeListener(repositoryChangeListener);
+   }
+
+   private void createRetroactiveResources(final SimpleString 
retroactiveAddressName, final long retroactiveMessageCount, final boolean 
reload) throws Exception {
+  String prefix = server.getConfiguration().getInternalNamingPrefix();
+  final SimpleString internalAddressName = 
ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, 
ResourceNames.ADDRESS);
+  final SimpleString internalQueueName = 
ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, 
ResourceNames.QUEUE);
+  final SimpleString internalDivertName = 
ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, 
ResourceNames.DIVERT);
+
+  if (!reload) {
+ AddressInfo addressInfo = new 
AddressInfo(internalAddressName).addRoutingType(RoutingType.ANYCAST).setInternal(true);
+ addAddressInfo(addressInfo);
+ server.createQueue(internalAddressName,
+RoutingType.ANYCAST,
+internalQueueName,
+null,
+null,
+true,
+false,
+false,
+false,
+false,
+0,
+false,
+false,
+false,
+0,
+null,
+false,
+null,
+false,
+0,
+0L,
+false,
+0L,
+0L,
+false,
+retroactiveMessageCount);
+  }
+  server.deployDivert(new DivertConfiguration()
+ .setName(internalDivertName.toString())
+ .setAddress(retroactiveAddressName.toString())
+ .setExclusive(false)
+ 
.setForwardingAddress(internalAddressName.toString())
+ 
.setRoutingType(ComponentConfigurationRoutingType.STRIP));
+   }
+
+   private void removeRetroactiveResources(SimpleString address) throws 
Exception {
+  String prefix = server.getConfiguration().getInternalNamingPrefix();
+
+  SimpleString internalDivertName = 
ResourceNames.getRetroactiveResourceName(prefix, address, ResourceNames.DIVERT);
 
 Review comment:
   The more I think about this, the more strongly I think we shouldn't have the 
divert and separate address; it should simply be a queue under the existing 
address, both for performance and for adaptability, so that it can be made 
more configurable in future.




[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses

2019-09-25 Thread GitBox
michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 
implement retroactive addresses
URL: https://github.com/apache/activemq-artemis/pull/2850#discussion_r328407373
 
 

 ##
 File path: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
 ##
 @@ -467,6 +478,83 @@ private boolean internalAddressInfo(AddressInfo 
addressInfo, boolean reload) thr
   }
}
 
+   private void registerRepositoryListenerForRetroactiveAddress(SimpleString 
name) {
+  HierarchicalRepositoryChangeListener repositoryChangeListener = () -> {
+ String prefix = server.getConfiguration().getInternalNamingPrefix();
+ String address = 
ResourceNames.decomposeRetroactiveResourceName(prefix, name.toString(), 
ResourceNames.ADDRESS);
+ AddressSettings settings = 
addressSettingsRepository.getMatch(address);
+ Queue internalQueue = 
server.locateQueue(ResourceNames.getRetroactiveResourceName(prefix, 
SimpleString.toSimpleString(address), ResourceNames.QUEUE));
+ if (internalQueue != null && internalQueue.getRingSize() != 
settings.getRetroactiveMessageCount()) {
+internalQueue.setRingSize(settings.getRetroactiveMessageCount());
+ }
+  };
+  addressSettingsRepository.registerListener(repositoryChangeListener);
+  
server.getAddressInfo(name).setRepositoryChangeListener(repositoryChangeListener);
+   }
+
+   private void createRetroactiveResources(final SimpleString 
retroactiveAddressName, final long retroactiveMessageCount, final boolean 
reload) throws Exception {
+  String prefix = server.getConfiguration().getInternalNamingPrefix();
+  final SimpleString internalAddressName = 
ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, 
ResourceNames.ADDRESS);
+  final SimpleString internalQueueName = 
ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, 
ResourceNames.QUEUE);
+  final SimpleString internalDivertName = 
ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, 
ResourceNames.DIVERT);
+
+  if (!reload) {
+ AddressInfo addressInfo = new 
AddressInfo(internalAddressName).addRoutingType(RoutingType.ANYCAST).setInternal(true);
+ addAddressInfo(addressInfo);
+ server.createQueue(internalAddressName,
+RoutingType.ANYCAST,
+internalQueueName,
+null,
+null,
+true,
+false,
+false,
+false,
+false,
+0,
+false,
+false,
+false,
+0,
+null,
+false,
+null,
+false,
+0,
+0L,
+false,
+0L,
+0L,
+false,
+retroactiveMessageCount);
+  }
+  server.deployDivert(new DivertConfiguration()
+ .setName(internalDivertName.toString())
+ .setAddress(retroactiveAddressName.toString())
+ .setExclusive(false)
+ 
.setForwardingAddress(internalAddressName.toString())
+ 
.setRoutingType(ComponentConfigurationRoutingType.STRIP));
+   }
+
+   private void removeRetroactiveResources(SimpleString address) throws 
Exception {
+  String prefix = server.getConfiguration().getInternalNamingPrefix();
+
+  SimpleString internalDivertName = 
ResourceNames.getRetroactiveResourceName(prefix, address, ResourceNames.DIVERT);
 
 Review comment:
   The more I think about this, the more strongly I think we shouldn't have the 
divert and separate address; it should simply be a queue under the existing 
address.




[GitHub] [activemq-artemis] michaelandrepearce edited a comment on issue #2850: ARTEMIS-2504 implement retroactive addresses

2019-09-25 Thread GitBox
michaelandrepearce edited a comment on issue #2850: ARTEMIS-2504 implement 
retroactive addresses
URL: https://github.com/apache/activemq-artemis/pull/2850#issuecomment-535292298
 
 
   This feature would be more powerful if it supported time-based or byte-size 
retention, e.g. retroactive for the last hour.
   
   I realise this may not be possible in this initial feature PR, but it should 
influence how we design the configuration, so that it's as flexible as 
possible for future expansion once a queue can be constrained (ring or TTL) by 
byte size or time.
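
   As a rough illustration of what count-based plus time-based retention could 
look like, here is a self-contained sketch. Everything in it is hypothetical 
(`RetentionSketch`, `maxCount`, `maxAgeMillis`); it is not an Artemis API and 
says nothing about how the broker would implement it. A limit of zero or less 
means "not configured", which is one way a configuration could stay open for 
later expansion.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration of combined count/age retention; not Artemis code.
class RetentionSketch {

   private static final class Entry {
      final String payload;
      final long timestampMillis;

      Entry(String payload, long timestampMillis) {
         this.payload = payload;
         this.timestampMillis = timestampMillis;
      }
   }

   private final Deque<Entry> entries = new ArrayDeque<>();
   private final int maxCount;      // <= 0 disables the count limit
   private final long maxAgeMillis; // <= 0 disables the age limit

   RetentionSketch(int maxCount, long maxAgeMillis) {
      this.maxCount = maxCount;
      this.maxAgeMillis = maxAgeMillis;
   }

   // Stores a message and drops whatever no longer satisfies the limits.
   void add(String payload, long nowMillis) {
      entries.addLast(new Entry(payload, nowMillis));
      evict(nowMillis);
   }

   private void evict(long nowMillis) {
      // Ring behaviour: the oldest entries are dropped first.
      while (maxCount > 0 && entries.size() > maxCount) {
         entries.removeFirst();
      }
      // Age behaviour: entries older than maxAgeMillis are dropped.
      while (maxAgeMillis > 0 && !entries.isEmpty()
            && nowMillis - entries.peekFirst().timestampMillis > maxAgeMillis) {
         entries.removeFirst();
      }
   }

   // Returns the payloads a late subscriber would receive at the given time.
   List<String> snapshot(long nowMillis) {
      evict(nowMillis);
      List<String> result = new ArrayList<>();
      for (Entry entry : entries) {
         result.add(entry.payload);
      }
      return result;
   }

   public static void main(String[] args) {
      RetentionSketch lastHour = new RetentionSketch(0, 60L * 60L * 1000L);
      lastHour.add("old", 0L);
      lastHour.add("recent", 50L * 60L * 1000L);
      System.out.println(lastHour.snapshot(65L * 60L * 1000L));
   }
}
```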
   
   




[GitHub] [activemq-artemis] michaelandrepearce commented on issue #2850: ARTEMIS-2504 implement retroactive addresses

2019-09-25 Thread GitBox
michaelandrepearce commented on issue #2850: ARTEMIS-2504 implement retroactive 
addresses
URL: https://github.com/apache/activemq-artemis/pull/2850#issuecomment-535292298
 
 
   This feature would be more powerful if it supported time-based or byte-size 
retention, e.g. retroactive for the last hour.
   
   I realise this may not be possible in this initial feature PR, but it should 
influence how we design the configuration, so that it's as flexible as 
possible for future expansion once a queue can be a ring queue by byte size or 
time.
   
   




[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses

2019-09-25 Thread GitBox
michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 
implement retroactive addresses
URL: https://github.com/apache/activemq-artemis/pull/2850#discussion_r328403475
 
 

 ##
 File path: docs/user-manual/en/retroactive-addresses.md
 ##
 @@ -0,0 +1,73 @@
+# Retroactive Addresses
+
+A "retroactive" address is an address that will preserve messages sent to it
+for queues which will be created on it in the future. This can be useful in,
+for example, publish-subscribe use cases where clients want to receive the
+messages sent to the address *before* they they actually connected and created
+their multicast "subscription" queue. Typically messages sent to an address
+before a queue was created on it would simply be unavailable to those queues,
+but with a retroactive address a fixed number of messages can be preserved by
+the broker and automatically copied into queues subsequently created on the
+address. This works for both anycast and multicast queues.
+
+## Internal Retroactive Resources
+
+To implement this functionality the broker will create 3 internal resources for
+each retroactive address:
+
+1. A non-exclusive [divert](#diverts) to grab the messages from the retroactive
+   address.
+2. An address to receive the messages from the divert.
+3. A [ring queue](#ring-queues) to hold the messages sent to the address by the
+   divert. The general caveats for ring queues still apply here. See [the
+   chapter on ring queues](#ring-queues) for more details.
+
+These resources are important to be aware of as they will show up in the web
+console and other management or metric views. They will be named according to
+the following pattern:
+
+```
+<internal-naming-prefix>.<source-address>.(divert|address|queue).retro
+```
+
+For example, if an address named `myAddress` had a `retroactive-message-count`
+of 10 then resources with these names would be created by default:
+
+1. A divert on `myAddress` named `$.artemis.internal.myAddress.divert.retro`
+2. An address named `$.artemis.internal.myAddress.address.retro`
+3. A queue on the address from step #2 named
+   `$.artemis.internal.myAddress.queue.retro` with a `ring-size` of 10.
+
+This pattern is important to note as it allows one to configure 
address-settings
+if necessary.
+
+> Note:
+>
+> Changing the broker's `internal-naming-prefix` once these retroactive
+> resources are created will break the retroactive functionality.
+>
+
+## Configuration
+
+To configure an address to be "retroactive" simply configure the
+`retroactive-message-count` `address-setting` to reflect the number of messages
+you want the broker to preserve, e.g.:
+
+
+```xml
+
+   
+  100
+   
+
+```
+
+The value for `retroactive-message-count` can be updated at runtime either via
 
 Review comment:
   This suggests it updates dynamically. That isn't entirely true, as it will 
only update the default for new addresses; for existing addresses where the 
retro queue has already been created, it doesn't update. This is really acting 
as a default value, not an explicit one. If you needed to change it at runtime 
for an existing address, you would need to find the internal queue and update 
it.
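
   The difference between "default applied at creation" and "tracked at 
runtime" can be sketched as follows. This is a self-contained illustration 
with hypothetical stand-ins (`SettingsRepository`, `RingQueue`, 
`createRetroQueue`), not the PostOfficeImpl code: a queue created without a 
change listener keeps the ring size it was created with, while a queue with a 
listener is looked up and resized whenever the setting changes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; the types below are not Artemis classes.
class RingSizeUpdateSketch {

   // Stand-in for the internal ring queue of a retroactive address.
   static final class RingQueue {
      private long ringSize;

      RingQueue(long ringSize) {
         this.ringSize = ringSize;
      }

      long getRingSize() {
         return ringSize;
      }

      void setRingSize(long ringSize) {
         this.ringSize = ringSize;
      }
   }

   // Stand-in for a settings repository that notifies listeners on change.
   static final class SettingsRepository {
      private final Map<String, Long> retroactiveMessageCount = new HashMap<>();
      private final List<Runnable> listeners = new ArrayList<>();

      void registerListener(Runnable listener) {
         listeners.add(listener);
      }

      void set(String address, long count) {
         retroactiveMessageCount.put(address, count);
         for (Runnable listener : listeners) {
            listener.run();
         }
      }

      long get(String address) {
         return retroactiveMessageCount.getOrDefault(address, 0L);
      }
   }

   // Creates the queue from the current setting; optionally keeps it in sync.
   static RingQueue createRetroQueue(SettingsRepository repository, String address, boolean trackChanges) {
      RingQueue queue = new RingQueue(repository.get(address));
      if (trackChanges) {
         repository.registerListener(() -> {
            long wanted = repository.get(address);
            if (queue.getRingSize() != wanted) {
               queue.setRingSize(wanted);
            }
         });
      }
      return queue;
   }

   public static void main(String[] args) {
      SettingsRepository repository = new SettingsRepository();
      repository.set("myAddress", 10L);
      RingQueue untracked = createRetroQueue(repository, "myAddress", false);
      RingQueue tracked = createRetroQueue(repository, "myAddress", true);
      repository.set("myAddress", 100L);
      System.out.println(untracked.getRingSize() + " vs " + tracked.getRingSize());
   }
}
```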
   
   




[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses

2019-09-25 Thread GitBox
michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 
implement retroactive addresses
URL: https://github.com/apache/activemq-artemis/pull/2850#discussion_r328402743
 
 

 ##
 File path: artemis-server/src/main/resources/schema/artemis-configuration.xsd
 ##
 @@ -3404,6 +3404,14 @@
   

 
+
+
 
 Review comment:
   Can this be prefixed with "default"? In future, or even in this PR, it would 
be good to be able to explicitly set/control this at the address level. This 
would then just be a default that's applied if none is set at that level.




[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses

2019-09-25 Thread GitBox
michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 
implement retroactive addresses
URL: https://github.com/apache/activemq-artemis/pull/2850#discussion_r328402189
 
 

 ##
 File path: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
 ##
 @@ -467,6 +478,83 @@ private boolean internalAddressInfo(AddressInfo 
addressInfo, boolean reload) thr
   }
}
 
+   private void registerRepositoryListenerForRetroactiveAddress(SimpleString 
name) {
+  HierarchicalRepositoryChangeListener repositoryChangeListener = () -> {
+ String prefix = server.getConfiguration().getInternalNamingPrefix();
+ String address = 
ResourceNames.decomposeRetroactiveResourceName(prefix, name.toString(), 
ResourceNames.ADDRESS);
+ AddressSettings settings = 
addressSettingsRepository.getMatch(address);
+ Queue internalQueue = 
server.locateQueue(ResourceNames.getRetroactiveResourceName(prefix, 
SimpleString.toSimpleString(address), ResourceNames.QUEUE));
+ if (internalQueue != null && internalQueue.getRingSize() != 
settings.getRetroactiveMessageCount()) {
+internalQueue.setRingSize(settings.getRetroactiveMessageCount());
+ }
+  };
+  addressSettingsRepository.registerListener(repositoryChangeListener);
+  
server.getAddressInfo(name).setRepositoryChangeListener(repositoryChangeListener);
+   }
+
+   private void createRetroactiveResources(final SimpleString 
retroactiveAddressName, final long retroactiveMessageCount, final boolean 
reload) throws Exception {
+  String prefix = server.getConfiguration().getInternalNamingPrefix();
+  final SimpleString internalAddressName = 
ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, 
ResourceNames.ADDRESS);
+  final SimpleString internalQueueName = 
ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, 
ResourceNames.QUEUE);
+  final SimpleString internalDivertName = 
ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, 
ResourceNames.DIVERT);
+
+  if (!reload) {
+ AddressInfo addressInfo = new 
AddressInfo(internalAddressName).addRoutingType(RoutingType.ANYCAST).setInternal(true);
+ addAddressInfo(addressInfo);
+ server.createQueue(internalAddressName,
+RoutingType.ANYCAST,
 
 Review comment:
   Should message groups be disabled, so that they're not tracked? They're not 
needed in the retro queue, and this would avoid issues if people have very 
large numbers of groups.




[GitHub] [activemq-artemis] michaelandrepearce commented on issue #2850: ARTEMIS-2504 implement retroactive addresses

2019-09-25 Thread GitBox
michaelandrepearce commented on issue #2850: ARTEMIS-2504 implement retroactive 
addresses
URL: https://github.com/apache/activemq-artemis/pull/2850#issuecomment-535288242
 
 
   On the retroactive queue, we would want a flag telling us it's a retro 
queue, which is then exposed to JMX so that metrics systems can get the tag 
and alerting rules on queue depth can automatically ignore these queues based 
on that flag.




[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses

2019-09-25 Thread GitBox
michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 
implement retroactive addresses
URL: https://github.com/apache/activemq-artemis/pull/2850#discussion_r328401160
 
 

 ##
 File path: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
 ##
 @@ -1170,6 +1172,10 @@ public SecuritySettingPlugin run() {
 
addressSettings.setDefaultConsumerWindowSize(XMLUtil.parseInt(child));
  } else if (DEFAULT_RING_SIZE.equalsIgnoreCase(name)) {
 addressSettings.setDefaultRingSize(XMLUtil.parseLong(child));
+ } else if (RETROACTIVE_MESSAGE_COUNT.equalsIgnoreCase(name)) {
+long retroactiveMessageCount = XMLUtil.parseLong(child);
+Validators.GE_ZERO.validate(DEFAULT_ADDRESS_ROUTING_TYPE, 
retroactiveMessageCount);
 
 Review comment:
   Why is DEFAULT_ADDRESS_ROUTING_TYPE passed to the validator for the message 
count?
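
   The name passed to a validator typically only matters when validation 
fails, because it ends up in the error message. The self-contained sketch 
below shows that effect. `validateGeZero` is a hypothetical stand-in, not the 
Artemis `Validators` class, and the two element names used are assumed values 
for the constants in the diff.

```java
// Hypothetical illustration only; not the Artemis Validators implementation.
class ValidatorNameSketch {

   // Rejects negative values and names the offending setting in the message.
   static void validateGeZero(String name, long value) {
      if (value < 0) {
         throw new IllegalArgumentException(
               name + " must be greater than or equal to 0 (actual value: " + value + ")");
      }
   }

   // Returns the validation error message, or null when the value is accepted.
   static String errorFor(String name, long value) {
      try {
         validateGeZero(name, value);
         return null;
      } catch (IllegalArgumentException e) {
         return e.getMessage();
      }
   }

   public static void main(String[] args) {
      // Misleading: the user set retroactive-message-count, not the routing type.
      System.out.println(errorFor("default-address-routing-type", -1));
      // Helpful: the message points at the setting that is actually wrong.
      System.out.println(errorFor("retroactive-message-count", -1));
   }
}
```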




[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses

2019-09-25 Thread GitBox
michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 
implement retroactive addresses
URL: https://github.com/apache/activemq-artemis/pull/2850#discussion_r328400809
 
 

 ##
 File path: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
 ##
 @@ -467,6 +478,83 @@ private boolean internalAddressInfo(AddressInfo 
addressInfo, boolean reload) thr
   }
}
 
+   private void registerRepositoryListenerForRetroactiveAddress(SimpleString 
name) {
+  HierarchicalRepositoryChangeListener repositoryChangeListener = () -> {
+ String prefix = server.getConfiguration().getInternalNamingPrefix();
+ String address = 
ResourceNames.decomposeRetroactiveResourceName(prefix, name.toString(), 
ResourceNames.ADDRESS);
+ AddressSettings settings = 
addressSettingsRepository.getMatch(address);
+ Queue internalQueue = 
server.locateQueue(ResourceNames.getRetroactiveResourceName(prefix, 
SimpleString.toSimpleString(address), ResourceNames.QUEUE));
+ if (internalQueue != null && internalQueue.getRingSize() != 
settings.getRetroactiveMessageCount()) {
+internalQueue.setRingSize(settings.getRetroactiveMessageCount());
+ }
+  };
+  addressSettingsRepository.registerListener(repositoryChangeListener);
+  
server.getAddressInfo(name).setRepositoryChangeListener(repositoryChangeListener);
+   }
+
+   private void createRetroactiveResources(final SimpleString 
retroactiveAddressName, final long retroactiveMessageCount, final boolean 
reload) throws Exception {
+  String prefix = server.getConfiguration().getInternalNamingPrefix();
+  final SimpleString internalAddressName = 
ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, 
ResourceNames.ADDRESS);
+  final SimpleString internalQueueName = 
ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, 
ResourceNames.QUEUE);
+  final SimpleString internalDivertName = 
ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, 
ResourceNames.DIVERT);
+
+  if (!reload) {
+ AddressInfo addressInfo = new 
AddressInfo(internalAddressName).addRoutingType(RoutingType.ANYCAST).setInternal(true);
+ addAddressInfo(addressInfo);
+ server.createQueue(internalAddressName,
+RoutingType.ANYCAST,
+internalQueueName,
+null,
+null,
+true,
+false,
+false,
+false,
+false,
+0,
+false,
+false,
+false,
+0,
+null,
+false,
+null,
+false,
+0,
+0L,
+false,
+0L,
+0L,
+false,
+retroactiveMessageCount);
+  }
+  server.deployDivert(new DivertConfiguration()
+ .setName(internalDivertName.toString())
+ .setAddress(retroactiveAddressName.toString())
+ .setExclusive(false)
+ 
.setForwardingAddress(internalAddressName.toString())
+ 
.setRoutingType(ComponentConfigurationRoutingType.STRIP));
 
 Review comment:
   It may mean that both a retro anycast queue and a retro multicast queue are
   required to preserve the routing type.
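
   A rough sketch of that idea, assuming one bounded ring per routing type. RetroBuffersSketch, record and replayFor are hypothetical names, and plain strings stand in for messages; this is not how the PR or Artemis implements it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: one retroactive ring per routing type.
final class RetroBuffersSketch {

   enum RoutingType { ANYCAST, MULTICAST }

   private final Map<RoutingType, Deque<String>> rings = new EnumMap<>(RoutingType.class);
   private final int ringSize;

   RetroBuffersSketch(int ringSize) {
      this.ringSize = ringSize;
      for (RoutingType type : RoutingType.values()) {
         rings.put(type, new ArrayDeque<>());
      }
   }

   // Stores the message in the ring matching its routing type,
   // evicting the oldest entry once the ring is full.
   void record(RoutingType type, String message) {
      if (ringSize <= 0) {
         return;
      }
      Deque<String> ring = rings.get(type);
      if (ring.size() == ringSize) {
         ring.removeFirst();
      }
      ring.addLast(message);
   }

   // A newly created queue only replays messages of its own routing type.
   List<String> replayFor(RoutingType newQueueType) {
      return new ArrayList<>(rings.get(newQueueType));
   }

   public static void main(String[] args) {
      RetroBuffersSketch buffers = new RetroBuffersSketch(2);
      buffers.record(RoutingType.ANYCAST, "a1");
      buffers.record(RoutingType.MULTICAST, "m1");
      buffers.record(RoutingType.MULTICAST, "m2");
      buffers.record(RoutingType.MULTICAST, "m3");
      System.out.println(buffers.replayFor(RoutingType.MULTICAST));
      System.out.println(buffers.replayFor(RoutingType.ANYCAST));
   }
}
```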




[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses

2019-09-25 Thread GitBox
michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 
implement retroactive addresses
URL: https://github.com/apache/activemq-artemis/pull/2850#discussion_r328400519
 
 

 ##
 File path: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
 ##
 @@ -467,6 +478,83 @@ private boolean internalAddressInfo(AddressInfo 
addressInfo, boolean reload) thr
   }
}
 
+   private void registerRepositoryListenerForRetroactiveAddress(SimpleString 
name) {
+  HierarchicalRepositoryChangeListener repositoryChangeListener = () -> {
+ String prefix = server.getConfiguration().getInternalNamingPrefix();
+ String address = 
ResourceNames.decomposeRetroactiveResourceName(prefix, name.toString(), 
ResourceNames.ADDRESS);
+ AddressSettings settings = 
addressSettingsRepository.getMatch(address);
+ Queue internalQueue = 
server.locateQueue(ResourceNames.getRetroactiveResourceName(prefix, 
SimpleString.toSimpleString(address), ResourceNames.QUEUE));
+ if (internalQueue != null && internalQueue.getRingSize() != 
settings.getRetroactiveMessageCount()) {
+internalQueue.setRingSize(settings.getRetroactiveMessageCount());
+ }
+  };
+  addressSettingsRepository.registerListener(repositoryChangeListener);
+  
server.getAddressInfo(name).setRepositoryChangeListener(repositoryChangeListener);
+   }
+
+   private void createRetroactiveResources(final SimpleString 
retroactiveAddressName, final long retroactiveMessageCount, final boolean 
reload) throws Exception {
+  String prefix = server.getConfiguration().getInternalNamingPrefix();
+  final SimpleString internalAddressName = 
ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, 
ResourceNames.ADDRESS);
+  final SimpleString internalQueueName = 
ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, 
ResourceNames.QUEUE);
+  final SimpleString internalDivertName = 
ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, 
ResourceNames.DIVERT);
+
+  if (!reload) {
+ AddressInfo addressInfo = new 
AddressInfo(internalAddressName).addRoutingType(RoutingType.ANYCAST).setInternal(true);
+ addAddressInfo(addressInfo);
+ server.createQueue(internalAddressName,
+RoutingType.ANYCAST,
+internalQueueName,
+null,
+null,
+true,
+false,
+false,
+false,
+false,
+0,
+false,
+false,
+false,
+0,
+null,
+false,
+null,
+false,
+0,
+0L,
+false,
+0L,
+0L,
+false,
+retroactiveMessageCount);
+  }
+  server.deployDivert(new DivertConfiguration()
+ .setName(internalDivertName.toString())
+ .setAddress(retroactiveAddressName.toString())
+ .setExclusive(false)
+ 
.setForwardingAddress(internalAddressName.toString())
+ 
.setRoutingType(ComponentConfigurationRoutingType.STRIP));
 
 Review comment:
   I don't think we should be stripping. We need to ensure anycast messages only
   end up in anycast queues, and likewise for multicast, when they're consumed
   by a retroactive consumer.
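
   The effect being described can be modelled roughly as follows. This is a simplified sketch under a stated assumption: a message with no routing type is delivered to queues of both types. DivertRoutingSketch, afterDivert and queuesReceiving are hypothetical names; only the STRIP and PASS mode names come from ComponentConfigurationRoutingType.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of how a divert's routing-type mode affects delivery.
final class DivertRoutingSketch {

   enum RoutingType { ANYCAST, MULTICAST }

   enum DivertMode { STRIP, PASS }

   // Routing type carried by the message after the divert; null means "none".
   static RoutingType afterDivert(RoutingType original, DivertMode mode) {
      return mode == DivertMode.STRIP ? null : original;
   }

   // Model assumption: an untyped message reaches every queue type,
   // a typed message reaches only queues of the matching type.
   static List<RoutingType> queuesReceiving(RoutingType messageType) {
      List<RoutingType> receiving = new ArrayList<>();
      for (RoutingType queueType : RoutingType.values()) {
         if (messageType == null || messageType == queueType) {
            receiving.add(queueType);
         }
      }
      return receiving;
   }

   public static void main(String[] args) {
      // STRIP: the anycast message loses its type and leaks into multicast queues.
      System.out.println(queuesReceiving(afterDivert(RoutingType.ANYCAST, DivertMode.STRIP)));
      // PASS: the anycast message stays confined to anycast queues.
      System.out.println(queuesReceiving(afterDivert(RoutingType.ANYCAST, DivertMode.PASS)));
   }
}
```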




[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses

2019-09-25 Thread GitBox
michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 
implement retroactive addresses
URL: https://github.com/apache/activemq-artemis/pull/2850#discussion_r328400519
 
 

 ##
 File path: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
 ##
 @@ -467,6 +478,83 @@ private boolean internalAddressInfo(AddressInfo 
addressInfo, boolean reload) thr
   }
}
 
+   private void registerRepositoryListenerForRetroactiveAddress(SimpleString 
name) {
+  HierarchicalRepositoryChangeListener repositoryChangeListener = () -> {
+ String prefix = server.getConfiguration().getInternalNamingPrefix();
+ String address = 
ResourceNames.decomposeRetroactiveResourceName(prefix, name.toString(), 
ResourceNames.ADDRESS);
+ AddressSettings settings = 
addressSettingsRepository.getMatch(address);
+ Queue internalQueue = 
server.locateQueue(ResourceNames.getRetroactiveResourceName(prefix, 
SimpleString.toSimpleString(address), ResourceNames.QUEUE));
+ if (internalQueue != null && internalQueue.getRingSize() != 
settings.getRetroactiveMessageCount()) {
+internalQueue.setRingSize(settings.getRetroactiveMessageCount());
+ }
+  };
+  addressSettingsRepository.registerListener(repositoryChangeListener);
+  
server.getAddressInfo(name).setRepositoryChangeListener(repositoryChangeListener);
+   }
+
+   private void createRetroactiveResources(final SimpleString 
retroactiveAddressName, final long retroactiveMessageCount, final boolean 
reload) throws Exception {
+  String prefix = server.getConfiguration().getInternalNamingPrefix();
+  final SimpleString internalAddressName = 
ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, 
ResourceNames.ADDRESS);
+  final SimpleString internalQueueName = 
ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, 
ResourceNames.QUEUE);
+  final SimpleString internalDivertName = 
ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, 
ResourceNames.DIVERT);
+
+  if (!reload) {
+ AddressInfo addressInfo = new 
AddressInfo(internalAddressName).addRoutingType(RoutingType.ANYCAST).setInternal(true);
+ addAddressInfo(addressInfo);
+ server.createQueue(internalAddressName,
+RoutingType.ANYCAST,
+internalQueueName,
+null,
+null,
+true,
+false,
+false,
+false,
+false,
+0,
+false,
+false,
+false,
+0,
+null,
+false,
+null,
+false,
+0,
+0L,
+false,
+0L,
+0L,
+false,
+retroactiveMessageCount);
+  }
+  server.deployDivert(new DivertConfiguration()
+ .setName(internalDivertName.toString())
+ .setAddress(retroactiveAddressName.toString())
+ .setExclusive(false)
+ 
.setForwardingAddress(internalAddressName.toString())
+ 
.setRoutingType(ComponentConfigurationRoutingType.STRIP));
 
 Review comment:
   I don't think we should be stripping. We need to ensure anycast messages only
   end up in anycast queues, and likewise for multicast.




[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses

2019-09-25 Thread GitBox
michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 
implement retroactive addresses
URL: https://github.com/apache/activemq-artemis/pull/2850#discussion_r328399892
 
 

 ##
 File path: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RetroactiveAddressTest.java
 ##
 @@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.server;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import java.util.UUID;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.api.core.management.ResourceNames;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * A simple test-case used for documentation purposes.
+ */
+public class RetroactiveAddressTest extends ActiveMQTestBase {
+
+   protected ActiveMQServer server;
+
+   protected ClientSession session;
+
+   protected ClientSessionFactory sf;
+
+   protected ServerLocator locator;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+  super.setUp();
+  server = createServer(true, createDefaultInVMConfig());
+  server.getConfiguration().setThreadPoolMaxSize(200);
+  server.start();
+  locator = createInVMNonHALocator();
+  sf = createSessionFactory(locator);
+  session = addClientSession(sf.createSession(false, true, true));
+   }
+
+   @Test
+   public void testRetroactiveResourceCreation() throws Exception {
+  final SimpleString addressName = 
SimpleString.toSimpleString("myAddress");
+  final SimpleString divertAddress = 
ResourceNames.getRetroactiveResourceName(server.getConfiguration().getInternalNamingPrefix(),
 addressName, ResourceNames.ADDRESS);
+  final SimpleString divertQueue = 
ResourceNames.getRetroactiveResourceName(server.getConfiguration().getInternalNamingPrefix(),
 addressName, ResourceNames.QUEUE);
+  final SimpleString divert = 
ResourceNames.getRetroactiveResourceName(server.getConfiguration().getInternalNamingPrefix(),
 addressName, ResourceNames.DIVERT);
+  server.getAddressSettingsRepository().addMatch(addressName.toString(), 
new AddressSettings().setRetroactiveMessageCount(10));
+  server.addAddressInfo(new AddressInfo(addressName));
+  assertNotNull(server.getAddressInfo(divertAddress));
+  assertNotNull(server.locateQueue(divertQueue));
+  assertNotNull(server.getPostOffice().getBinding(divert));
+   }
+
+   @Test
+   public void testRetroactiveResourceRemoval() throws Exception {
+  final SimpleString addressName = 
SimpleString.toSimpleString("myAddress");
+  final SimpleString divertAddress = 
ResourceNames.getRetroactiveResourceName(server.getConfiguration().getInternalNamingPrefix(),
 addressName, ResourceNames.ADDRESS);
+  final SimpleString divertQueue = 
ResourceNames.getRetroactiveResourceName(server.getConfiguration().getInternalNamingPrefix(),
 addressName, ResourceNames.QUEUE);
+  final SimpleString divert = 
ResourceNames.getRetroactiveResourceName(server.getConfiguration().getInternalNamingPrefix(),
 addressName, ResourceNames.DIVERT);
+  

[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses

2019-09-25 Thread GitBox
michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 
implement retroactive addresses
URL: https://github.com/apache/activemq-artemis/pull/2850#discussion_r328398850
 
 

 ##
 File path: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
 ##
 @@ -467,6 +478,83 @@ private boolean internalAddressInfo(AddressInfo 
addressInfo, boolean reload) thr
   }
}
 
+   private void registerRepositoryListenerForRetroactiveAddress(SimpleString 
name) {
+  HierarchicalRepositoryChangeListener repositoryChangeListener = () -> {
+ String prefix = server.getConfiguration().getInternalNamingPrefix();
+ String address = 
ResourceNames.decomposeRetroactiveResourceName(prefix, name.toString(), 
ResourceNames.ADDRESS);
+ AddressSettings settings = 
addressSettingsRepository.getMatch(address);
+ Queue internalQueue = 
server.locateQueue(ResourceNames.getRetroactiveResourceName(prefix, 
SimpleString.toSimpleString(address), ResourceNames.QUEUE));
+ if (internalQueue != null && internalQueue.getRingSize() != 
settings.getRetroactiveMessageCount()) {
+internalQueue.setRingSize(settings.getRetroactiveMessageCount());
+ }
+  };
+  addressSettingsRepository.registerListener(repositoryChangeListener);
+  
server.getAddressInfo(name).setRepositoryChangeListener(repositoryChangeListener);
+   }
+
+   private void createRetroactiveResources(final SimpleString 
retroactiveAddressName, final long retroactiveMessageCount, final boolean 
reload) throws Exception {
+  String prefix = server.getConfiguration().getInternalNamingPrefix();
+  final SimpleString internalAddressName = 
ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, 
ResourceNames.ADDRESS);
+  final SimpleString internalQueueName = 
ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, 
ResourceNames.QUEUE);
+  final SimpleString internalDivertName = 
ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, 
ResourceNames.DIVERT);
+
+  if (!reload) {
+ AddressInfo addressInfo = new 
AddressInfo(internalAddressName).addRoutingType(RoutingType.ANYCAST).setInternal(true);
+ addAddressInfo(addressInfo);
+ server.createQueue(internalAddressName,
+RoutingType.ANYCAST,
+internalQueueName,
+null,
+null,
+true,
+false,
+false,
+false,
+false,
+0,
+false,
+false,
+false,
+0,
+null,
+false,
+null,
+false,
+0,
+0L,
+false,
+0L,
+0L,
+false,
+retroactiveMessageCount);
+  }
+  server.deployDivert(new DivertConfiguration()
+ .setName(internalDivertName.toString())
+ .setAddress(retroactiveAddressName.toString())
+ .setExclusive(false)
+ 
.setForwardingAddress(internalAddressName.toString())
+ 
.setRoutingType(ComponentConfigurationRoutingType.STRIP));
+   }
+
+   private void removeRetroactiveResources(SimpleString address) throws 
Exception {
+  String prefix = server.getConfiguration().getInternalNamingPrefix();
+
+  SimpleString internalDivertName = 
ResourceNames.getRetroactiveResourceName(prefix, address, ResourceNames.DIVERT);
 
 Review comment:
   Why are a divert and a separate address needed? Why not have the retro queue
   under the original address?
   Having a separate address means the message has to be copied, rather than
   simply being a message reference to the existing message.
   
   It also adds risk: what if the divert or the other address don't clean up
   cleanly?
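
   The cost difference can be sketched as below, taking the claim above as the model: queues on the same address hold references to one message, while a divert to another address produces a copy. ReferenceVsCopySketch, Message, routeToSameAddress and divertToOtherAddress are hypothetical names, not Artemis classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model contrasting shared references with divert copies.
final class ReferenceVsCopySketch {

   static final class Message {
      final byte[] body;

      Message(byte[] body) {
         this.body = body;
      }

      // A copy duplicates the body, which is the extra cost being discussed.
      Message copy() {
         return new Message(body.clone());
      }
   }

   // Queues under the same address each get a reference to the same instance.
   static List<Message> routeToSameAddress(Message message, int queueCount) {
      List<Message> references = new ArrayList<>();
      for (int i = 0; i < queueCount; i++) {
         references.add(message);
      }
      return references;
   }

   // A divert to a different address is modelled as producing a new message.
   static Message divertToOtherAddress(Message message) {
      return message.copy();
   }

   public static void main(String[] args) {
      Message original = new Message(new byte[] {1, 2, 3});
      List<Message> refs = routeToSameAddress(original, 2);
      System.out.println("same instance on both queues: " + (refs.get(0) == refs.get(1)));
      System.out.println("divert shares body array: " + (divertToOtherAddress(original).body == original.body));
   }
}
```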




[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses

2019-09-25 Thread GitBox
michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 
implement retroactive addresses
URL: https://github.com/apache/activemq-artemis/pull/2850#discussion_r328398850
 
 

 ##
 File path: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
 ##
 @@ -467,6 +478,83 @@ private boolean internalAddressInfo(AddressInfo 
addressInfo, boolean reload) thr
   }
}
 
+   private void registerRepositoryListenerForRetroactiveAddress(SimpleString 
name) {
+  HierarchicalRepositoryChangeListener repositoryChangeListener = () -> {
+ String prefix = server.getConfiguration().getInternalNamingPrefix();
+ String address = 
ResourceNames.decomposeRetroactiveResourceName(prefix, name.toString(), 
ResourceNames.ADDRESS);
+ AddressSettings settings = 
addressSettingsRepository.getMatch(address);
+ Queue internalQueue = 
server.locateQueue(ResourceNames.getRetroactiveResourceName(prefix, 
SimpleString.toSimpleString(address), ResourceNames.QUEUE));
+ if (internalQueue != null && internalQueue.getRingSize() != 
settings.getRetroactiveMessageCount()) {
+internalQueue.setRingSize(settings.getRetroactiveMessageCount());
+ }
+  };
+  addressSettingsRepository.registerListener(repositoryChangeListener);
+  
server.getAddressInfo(name).setRepositoryChangeListener(repositoryChangeListener);
+   }
+
+   private void createRetroactiveResources(final SimpleString 
retroactiveAddressName, final long retroactiveMessageCount, final boolean 
reload) throws Exception {
+  String prefix = server.getConfiguration().getInternalNamingPrefix();
+  final SimpleString internalAddressName = 
ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, 
ResourceNames.ADDRESS);
+  final SimpleString internalQueueName = 
ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, 
ResourceNames.QUEUE);
+  final SimpleString internalDivertName = 
ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, 
ResourceNames.DIVERT);
+
+  if (!reload) {
+ AddressInfo addressInfo = new 
AddressInfo(internalAddressName).addRoutingType(RoutingType.ANYCAST).setInternal(true);
+ addAddressInfo(addressInfo);
+ server.createQueue(internalAddressName,
+RoutingType.ANYCAST,
+internalQueueName,
+null,
+null,
+true,
+false,
+false,
+false,
+false,
+0,
+false,
+false,
+false,
+0,
+null,
+false,
+null,
+false,
+0,
+0L,
+false,
+0L,
+0L,
+false,
+retroactiveMessageCount);
+  }
+  server.deployDivert(new DivertConfiguration()
+ .setName(internalDivertName.toString())
+ .setAddress(retroactiveAddressName.toString())
+ .setExclusive(false)
+ 
.setForwardingAddress(internalAddressName.toString())
+ 
.setRoutingType(ComponentConfigurationRoutingType.STRIP));
+   }
+
+   private void removeRetroactiveResources(SimpleString address) throws 
Exception {
+  String prefix = server.getConfiguration().getInternalNamingPrefix();
+
+  SimpleString internalDivertName = 
ResourceNames.getRetroactiveResourceName(prefix, address, ResourceNames.DIVERT);
 
 Review comment:
   Why are a divert and a separate address needed? Why not have the retro queue
   under the original address?
   Having a separate address means the message has to be copied, rather than
   simply being a message reference to the existing message.




[GitHub] [activemq-artemis] michaelandrepearce edited a comment on issue #2850: ARTEMIS-2504 implement retroactive addresses

2019-09-25 Thread GitBox
michaelandrepearce edited a comment on issue #2850: ARTEMIS-2504 implement 
retroactive addresses
URL: https://github.com/apache/activemq-artemis/pull/2850#issuecomment-535272144
 
 
   A few bits, as I haven't fully explored the feature just yet.
   
   How does someone control how the retroactive address behaves when full,
   e.g. page, block, drop?
   
   How or where can someone configure the naming of a retroactive address, so
   they can control the names to avoid conflicts or for security settings?
   
   How can we configure this setting differently per address?
   
   For a specific address it would be good to be able to configure the
   retroactive queue fully, e.g. at the address level define a queue (like we
   can for others) and then specify the queue to use, for cases where you may
   want LVQ behaviour or a filter on the retroactive queue.
   
   For queues created for retroactive use, if there is a very high number of
   message groups and someone is using buckets so that the group map doesn't
   explode in size, how is this being avoided in the retroactive queue?
   
   Is routing type being preserved? E.g. a new multicast queue should only be
   delivered messages that were sent to the address as multicast, and likewise
   for anycast. Otherwise this will cause issues: normally an anycast message
   wouldn't have been delivered to a multicast queue, but now it possibly
   could be if the routing type is not honoured.
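
   As an illustration of the LVQ case mentioned above, here is a minimal sketch of last-value retention, where a retroactive consumer would only see the newest message per key. LastValueRetroSketch, record and replay are hypothetical names, and the key and body are plain strings; this is not the Artemis last-value queue implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of last-value retention for a retroactive queue.
final class LastValueRetroSketch {

   // Insertion-ordered map holding the newest body per last-value key.
   private final Map<String, String> latestByKey = new LinkedHashMap<>();

   // A newer message with the same key replaces the older one
   // and moves to the end of the replay order.
   void record(String lastValueKey, String body) {
      latestByKey.remove(lastValueKey);
      latestByKey.put(lastValueKey, body);
   }

   // What a late-joining consumer would receive, oldest retained entry first.
   List<String> replay() {
      return new ArrayList<>(latestByKey.values());
   }

   public static void main(String[] args) {
      LastValueRetroSketch retro = new LastValueRetroSketch();
      retro.record("EURUSD", "1.10");
      retro.record("GBPUSD", "1.25");
      retro.record("EURUSD", "1.11");
      System.out.println(retro.replay());
   }
}
```

   A plain ring of the last N messages and a last-value view can retain quite different sets of messages, which is why being able to choose the queue type per address matters here.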
   
   
   
   
   
   




[GitHub] [activemq-artemis] michaelandrepearce edited a comment on issue #2850: ARTEMIS-2504 implement retroactive addresses

2019-09-25 Thread GitBox
michaelandrepearce edited a comment on issue #2850: ARTEMIS-2504 implement 
retroactive addresses
URL: https://github.com/apache/activemq-artemis/pull/2850#issuecomment-535272144
 
 
   A few bits, as I haven't fully explored the feature just yet.
   
   How does someone control how the retroactive address behaves when full,
   e.g. page, block, drop?
   
   How or where can someone configure the naming of a retroactive address, so
   they can control the names to avoid conflicts or for security settings?
   
   How can we configure this setting differently per address?
   
   For a specific address it would be good to be able to configure the
   retroactive queue fully, e.g. at the address level define a queue (like we
   can for others) and then specify the queue to use, for cases where you may
   want LVQ behaviour or a filter on the retroactive queue.
   
   For queues created for retroactive use, if there is a very high number of
   message groups and someone is using buckets so that the group map doesn't
   explode in size, how is this being avoided in the retroactive queue?
   
   Is routing type being preserved? E.g. a new multicast queue should only be
   delivered messages that were sent to the address as multicast, and likewise
   for anycast.
   
   
   
   
   
   




[GitHub] [activemq-artemis] michaelandrepearce edited a comment on issue #2850: ARTEMIS-2504 implement retroactive addresses

2019-09-25 Thread GitBox
michaelandrepearce edited a comment on issue #2850: ARTEMIS-2504 implement 
retroactive addresses
URL: https://github.com/apache/activemq-artemis/pull/2850#issuecomment-535272144
 
 
   A few bits, as I haven't fully explored the feature just yet.
   
   How does someone control how the retroactive address behaves when full,
   e.g. page, block, drop?
   
   How or where can someone configure the naming of a retroactive address, so
   they can control the names to avoid conflicts or for security settings?
   
   How can we configure this setting differently per address?
   
   For a specific address it would be good to be able to configure the
   retroactive queue fully, e.g. at the address level define a queue (like we
   can for others) and then specify the queue to use, for cases where you may
   want LVQ behaviour or a filter on the retroactive queue.
   
   For queues created for retroactive use, if there is a very high number of
   message groups and someone is using buckets so that the group map doesn't
   explode in size, how is this being avoided in the retroactive queue?
   
   
   
   
   
   




[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)

2019-09-25 Thread GitBox
franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero 
copy to replicate journal/page/large message file (AGAIN)
URL: https://github.com/apache/activemq-artemis/pull/2845#discussion_r328133175
 
 

 ##
 File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
 ##
 @@ -560,49 +590,52 @@ private void sendLargeFile(AbstractJournalStorageManager.JournalContent content,
       if (!file.isOpen()) {
          file.open();
       }
-      int size = 32 * 1024;
+      final int size = 1024 * 1024;
+      long fileSize = file.size();
 
       int flowControlSize = 10;
 
       int packetsSent = 0;
       FlushAction action = new FlushAction();
 
+      long offset = 0;
+      RandomAccessFile raf = null;
+      FileChannel fileChannel = null;
       try {
-         try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) {
-
-            // We can afford having a single buffer here for this entire loop
-            // because sendReplicatePacket will encode the packet as a NettyBuffer
-            // through ActiveMQBuffer class leaving this buffer free to be reused on the next copy
-            while (true) {
-               final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
-               buffer.clear();
-               ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer();
-               final int bytesRead = channel.read(byteBuffer);
-               int toSend = bytesRead;
-               if (bytesRead > 0) {
-                  if (bytesRead >= maxBytesToSend) {
-                     toSend = (int) maxBytesToSend;
-                     maxBytesToSend = 0;
-                  } else {
-                     maxBytesToSend = maxBytesToSend - bytesRead;
-                  }
-               }
-               logger.debug("sending " + buffer.writerIndex() + " bytes on file " + file.getFileName());
-               // sending -1 or 0 bytes will close the file at the backup
-               // We cannot simply send everything of a file through the executor,
-               // otherwise we would run out of memory.
-               // so we don't use the executor here
-               sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true);
-               packetsSent++;
-
-               if (packetsSent % flowControlSize == 0) {
-                  flushReplicationStream(action);
+         raf = new RandomAccessFile(file.getJavaFile(), "r");
 
 Review comment:
   Still searching for a reproducer to test 
https://github.com/apache/activemq-artemis/pull/2845/commits/9853633d3b4c2aacf37d1b402712b0a9938a827c


[GitHub] [activemq-artemis] k-wall commented on issue #2852: ARTEMIS-2505: Fix wiring of the max-size-bytes-reject-threshold address-setting

2019-09-25 Thread GitBox
k-wall commented on issue #2852: ARTEMIS-2505: Fix wiring of the 
max-size-bytes-reject-threshold address-setting
URL: https://github.com/apache/activemq-artemis/pull/2852#issuecomment-535052799
 
 
   @franz1981 unit test added as requested.


[GitHub] [activemq-artemis] gaohoward opened a new pull request #2853: ARTEMIS-2506 MQTT doesn't cleanup underlying connection for bad clients

2019-09-25 Thread GitBox
gaohoward opened a new pull request #2853: ARTEMIS-2506 MQTT doesn't cleanup 
underlying connection for bad clients
URL: https://github.com/apache/activemq-artemis/pull/2853
 
 
   When a bad MQTT client drops its connection without properly closing
   it, the broker doesn't close the underlying physical connection.


[GitHub] [activemq-artemis] k-wall opened a new pull request #2852: ARTEMIS-2505: Fix wiring of the max-size-bytes-reject-threshold address-setting

2019-09-25 Thread GitBox
k-wall opened a new pull request #2852: ARTEMIS-2505: Fix wiring of the 
max-size-bytes-reject-threshold address-setting
URL: https://github.com/apache/activemq-artemis/pull/2852
 
 
   


[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)

2019-09-25 Thread GitBox
franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero 
copy to replicate journal/page/large message file (AGAIN)
URL: https://github.com/apache/activemq-artemis/pull/2845#discussion_r328101333
 
 

 ##
 File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
 ##
 @@ -560,49 +590,52 @@ private void sendLargeFile(AbstractJournalStorageManager.JournalContent content,
       if (!file.isOpen()) {
          file.open();
       }
-      int size = 32 * 1024;
+      final int size = 1024 * 1024;
+      long fileSize = file.size();
 
       int flowControlSize = 10;
 
       int packetsSent = 0;
       FlushAction action = new FlushAction();
 
+      long offset = 0;
+      RandomAccessFile raf = null;
+      FileChannel fileChannel = null;
       try {
-         try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) {
-
-            // We can afford having a single buffer here for this entire loop
-            // because sendReplicatePacket will encode the packet as a NettyBuffer
-            // through ActiveMQBuffer class leaving this buffer free to be reused on the next copy
-            while (true) {
-               final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
-               buffer.clear();
-               ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer();
-               final int bytesRead = channel.read(byteBuffer);
-               int toSend = bytesRead;
-               if (bytesRead > 0) {
-                  if (bytesRead >= maxBytesToSend) {
-                     toSend = (int) maxBytesToSend;
-                     maxBytesToSend = 0;
-                  } else {
-                     maxBytesToSend = maxBytesToSend - bytesRead;
-                  }
-               }
-               logger.debug("sending " + buffer.writerIndex() + " bytes on file " + file.getFileName());
-               // sending -1 or 0 bytes will close the file at the backup
-               // We cannot simply send everything of a file through the executor,
-               // otherwise we would run out of memory.
-               // so we don't use the executor here
-               sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true);
-               packetsSent++;
-
-               if (packetsSent % flowControlSize == 0) {
-                  flushReplicationStream(action);
+         raf = new RandomAccessFile(file.getJavaFile(), "r");
 
 Review comment:
   > How about not opening raf and new ReplicationSyncFileMessage(content, 
pageStore, id, null, null, offset, toSend) when file size is 0 so we don't take 
care to release it?

   I like it; it is effectively wasted effort to open/close it for nothing...
   I will manage the 2 things in 2 separate commits:
   1) to address the 0-length transfers
   2) to simplify flow control: since it is concurrent, the backpressure doesn't 
need to be super-precise; being so only slows down the system for no reason


[GitHub] [activemq-artemis] wy96f commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)

2019-09-25 Thread GitBox
wy96f commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy 
to replicate journal/page/large message file (AGAIN)
URL: https://github.com/apache/activemq-artemis/pull/2845#discussion_r328099516
 
 

 ##
 File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
 ##
 @@ -560,49 +590,52 @@ private void sendLargeFile(AbstractJournalStorageManager.JournalContent content,
       if (!file.isOpen()) {
          file.open();
       }
-      int size = 32 * 1024;
+      final int size = 1024 * 1024;
+      long fileSize = file.size();
 
       int flowControlSize = 10;
 
       int packetsSent = 0;
       FlushAction action = new FlushAction();
 
+      long offset = 0;
+      RandomAccessFile raf = null;
+      FileChannel fileChannel = null;
       try {
-         try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) {
-
-            // We can afford having a single buffer here for this entire loop
-            // because sendReplicatePacket will encode the packet as a NettyBuffer
-            // through ActiveMQBuffer class leaving this buffer free to be reused on the next copy
-            while (true) {
-               final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
-               buffer.clear();
-               ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer();
-               final int bytesRead = channel.read(byteBuffer);
-               int toSend = bytesRead;
-               if (bytesRead > 0) {
-                  if (bytesRead >= maxBytesToSend) {
-                     toSend = (int) maxBytesToSend;
-                     maxBytesToSend = 0;
-                  } else {
-                     maxBytesToSend = maxBytesToSend - bytesRead;
-                  }
-               }
-               logger.debug("sending " + buffer.writerIndex() + " bytes on file " + file.getFileName());
-               // sending -1 or 0 bytes will close the file at the backup
-               // We cannot simply send everything of a file through the executor,
-               // otherwise we would run out of memory.
-               // so we don't use the executor here
-               sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true);
-               packetsSent++;
-
-               if (packetsSent % flowControlSize == 0) {
-                  flushReplicationStream(action);
+         raf = new RandomAccessFile(file.getJavaFile(), "r");
 
 Review comment:
   AFAIK a 0-byte file will be created in some cases:
   1. At the beginning of `startReplication`, a 0-byte page file is 
created. If the backup fails and connects to the live again, the 0-byte page 
file is sent.
   2. When all pages are consumed, `PageCursorProviderImpl::cleanupComplete` 
is called, generating a 0-byte page file, which might be sent to the backup 
later.

   That works, but we would add a new send method used only here? How about not 
opening raf and using `new ReplicationSyncFileMessage(content, pageStore, id, 
null, null, offset, toSend)` when the file size is 0, so we don't need to take 
care of releasing it?
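
   A minimal sketch of the idea discussed above, under stated assumptions; it 
is not the code of the PR. `ChunkedFileSender`, `Chunk`, `planChunks` and 
`copy` are hypothetical names, a plain `WritableByteChannel` stands in for the 
replication channel, and the target is assumed to be blocking. It shows 
skipping the `RandomAccessFile` entirely for a 0-byte file, so there is nothing 
to release, and walking a non-empty file in (offset, length) chunks with 
`FileChannel.transferTo`.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.List;

final class ChunkedFileSender {

   /** Hypothetical stand-in for one sync packet: a region of the file. */
   static final class Chunk {
      final long offset;
      final int length;

      Chunk(long offset, int length) {
         this.offset = offset;
         this.length = length;
      }
   }

   /** Splits [0, fileSize) into chunks of at most chunkSize bytes; empty for a 0-byte file. */
   static List<Chunk> planChunks(long fileSize, int chunkSize) {
      if (chunkSize <= 0) {
         throw new IllegalArgumentException("chunkSize must be positive");
      }
      List<Chunk> chunks = new ArrayList<>();
      for (long offset = 0; offset < fileSize; offset += chunkSize) {
         chunks.add(new Chunk(offset, (int) Math.min(chunkSize, fileSize - offset)));
      }
      return chunks;
   }

   /** Copies the file to target chunk by chunk and returns the number of bytes sent. */
   static long copy(File file, WritableByteChannel target, int chunkSize) throws IOException {
      final long fileSize = file.length();
      if (fileSize == 0) {
         // Zero-length file: never open it, so there is no handle to release later.
         return 0;
      }
      long sent = 0;
      try (RandomAccessFile raf = new RandomAccessFile(file, "r");
           FileChannel channel = raf.getChannel()) {
         for (Chunk chunk : planChunks(fileSize, chunkSize)) {
            long position = chunk.offset;
            long remaining = chunk.length;
            while (remaining > 0) {
               // transferTo may move fewer bytes than requested, so loop until the chunk is done.
               long n = channel.transferTo(position, remaining, target);
               if (n <= 0) {
                  throw new IOException("no progress at position " + position);
               }
               position += n;
               remaining -= n;
               sent += n;
            }
         }
      }
      return sent;
   }
}
```

   With a 1 MiB chunk size, as in the diff, a 2.5 MiB file would be planned as 
three chunks, while a 0-byte file yields no chunk and no open file handle.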


[GitHub] [activemq-artemis] franz1981 commented on issue #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)

2019-09-25 Thread GitBox
franz1981 commented on issue #2845: ARTEMIS-2336 Use zero copy to replicate 
journal/page/large message file (AGAIN)
URL: https://github.com/apache/activemq-artemis/pull/2845#issuecomment-534999649
 
 
   @wy96f 

   > . So only non file packet accounts for the pending write bytes in channel, 
and flowControl is not working very well in this case, will this have a 
negative impact?

   I see that `FileRegion` isn't handled by `DefaultMessageSizeEstimator`, nor 
is `ChannelOutboundBuffer::decrementPendingOutboundBytes` called after 
`sendFile` succeeds: it means that our "lazy" backpressure is quite limited 
while using `FileRegion`s.
   `ChunkedNioFile` is a different story, but equally interesting: 
`DefaultMessageSizeEstimator` is not able to recognize `ChunkedNioFile`, but 
`ChannelOutboundBuffer::decrementPendingOutboundBytes` handles it correctly 
because `ChunkedWriteHandler` transparently handles the files as `ByteBuf`s.
   I think it is time to simplify `blockUntilWritable` to make it lazier, 
i.e. to just rely on Netty's `isWritable` instead of trying to calculate 
the pending bytes exactly :) 
   I'm adding a commit to address that :)
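
   A rough sketch of what such a "lazy" wait could look like, assuming only 
that the channel exposes a writability flag (Netty's `Channel.isWritable()` 
is such a flag). `LazyBackpressure` and this `blockUntilWritable` signature 
are hypothetical and are not the Artemis implementation; a `BooleanSupplier` 
stands in for the channel so the example needs nothing beyond the JDK.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;

final class LazyBackpressure {

   /**
    * Waits until isWritable reports true or the timeout elapses.
    * No pending-byte accounting is attempted: the writability flag is trusted as-is.
    *
    * @return true if the channel became writable, false on timeout
    */
   static boolean blockUntilWritable(BooleanSupplier isWritable, long timeout, TimeUnit unit) {
      final long deadline = System.nanoTime() + unit.toNanos(timeout);
      while (!isWritable.getAsBoolean()) {
         if (System.nanoTime() - deadline >= 0) {
            return false;
         }
         // Back off briefly instead of spinning; precision is not needed here.
         LockSupport.parkNanos(TimeUnit.MICROSECONDS.toNanos(100));
      }
      return true;
   }
}
```

   With a Netty channel the call could be written as 
`LazyBackpressure.blockUntilWritable(channel::isWritable, 30, TimeUnit.SECONDS)`; 
the 30 second timeout is only an illustrative value.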


[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)

2019-09-25 Thread GitBox
franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero 
copy to replicate journal/page/large message file (AGAIN)
URL: https://github.com/apache/activemq-artemis/pull/2845#discussion_r328073825
 
 

 ##
 File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
 ##
 @@ -560,49 +590,52 @@ private void sendLargeFile(AbstractJournalStorageManager.JournalContent content,
       if (!file.isOpen()) {
          file.open();
       }
-      int size = 32 * 1024;
+      final int size = 1024 * 1024;
+      long fileSize = file.size();
 
       int flowControlSize = 10;
 
       int packetsSent = 0;
       FlushAction action = new FlushAction();
 
+      long offset = 0;
+      RandomAccessFile raf = null;
+      FileChannel fileChannel = null;
       try {
-         try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) {
-
-            // We can afford having a single buffer here for this entire loop
-            // because sendReplicatePacket will encode the packet as a NettyBuffer
-            // through ActiveMQBuffer class leaving this buffer free to be reused on the next copy
-            while (true) {
-               final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
-               buffer.clear();
-               ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer();
-               final int bytesRead = channel.read(byteBuffer);
-               int toSend = bytesRead;
-               if (bytesRead > 0) {
-                  if (bytesRead >= maxBytesToSend) {
-                     toSend = (int) maxBytesToSend;
-                     maxBytesToSend = 0;
-                  } else {
-                     maxBytesToSend = maxBytesToSend - bytesRead;
-                  }
-               }
-               logger.debug("sending " + buffer.writerIndex() + " bytes on file " + file.getFileName());
-               // sending -1 or 0 bytes will close the file at the backup
-               // We cannot simply send everything of a file through the executor,
-               // otherwise we would run out of memory.
-               // so we don't use the executor here
-               sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true);
-               packetsSent++;
-
-               if (packetsSent % flowControlSize == 0) {
-                  flushReplicationStream(action);
+         raf = new RandomAccessFile(file.getJavaFile(), "r");
 
 Review comment:
   I see that:
   ```java
   if (syncFileMessage.getFileId() != -1 && syncFileMessage.getDataSize() > 0) {
      replicatingChannel.send(syncFileMessage, syncFileMessage.getFileChannel(),
                              syncFileMessage.getOffset(), syncFileMessage.getDataSize(),
                              lastChunk ? (Channel.Callback) success -> syncFileMessage.release() : null);
   ```
   So we never send a syncFileMessage with file size == 0 this way, and when we 
do send one, it is with
   ```java
   replicatingChannel.send(syncFileMessage);
   ```
   and we don't release it when completed. It seems a leak, but it is not, 
because the release will most likely have been requested on the previous 
iteration, so the file should already be released, or be released upon 
completion of the previous call.
   I don't think we ever send a 0-byte file on the wire AFAIK, but *if* 
it happened we would not be releasing it correctly: we should add 
   ```java
   replicatingChannel.send(syncFileMessage, lastChunk ? (Channel.Callback) success -> syncFileMessage.release() : null);
   ```
   On the normal path (file::size > 0) this would currently cause 
`syncFileMessage.release` to be called twice (we mark lastChunk == true for the 
last 2 sent packets), but `RandomAccessFile::close` should be idempotent, 
so... no harm, and it would make the zero-sized file case work correctly :) wdyt?
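
   To illustrate the "release may be called twice" point, here is a small 
sketch that makes the release explicitly idempotent instead of relying on the 
behaviour of `RandomAccessFile::close`. `ReleasableFile` is a hypothetical 
class, not the `ReplicationSyncFileMessage` from the PR; the guard is an 
assumption about how one could make a double callback harmless.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.atomic.AtomicBoolean;

final class ReleasableFile {

   private final RandomAccessFile raf;
   private final AtomicBoolean released = new AtomicBoolean();

   ReleasableFile(File file) throws IOException {
      this.raf = new RandomAccessFile(file, "r");
   }

   /** Closes the file at most once; returns true only for the call that closed it. */
   boolean release() throws IOException {
      if (!released.compareAndSet(false, true)) {
         // Already released by an earlier completion callback: nothing to do.
         return false;
      }
      raf.close();
      return true;
   }

   boolean isReleased() {
      return released.get();
   }
}
```

   If both of the last two packets carry a completion callback, the second 
`release()` simply returns `false`, whichever callback fires first.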


[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)

2019-09-25 Thread GitBox
franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero 
copy to replicate journal/page/large message file (AGAIN)
URL: https://github.com/apache/activemq-artemis/pull/2845#discussion_r328073825
 
 

 ##
 File path: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
 ##
 @@ -560,49 +590,52 @@ private void 
sendLargeFile(AbstractJournalStorageManager.JournalContent content,
   if (!file.isOpen()) {
  file.open();
   }
-  int size = 32 * 1024;
+  final int size = 1024 * 1024;
+  long fileSize = file.size();
 
   int flowControlSize = 10;
 
   int packetsSent = 0;
   FlushAction action = new FlushAction();
 
+  long offset = 0;
+  RandomAccessFile raf = null;
+  FileChannel fileChannel = null;
   try {
- try (FileInputStream fis = new FileInputStream(file.getJavaFile()); 
FileChannel channel = fis.getChannel()) {
-
-// We can afford having a single buffer here for this entire loop
-// because sendReplicatePacket will encode the packet as a 
NettyBuffer
-// through ActiveMQBuffer class leaving this buffer free to be 
reused on the next copy
-while (true) {
-   final ByteBuf buffer = 
PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
-   buffer.clear();
-   ByteBuffer byteBuffer = 
buffer.writerIndex(size).readerIndex(0).nioBuffer();
-   final int bytesRead = channel.read(byteBuffer);
-   int toSend = bytesRead;
-   if (bytesRead > 0) {
-  if (bytesRead >= maxBytesToSend) {
- toSend = (int) maxBytesToSend;
- maxBytesToSend = 0;
-  } else {
- maxBytesToSend = maxBytesToSend - bytesRead;
-  }
-   }
-   logger.debug("sending " + buffer.writerIndex() + " bytes on 
file " + file.getFileName());
-   // sending -1 or 0 bytes will close the file at the backup
-   // We cannot simply send everything of a file through the 
executor,
-   // otherwise we would run out of memory.
-   // so we don't use the executor here
-   sendReplicatePacket(new ReplicationSyncFileMessage(content, 
pageStore, id, toSend, buffer), true);
-   packetsSent++;
-
-   if (packetsSent % flowControlSize == 0) {
-  flushReplicationStream(action);
+ raf = new RandomAccessFile(file.getJavaFile(), "r");
 
 Review comment:
   I see that :
   ```java
     if (syncFileMessage.getFileId() != -1 && syncFileMessage.getDataSize() > 0) {
        replicatingChannel.send(syncFileMessage, syncFileMessage.getFileChannel(),
                                syncFileMessage.getOffset(), syncFileMessage.getDataSize(),
                                lastChunk ? (Channel.Callback) success -> syncFileMessage.release() : null);
   ```
   So we never take this path for a syncFileMessage with data size == 0; in 
that case we send it with
   ```java
 replicatingChannel.send(syncFileMessage);
   ```
   We don't release it. It looks like a leak, but it isn't, because release 
will most likely have been registered on the previous iteration, so the message 
is either already released or will be released when that previous call 
completes.
   I don't think we ever send a 0-byte file on the wire AFAIK, but *if* it did 
happen we would not be releasing it correctly: we should add 
   ```java
     replicatingChannel.send(syncFileMessage, lastChunk ? (Channel.Callback) success -> syncFileMessage.release() : null);
   ```
   On the normal path (file::size > 0) this would cause 
`syncFileMessage.release` to be called twice ATM (we mark lastChunk == true for 
the last 2 sent packets), but `RandomAccessFile::close` should be idempotent 
so...no harm :) wdyt?
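
   A minimal, JDK-only sketch of the idempotency assumption above: closing a 
`RandomAccessFile` (and the `FileChannel` obtained from it) a second time has 
no effect, as the `Closeable` and `Channel` contracts state. The class 
`DoubleCloseSketch` and the method `releaseTwice` are hypothetical names that 
stand in for `syncFileMessage.release` being invoked from two callbacks; they 
are not part of Artemis.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.file.Files;

final class DoubleCloseSketch {

   // Hypothetical stand-in for release() running twice on the same file.
   // Returns true if the channel is closed afterwards and no close() threw.
   static boolean releaseTwice(File file) throws IOException {
      RandomAccessFile raf = new RandomAccessFile(file, "r");
      FileChannel channel = raf.getChannel();
      raf.close();      // first release: closes the file and its channel
      raf.close();      // second release: already closed, no effect
      channel.close();  // closing an already closed channel is also a no-op
      return !channel.isOpen();
   }

   public static void main(String[] args) throws IOException {
      File tmp = File.createTempFile("double-close", ".bin");
      try {
         Files.write(tmp.toPath(), new byte[] {1, 2, 3});
         System.out.println("closed cleanly: " + releaseTwice(tmp));
      } finally {
         tmp.delete();
      }
   }
}
```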


[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)

2019-09-25 Thread GitBox
franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero 
copy to replicate journal/page/large message file (AGAIN)
URL: https://github.com/apache/activemq-artemis/pull/2845#discussion_r328068540
 
 

 ##
 File path: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
 ##
 @@ -560,49 +590,52 @@ private void 
sendLargeFile(AbstractJournalStorageManager.JournalContent content,
   if (!file.isOpen()) {
  file.open();
   }
-  int size = 32 * 1024;
+  final int size = 1024 * 1024;
+  long fileSize = file.size();
 
   int flowControlSize = 10;
 
   int packetsSent = 0;
   FlushAction action = new FlushAction();
 
+  long offset = 0;
+  RandomAccessFile raf = null;
+  FileChannel fileChannel = null;
   try {
- try (FileInputStream fis = new FileInputStream(file.getJavaFile()); 
FileChannel channel = fis.getChannel()) {
-
-// We can afford having a single buffer here for this entire loop
-// because sendReplicatePacket will encode the packet as a 
NettyBuffer
-// through ActiveMQBuffer class leaving this buffer free to be 
reused on the next copy
-while (true) {
-   final ByteBuf buffer = 
PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
-   buffer.clear();
-   ByteBuffer byteBuffer = 
buffer.writerIndex(size).readerIndex(0).nioBuffer();
-   final int bytesRead = channel.read(byteBuffer);
-   int toSend = bytesRead;
-   if (bytesRead > 0) {
-  if (bytesRead >= maxBytesToSend) {
- toSend = (int) maxBytesToSend;
- maxBytesToSend = 0;
-  } else {
- maxBytesToSend = maxBytesToSend - bytesRead;
-  }
-   }
-   logger.debug("sending " + buffer.writerIndex() + " bytes on 
file " + file.getFileName());
-   // sending -1 or 0 bytes will close the file at the backup
-   // We cannot simply send everything of a file through the 
executor,
-   // otherwise we would run out of memory.
-   // so we don't use the executor here
-   sendReplicatePacket(new ReplicationSyncFileMessage(content, 
pageStore, id, toSend, buffer), true);
-   packetsSent++;
-
-   if (packetsSent % flowControlSize == 0) {
-  flushReplicationStream(action);
+ raf = new RandomAccessFile(file.getJavaFile(), "r");
 
 Review comment:
   Good catch: let me take a look! 


[GitHub] [activemq-artemis] wy96f commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)

2019-09-25 Thread GitBox
wy96f commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy 
to replicate journal/page/large message file (AGAIN)
URL: https://github.com/apache/activemq-artemis/pull/2845#discussion_r328056532
 
 

 ##
 File path: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
 ##
 @@ -560,49 +590,52 @@ private void 
sendLargeFile(AbstractJournalStorageManager.JournalContent content,
   if (!file.isOpen()) {
  file.open();
   }
-  int size = 32 * 1024;
+  final int size = 1024 * 1024;
+  long fileSize = file.size();
 
   int flowControlSize = 10;
 
   int packetsSent = 0;
   FlushAction action = new FlushAction();
 
+  long offset = 0;
+  RandomAccessFile raf = null;
+  FileChannel fileChannel = null;
   try {
- try (FileInputStream fis = new FileInputStream(file.getJavaFile()); 
FileChannel channel = fis.getChannel()) {
-
-// We can afford having a single buffer here for this entire loop
-// because sendReplicatePacket will encode the packet as a 
NettyBuffer
-// through ActiveMQBuffer class leaving this buffer free to be 
reused on the next copy
-while (true) {
-   final ByteBuf buffer = 
PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
-   buffer.clear();
-   ByteBuffer byteBuffer = 
buffer.writerIndex(size).readerIndex(0).nioBuffer();
-   final int bytesRead = channel.read(byteBuffer);
-   int toSend = bytesRead;
-   if (bytesRead > 0) {
-  if (bytesRead >= maxBytesToSend) {
- toSend = (int) maxBytesToSend;
- maxBytesToSend = 0;
-  } else {
- maxBytesToSend = maxBytesToSend - bytesRead;
-  }
-   }
-   logger.debug("sending " + buffer.writerIndex() + " bytes on 
file " + file.getFileName());
-   // sending -1 or 0 bytes will close the file at the backup
-   // We cannot simply send everything of a file through the 
executor,
-   // otherwise we would run out of memory.
-   // so we don't use the executor here
-   sendReplicatePacket(new ReplicationSyncFileMessage(content, 
pageStore, id, toSend, buffer), true);
-   packetsSent++;
-
-   if (packetsSent % flowControlSize == 0) {
-  flushReplicationStream(action);
+ raf = new RandomAccessFile(file.getJavaFile(), "r");
 
 Review comment:
   If we send a 0 size file, we need to close raf as 
ReplicationSyncFileMessage::release will not be called, correct?
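
   A rough sketch of the ownership rule this question implies, under stated 
assumptions: if the file is empty and nothing is handed to the sender, the 
opener closes `raf` itself; otherwise the completion callback closes it. 
`ZeroSizeFileSketch`, `Sender` and `sendFile` are hypothetical names used only 
for illustration and do not mirror the actual `ReplicationManager` code.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;

final class ZeroSizeFileSketch {

   // Hypothetical stand-in for an asynchronous send that runs onComplete
   // once the chunk has been written.
   interface Sender {
      void send(FileChannel channel, long offset, long size, Runnable onComplete);
   }

   // Returns the channel so a caller can observe whether it has been closed.
   static FileChannel sendFile(File file, Sender sender) throws IOException {
      RandomAccessFile raf = new RandomAccessFile(file, "r");
      FileChannel channel = raf.getChannel();
      boolean handedOff = false;
      try {
         long size = channel.size();
         if (size > 0) {
            // Ownership of raf moves to the completion callback.
            sender.send(channel, 0, size, () -> closeUnchecked(raf));
            handedOff = true;
         }
      } finally {
         if (!handedOff) {
            // Zero-size file (or send failed): no callback will ever close raf.
            raf.close();
         }
      }
      return channel;
   }

   private static void closeUnchecked(RandomAccessFile raf) {
      try {
         raf.close();
      } catch (IOException e) {
         throw new UncheckedIOException(e);
      }
   }

   public static void main(String[] args) throws IOException {
      File empty = File.createTempFile("zero-size", ".bin");
      try {
         FileChannel channel = sendFile(empty, (ch, offset, size, done) -> done.run());
         System.out.println("still open after empty send: " + channel.isOpen());
      } finally {
         empty.delete();
      }
   }
}
```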


[GitHub] [activemq-artemis] wy96f commented on a change in pull request #2666: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file

2019-09-25 Thread GitBox
wy96f commented on a change in pull request #2666: ARTEMIS-2336 Use zero copy 
to replicate journal/page/large message file
URL: https://github.com/apache/activemq-artemis/pull/2666#discussion_r328029179
 
 

 ##
 File path: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
 ##
 @@ -551,49 +581,52 @@ private void 
sendLargeFile(AbstractJournalStorageManager.JournalContent content,
   if (!file.isOpen()) {
  file.open();
   }
-  int size = 32 * 1024;
+  final int size = 1024 * 1024;
+  long fileSize = file.size();
 
   int flowControlSize = 10;
 
   int packetsSent = 0;
   FlushAction action = new FlushAction();
 
+  long offset = 0;
+  RandomAccessFile raf = null;
+  FileChannel fileChannel = null;
   try {
- try (FileInputStream fis = new FileInputStream(file.getJavaFile()); 
FileChannel channel = fis.getChannel()) {
-
-// We can afford having a single buffer here for this entire loop
-// because sendReplicatePacket will encode the packet as a 
NettyBuffer
-// through ActiveMQBuffer class leaving this buffer free to be 
reused on the next copy
-while (true) {
-   final ByteBuf buffer = 
PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
-   buffer.clear();
-   ByteBuffer byteBuffer = 
buffer.writerIndex(size).readerIndex(0).nioBuffer();
-   final int bytesRead = channel.read(byteBuffer);
-   int toSend = bytesRead;
-   if (bytesRead > 0) {
-  if (bytesRead >= maxBytesToSend) {
- toSend = (int) maxBytesToSend;
- maxBytesToSend = 0;
-  } else {
- maxBytesToSend = maxBytesToSend - bytesRead;
-  }
-   }
-   logger.debug("sending " + buffer.writerIndex() + " bytes on 
file " + file.getFileName());
-   // sending -1 or 0 bytes will close the file at the backup
-   // We cannot simply send everything of a file through the 
executor,
-   // otherwise we would run out of memory.
-   // so we don't use the executor here
-   sendReplicatePacket(new ReplicationSyncFileMessage(content, 
pageStore, id, toSend, buffer), true);
-   packetsSent++;
-
-   if (packetsSent % flowControlSize == 0) {
-  flushReplicationStream(action);
+ raf = new RandomAccessFile(file.getJavaFile(), "r");
 
 Review comment:
   If we send a 0 size file, we need to close `raf` as 
ReplicationSyncFileMessage::release will not be called, correct?


[GitHub] [activemq-artemis] k-wall edited a comment on issue #2847: ARTEMIS-2494: [AMQP] Allow Modified disposition to be used signal address full to a sending peer

2019-09-25 Thread GitBox
k-wall edited a comment on issue #2847: ARTEMIS-2494: [AMQP] Allow Modified 
disposition to be used signal address full to a sending peer
URL: https://github.com/apache/activemq-artemis/pull/2847#issuecomment-534963469
 
 
   My initial patch missed the code path where the address full policy is BLOCK 
and the paging store signals itself full. Previously Rejected would be sent 
unconditionally. Now Modified is sent if enabled/supported on the link, as in 
the other case.
   
   There is a small behavioural change in this patch (a25a289). Previously, on 
the BLOCK code path the Reject for an address-full would be sent with a 
`resource-limit-exceeded` condition, whereas the other code path would send a 
Reject for an address-full with a condition `failed`. This patch uses 
`resource-limit-exceeded` consistently.


[GitHub] [activemq-artemis] k-wall commented on issue #2847: ARTEMIS-2494: [AMQP] Allow Modified disposition to be used signal address full to a sending peer

2019-09-25 Thread GitBox
k-wall commented on issue #2847: ARTEMIS-2494: [AMQP] Allow Modified 
disposition to be used signal address full to a sending peer
URL: https://github.com/apache/activemq-artemis/pull/2847#issuecomment-534963469
 
 
   My initial patch missed the code path where the address full policy is BLOCK 
and the paging store signals itself full. Previously Rejected would be sent 
unconditionally. Now Modified is sent if enabled/supported on the link, as in 
the other case.
   
   There is a small behavioural change in this patch (a25a289). Previously, on 
the BLOCK code path the Reject for an address-full would be sent with a 
resource-limit-exceeded condition, whereas the other code path would send a 
Reject for an address-full with a condition "failed". This patch uses 
resource-limit-exceeded consistently.


[GitHub] [activemq-artemis] k-wall commented on a change in pull request #2847: ARTEMIS-2494: [AMQP] Allow Modified disposition to be used signal address full to a sending peer

2019-09-25 Thread GitBox
k-wall commented on a change in pull request #2847: ARTEMIS-2494: [AMQP] Allow 
Modified disposition to be used signal address full to a sending peer
URL: https://github.com/apache/activemq-artemis/pull/2847#discussion_r328047546
 
 

 ##
 File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
 ##
 @@ -326,6 +325,37 @@ private void actualDelivery(Delivery delivery, Receiver 
receiver, ReadableBuffer
   }
}
 
+   private DeliveryState determineDeliveryState(final Source source, final 
boolean useModified, final Exception e) {
+  Outcome defaultOutcome = getEffectiveDefaultOutcome(source);
+  if (e instanceof ActiveMQAddressFullException && useModified &&
+  (outcomeSupported(source, Modified.DESCRIPTOR_SYMBOL) || 
defaultOutcome instanceof Modified)) {
+ Modified modified = new Modified();
+ modified.setDeliveryFailed(true);
+ modified.setUndeliverableHere(false);
 
 Review comment:
   That should be fixed.


[GitHub] [activemq-artemis] k-wall commented on a change in pull request #2847: ARTEMIS-2494: [AMQP] Allow Modified disposition to be used signal address full to a sending peer

2019-09-25 Thread GitBox
k-wall commented on a change in pull request #2847: ARTEMIS-2494: [AMQP] Allow 
Modified disposition to be used signal address full to a sending peer
URL: https://github.com/apache/activemq-artemis/pull/2847#discussion_r328047809
 
 

 ##
 File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
 ##
 @@ -326,6 +325,37 @@ private void actualDelivery(Delivery delivery, Receiver 
receiver, ReadableBuffer
   }
}
 
+   private DeliveryState determineDeliveryState(final Source source, final 
boolean useModified, final Exception e) {
+  Outcome defaultOutcome = getEffectiveDefaultOutcome(source);
+  if (e instanceof ActiveMQAddressFullException && useModified &&
+  (outcomeSupported(source, Modified.DESCRIPTOR_SYMBOL) || 
defaultOutcome instanceof Modified)) {
+ Modified modified = new Modified();
+ modified.setDeliveryFailed(true);
+ modified.setUndeliverableHere(false);
+ return modified;
+  } else {
+ if (outcomeSupported(source, Rejected.DESCRIPTOR_SYMBOL) || 
defaultOutcome instanceof Rejected) {
+ErrorCondition condition = new ErrorCondition();
+
+// Set condition
+if (e instanceof ActiveMQSecurityException) {
+   condition.setCondition(AmqpError.UNAUTHORIZED_ACCESS);
+} else {
+   condition.setCondition(Symbol.valueOf("failed"));
+}
+condition.setDescription(e.getMessage());
+
+Rejected rejected = new Rejected();
+rejected.setError(condition);
+return rejected;
+ } else if (source.getDefaultOutcome() instanceof DeliveryState) {
+return ((DeliveryState) source.getDefaultOutcome());
+ } else {
+return Accepted.getInstance();
 
 Review comment:
   I've changed the code to send Rejected in the case where neither outcomes 
nor default-outcome is specified.
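
   The decision order in the quoted hunk, with the final fallback switched to 
Rejected as described here, can be sketched roughly as follows. This is a 
simplified model, not the PR's code: the `Outcome` enum, `DeliveryStateSketch` 
and `determine` are hypothetical stand-ins for the proton-j types, and treating 
every "no default outcome" case as Rejected is an assumed reading of this 
comment.

```java
import java.util.EnumSet;
import java.util.Set;

final class DeliveryStateSketch {

   // Hypothetical simplification of the AMQP outcomes; the real code uses proton-j types.
   enum Outcome { ACCEPTED, REJECTED, MODIFIED, RELEASED }

   static Outcome determine(Set<Outcome> supported, Outcome defaultOutcome,
                            boolean addressFull, boolean useModified) {
      // Address full: prefer Modified when enabled and the link supports it.
      if (addressFull && useModified
            && (supported.contains(Outcome.MODIFIED) || defaultOutcome == Outcome.MODIFIED)) {
         return Outcome.MODIFIED;
      }
      // Otherwise Rejected, if the link supports it.
      if (supported.contains(Outcome.REJECTED) || defaultOutcome == Outcome.REJECTED) {
         return Outcome.REJECTED;
      }
      // Otherwise fall back to the source's default outcome, if any.
      if (defaultOutcome != null) {
         return defaultOutcome;
      }
      // Assumption: nothing usable was specified, so send Rejected
      // (the quoted revision returned Accepted here).
      return Outcome.REJECTED;
   }

   public static void main(String[] args) {
      System.out.println(determine(EnumSet.noneOf(Outcome.class), null, true, true));
   }
}
```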


[GitHub] [activemq-artemis] brusdev commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses

2019-09-25 Thread GitBox
brusdev commented on a change in pull request #2850: ARTEMIS-2504 implement 
retroactive addresses
URL: https://github.com/apache/activemq-artemis/pull/2850#discussion_r328041981
 
 

 ##
 File path: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 ##
 @@ -1971,7 +1988,9 @@ private synchronized int iterQueue(final int flushLimit,
 
 if (filter1 == null || filter1.match(ref.getMessage())) {
messageAction.actMessage(tx, ref);
-   iter.remove();
+   if (remove) {
 
 Review comment:
   Do we need the same check for scheduledDelivery messages 
(https://github.com/apache/activemq-artemis/blob/master/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java#L128)
 and paged messages 
(https://github.com/apache/activemq-artemis/blob/master/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java#L2002)?


[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)

2019-09-25 Thread GitBox
franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero 
copy to replicate journal/page/large message file (AGAIN)
URL: https://github.com/apache/activemq-artemis/pull/2845#discussion_r328011491
 
 

 ##
 File path: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
 ##
 @@ -355,12 +373,24 @@ private boolean canWrite(final int requiredCapacity) {
   return canWrite;
}
 
-   private Object getFileObject(RandomAccessFile raf, FileChannel fileChannel, 
long offset, int dataSize) {
-  if (channel.pipeline().get(SslHandler.class) == null) {
- return new NonClosingDefaultFileRegion(fileChannel, offset, dataSize);
+   private Object getFileObject(FileChannel fileChannel, long offset, int 
dataSize) {
+  if (USE_FILE_REGION && channel.pipeline().get(SslHandler.class) == null) 
{
 
 Review comment:
   I'm finishing running the tests, but I believe you can try to run your load 
generator as well now (fingers crossed!)
   


[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)

2019-09-25 Thread GitBox
franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero 
copy to replicate journal/page/large message file (AGAIN)
URL: https://github.com/apache/activemq-artemis/pull/2845#discussion_r328004191
 
 

 ##
 File path: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
 ##
 @@ -355,12 +373,24 @@ private boolean canWrite(final int requiredCapacity) {
   return canWrite;
}
 
-   private Object getFileObject(RandomAccessFile raf, FileChannel fileChannel, 
long offset, int dataSize) {
-  if (channel.pipeline().get(SslHandler.class) == null) {
- return new NonClosingDefaultFileRegion(fileChannel, offset, dataSize);
+   private Object getFileObject(FileChannel fileChannel, long offset, int 
dataSize) {
+  if (USE_FILE_REGION && channel.pipeline().get(SslHandler.class) == null) 
{
 
 Review comment:
   We were missing this change: 
https://github.com/apache/activemq-artemis/pull/2845/commits/438c0fb0c69596cbd02f5ec6337c934b331aea96#diff-917ee65648802d0c63faa660eb9f8debR68
   It propagates the channel becoming writeable again so that sending can resume.


[GitHub] [activemq-nms-amqp] michaelpearce-gain commented on a change in pull request #38: AMQNET-616: Add TCP Keep-Alive support

2019-09-25 Thread GitBox
michaelpearce-gain commented on a change in pull request #38: AMQNET-616: Add 
TCP Keep-Alive support
URL: https://github.com/apache/activemq-nms-amqp/pull/38#discussion_r328002168
 
 

 ##
 File path: src/NMS.AMQP/Transport/SecureTransportContext.cs
 ##
 @@ -37,8 +37,8 @@ namespace Apache.NMS.AMQP.Transport
 internal class SecureTransportContext : TransportContext, 
ISecureTransportContext
 {
 
-private readonly static List SupportedProtocols;
-private readonly static Dictionary 
SupportedProtocolValues;
+private static readonly List SupportedProtocols;
 
 Review comment:
   Ah, I didn't spot that.


[GitHub] [activemq-artemis] wy96f commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)

2019-09-25 Thread GitBox
wy96f commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy 
to replicate journal/page/large message file (AGAIN)
URL: https://github.com/apache/activemq-artemis/pull/2845#discussion_r327998358
 
 

 ##
 File path: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
 ##
 @@ -355,12 +373,24 @@ private boolean canWrite(final int requiredCapacity) {
   return canWrite;
}
 
-   private Object getFileObject(RandomAccessFile raf, FileChannel fileChannel, 
long offset, int dataSize) {
-  if (channel.pipeline().get(SslHandler.class) == null) {
- return new NonClosingDefaultFileRegion(fileChannel, offset, dataSize);
+   private Object getFileObject(FileChannel fileChannel, long offset, int 
dataSize) {
+  if (USE_FILE_REGION && channel.pipeline().get(SslHandler.class) == null) 
{
 
 Review comment:
   > I've proceeded with the investigation and I see that we are failing here: 
https://github.com/netty/netty/blob/21b7e29ea7c211bb2b889bae3a0c6c5d9f60fb01/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java#L320
   > 
   > And we won't get notified anymore on 
[ChunkedWriteHandler::channelWritabilityChanged](https://github.com/netty/netty/blob/21b7e29ea7c211bb2b889bae3a0c6c5d9f60fb01/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java#L144)
   > 
   > It means to me that:
   > 
   > 1. The receiver side has stopped reading data, i.e. effectively the 
channel is not writeable anymore
   > 2. The sender side (on Netty) has some bug while marking progress and/or 
propagating writeability changes
   
   
https://github.com/netty/netty/blob/21b7e29ea7c211bb2b889bae3a0c6c5d9f60fb01/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java#L320
 would not be called, as `doFlush` is running in the Netty executor and the 
channel is still not writable after writing the message.
   It should not be 1, because `netstat` showed no backlog. I found that 
`channelWritabilityChanged` was never called during replication.


[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)

2019-09-25 Thread GitBox
franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero 
copy to replicate journal/page/large message file (AGAIN)
URL: https://github.com/apache/activemq-artemis/pull/2845#discussion_r327997358
 
 

 ##
 File path: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
 ##
 @@ -355,12 +373,24 @@ private boolean canWrite(final int requiredCapacity) {
   return canWrite;
}
 
-   private Object getFileObject(RandomAccessFile raf, FileChannel fileChannel, 
long offset, int dataSize) {
-  if (channel.pipeline().get(SslHandler.class) == null) {
- return new NonClosingDefaultFileRegion(fileChannel, offset, dataSize);
+   private Object getFileObject(FileChannel fileChannel, long offset, int 
dataSize) {
+  if (USE_FILE_REGION && channel.pipeline().get(SslHandler.class) == null) 
{
 
 Review comment:
   I've found the issue: it was on our side. I'm going to update this PR with 
the change ;)
   
   


[GitHub] [activemq-nms-amqp] HavretGC commented on a change in pull request #38: AMQNET-616: Add TCP Keep-Alive support

2019-09-25 Thread GitBox
HavretGC commented on a change in pull request #38: AMQNET-616: Add TCP 
Keep-Alive support
URL: https://github.com/apache/activemq-nms-amqp/pull/38#discussion_r327987510
 
 

 ##
 File path: src/NMS.AMQP/Transport/SecureTransportContext.cs
 ##
 @@ -37,8 +37,8 @@ namespace Apache.NMS.AMQP.Transport
 internal class SecureTransportContext : TransportContext, 
ISecureTransportContext
 {
 
-private readonly static List SupportedProtocols;
-private readonly static Dictionary 
SupportedProtocolValues;
+private static readonly List SupportedProtocols;
 
 Review comment:
   It was static from the beginning. I've merely rearranged the word order to 
match the current code style. From what I can see in the code, it's static 
because it never changes: these are constants that are initialized in the 
static constructor.


[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)

2019-09-25 Thread GitBox
franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero 
copy to replicate journal/page/large message file (AGAIN)
URL: https://github.com/apache/activemq-artemis/pull/2845#discussion_r327984141
 
 

 ##
 File path: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
 ##
 @@ -355,12 +373,24 @@ private boolean canWrite(final int requiredCapacity) {
   return canWrite;
}
 
-   private Object getFileObject(RandomAccessFile raf, FileChannel fileChannel, 
long offset, int dataSize) {
-  if (channel.pipeline().get(SslHandler.class) == null) {
- return new NonClosingDefaultFileRegion(fileChannel, offset, dataSize);
+   private Object getFileObject(FileChannel fileChannel, long offset, int 
dataSize) {
+  if (USE_FILE_REGION && channel.pipeline().get(SslHandler.class) == null) 
{
 
 Review comment:
   I've proceeded with the investigation and I see that we are failing here: 
https://github.com/netty/netty/blob/21b7e29ea7c211bb2b889bae3a0c6c5d9f60fb01/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java#L320
   
   And we won't get notified anymore on 
[ChunkedWriteHandler::channelWritabilityChanged](https://github.com/netty/netty/blob/21b7e29ea7c211bb2b889bae3a0c6c5d9f60fb01/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java#L144)
   
   It means to me that:
   
   1. The receiver side has stopped reading data, i.e. the channel is 
effectively not writable anymore
   2. The sender side (on Netty) has some bug while marking progress and/or 
propagating writability changes
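
   Both hypotheses produce the same symptom: the transfer stalls after the 
first few chunks. The toy model below is a sketch only, not Netty's 
implementation; the class name, the watermark values and the `run` helper are 
all hypothetical. A sender stops enqueuing at a high watermark and resumes 
only when a "writability changed" callback fires at the low watermark.

```java
/** Toy model of watermark-based back-pressure. NOT Netty's implementation. */
public class WatermarkSketch {
    static final int HIGH = 64 * 1024;  // hypothetical high watermark (bytes)
    static final int LOW = 32 * 1024;   // hypothetical low watermark (bytes)
    static final int CHUNK = 32 * 1024; // hypothetical chunk size (bytes)

    private int pendingBytes;             // queued but not yet drained by the receiver
    private boolean writable = true;
    private final boolean notifyOnResume; // false simulates a lost writability notification
    private int chunksLeft;
    private int chunksSent;

    WatermarkSketch(int chunks, boolean notifyOnResume) {
        this.chunksLeft = chunks;
        this.notifyOnResume = notifyOnResume;
    }

    /** Sender side: enqueue chunks while the channel reports itself writable. */
    void resumeTransfer() {
        while (writable && chunksLeft > 0) {
            pendingBytes += CHUNK;
            chunksLeft--;
            chunksSent++;
            if (pendingBytes >= HIGH) {
                writable = false; // back-pressure kicks in
            }
        }
    }

    /** Receiver side: some bytes were read from the socket. */
    void drain(int bytes) {
        pendingBytes = Math.max(0, pendingBytes - bytes);
        if (!writable && pendingBytes <= LOW) {
            writable = true;
            if (notifyOnResume) {
                resumeTransfer(); // stands in for the writability-changed callback
            }
        }
    }

    /** Returns how many chunks were sent after the receiver drained {@code drains} times. */
    static int run(int chunks, int drains, boolean notifyOnResume) {
        WatermarkSketch s = new WatermarkSketch(chunks, notifyOnResume);
        s.resumeTransfer();
        for (int i = 0; i < drains; i++) {
            s.drain(CHUNK);
        }
        return s.chunksSent;
    }

    public static void main(String[] args) {
        System.out.println("healthy:           " + run(10, 100, true));
        System.out.println("receiver stalled:  " + run(10, 0, true));
        System.out.println("notification lost: " + run(10, 100, false));
    }
}
```

   In this model a receiver that never drains (hypothesis 1) and a lost 
notification (hypothesis 2) both leave the sender stuck after two chunks, so 
the sender-side count alone cannot tell them apart; checking whether the 
receiver is still reading would.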
   


[GitHub] [activemq-artemis] gemmellr commented on a change in pull request #2847: ARTEMIS-2494: [AMQP] Allow Modified disposition to be used signal address full to a sending peer

2019-09-25 Thread GitBox
gemmellr commented on a change in pull request #2847: ARTEMIS-2494: [AMQP] 
Allow Modified disposition to be used signal address full to a sending peer
URL: https://github.com/apache/activemq-artemis/pull/2847#discussion_r327978612
 
 

 ##
 File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
 ##
 @@ -326,6 +325,37 @@ private void actualDelivery(Delivery delivery, Receiver receiver, ReadableBuffer
   }
}
 
+   private DeliveryState determineDeliveryState(final Source source, final boolean useModified, final Exception e) {
+  Outcome defaultOutcome = getEffectiveDefaultOutcome(source);
+  if (e instanceof ActiveMQAddressFullException && useModified &&
+  (outcomeSupported(source, Modified.DESCRIPTOR_SYMBOL) || defaultOutcome instanceof Modified)) {
+ Modified modified = new Modified();
+ modified.setDeliveryFailed(true);
+ modified.setUndeliverableHere(false);
 
 Review comment:
   You aren't missing anything; I somehow completely misread that the second 
time round, thinking it was being set to true... no idea how, maybe I read the 
start of one line and the end of the other? :)
   
   I probably wouldn't bother setting it, but don't see harm in doing so.


[GitHub] [activemq-nms-amqp] michaelpearce-gain commented on a change in pull request #38: AMQNET-616: Add TCP Keep-Alive support

2019-09-25 Thread GitBox
michaelpearce-gain commented on a change in pull request #38: AMQNET-616: Add 
TCP Keep-Alive support
URL: https://github.com/apache/activemq-nms-amqp/pull/38#discussion_r327971083
 
 

 ##
 File path: src/NMS.AMQP/Transport/SecureTransportContext.cs
 ##
 @@ -37,8 +37,8 @@ namespace Apache.NMS.AMQP.Transport
 internal class SecureTransportContext : TransportContext, ISecureTransportContext
 {
 
-private readonly static List SupportedProtocols;
-private readonly static Dictionary SupportedProtocolValues;
+private static readonly List SupportedProtocols;
 
 Review comment:
   Why change to static? Isn't that dangerous? What happens if multiple CFs or 
contexts need different values?


[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)

2019-09-25 Thread GitBox
franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero 
copy to replicate journal/page/large message file (AGAIN)
URL: https://github.com/apache/activemq-artemis/pull/2845#discussion_r327950181
 
 

 ##
 File path: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
 ##
 @@ -355,12 +373,24 @@ private boolean canWrite(final int requiredCapacity) {
   return canWrite;
}
 
-   private Object getFileObject(RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize) {
-  if (channel.pipeline().get(SslHandler.class) == null) {
- return new NonClosingDefaultFileRegion(fileChannel, offset, dataSize);
+   private Object getFileObject(FileChannel fileChannel, long offset, int dataSize) {
+  if (USE_FILE_REGION && channel.pipeline().get(SslHandler.class) == null) {
 
 Review comment:
   I see, but I've tried raising the high watermark value and the outbound TCP 
buffer, and lowering the chunk size to 32 K, and I get the same error...
   I will try to override the back-pressure mechanism we use on NettyConnection, 
making it just return true...
   If we find an error on the Netty side I will send a PR there instead


[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2844: ARTEMIS-1811 NIO Seq File should use RandomAccessFile with heap buffers

2019-09-25 Thread GitBox
franz1981 commented on a change in pull request #2844: ARTEMIS-1811 NIO Seq 
File should use RandomAccessFile with heap buffers
URL: https://github.com/apache/activemq-artemis/pull/2844#discussion_r327938663
 
 

 ##
 File path: 
artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
 ##
 @@ -160,14 +173,63 @@ public int read(final ByteBuffer bytes) throws Exception {
   return read(bytes, null);
}
 
+   //This value has been tuned just to reduce the memory footprint
+   //of read/write of the whole file size: given that this value
+   //is > 8192, RandomAccessFile JNI code will use malloc/free instead
+   //of using a copy on the stack, but it has been proven to NOT be
+   //a bottleneck
+   private static final int BUF_SIZE = 2 * 1024 * 1024;
+
+   private static int readRaf(RandomAccessFile raf, byte[] b, int off, int len, int ioSize) throws IOException {
+  int remaining = len;
+  int offset = off;
+  while (remaining > 0) {
+ final int chunkSize = Math.min(ioSize, remaining);
+ final int read = raf.read(b, offset, chunkSize);
+ assert read != 0;
+ if (read == -1) {
+if (len == remaining) {
+   return -1;
+}
+break;
+ }
+ offset += read;
+ remaining -= read;
+  }
+  return len - remaining;
+   }
+
+   private static void writeRaf(RandomAccessFile raf, byte[] b, int off, int len, int ioSize) throws IOException {
+  int remaining = len;
+  int offset = off;
+  while (remaining > 0) {
+ final int chunkSize = Math.min(ioSize, remaining);
+ raf.write(b, offset, chunkSize);
+ offset += chunkSize;
+ remaining -= chunkSize;
+  }
+   }
+
@Override
public synchronized int read(final ByteBuffer bytes,
 final IOCallback callback) throws IOException, ActiveMQIllegalStateException {
   try {
  if (channel == null) {
 throw new ActiveMQIllegalStateException("File " + this.getFileName() + " has a null channel");
  }
- int bytesRead = channel.read(bytes);
+ final int bytesRead;
+ if (bytes.hasArray()) {
+if (bytes.remaining() > BUF_SIZE) {
+   bytesRead = readRaf(rfile, bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining(), BUF_SIZE);
+} else {
+   bytesRead = rfile.read(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining());
 
 Review comment:
   To save inlining budget: in most cases the first check will save the call 
on the happy path
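
   A minimal sketch of that shape, assuming the intent is that the size check 
sits at the call site so small reads never enter the chunked helper. The class 
name `ChunkedReadSketch` and the `roundTrip` helper are hypothetical; the 
`BUF_SIZE` value and the loop mirror the diff above.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;

public class ChunkedReadSketch {
    static final int BUF_SIZE = 2 * 1024 * 1024; // same value as in the diff

    /** Slow path: read in slices of at most ioSize bytes. */
    static int readChunked(RandomAccessFile raf, byte[] b, int off, int len, int ioSize) throws IOException {
        int remaining = len;
        int offset = off;
        while (remaining > 0) {
            final int read = raf.read(b, offset, Math.min(ioSize, remaining));
            if (read == -1) {
                if (remaining == len) {
                    return -1; // EOF before anything was read
                }
                break;
            }
            offset += read;
            remaining -= read;
        }
        return len - remaining;
    }

    /** The size check lives here, so small reads stay on a single direct call. */
    static int read(RandomAccessFile raf, ByteBuffer bytes) throws IOException {
        if (!bytes.hasArray()) {
            throw new IllegalArgumentException("heap buffer expected in this sketch");
        }
        final int start = bytes.arrayOffset() + bytes.position();
        final int n = bytes.remaining() > BUF_SIZE
            ? readChunked(raf, bytes.array(), start, bytes.remaining(), BUF_SIZE)
            : raf.read(bytes.array(), start, bytes.remaining());
        if (n > 0) {
            bytes.position(bytes.position() + n); // array reads do not move the buffer position
        }
        return n;
    }

    /** Hypothetical helper: writes {@code size} zero bytes to a temp file and reads them back. */
    static int roundTrip(int size) {
        try {
            File f = File.createTempFile("chunked-read", ".bin");
            try {
                Files.write(f.toPath(), new byte[size]);
                try (RandomAccessFile raf = new RandomAccessFile(f, "r")) {
                    return read(raf, ByteBuffer.allocate(size));
                }
            } finally {
                f.delete();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("small read: " + roundTrip(1024));
        System.out.println("large read: " + roundTrip(3 * 1024 * 1024));
    }
}
```

   Unlike a `FileChannel` read, reading into the backing array does not 
advance the buffer position, which is why the sketch updates it explicitly.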


[GitHub] [activemq-artemis] wy96f commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)

2019-09-25 Thread GitBox
wy96f commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy 
to replicate journal/page/large message file (AGAIN)
URL: https://github.com/apache/activemq-artemis/pull/2845#discussion_r327948683
 
 

 ##
 File path: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
 ##
 @@ -355,12 +373,24 @@ private boolean canWrite(final int requiredCapacity) {
   return canWrite;
}
 
-   private Object getFileObject(RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize) {
-  if (channel.pipeline().get(SslHandler.class) == null) {
- return new NonClosingDefaultFileRegion(fileChannel, offset, dataSize);
+   private Object getFileObject(FileChannel fileChannel, long offset, int dataSize) {
+  if (USE_FILE_REGION && channel.pipeline().get(SslHandler.class) == null) {
 
 Review comment:
   > So it seems that Netty is slowing sending them...can see 2 reasons for 
that:
   > 
   > * we are sending too much for real
   > * the receiving broker is not reading them fast enough
   > 
   > I cannot find other reasons, other insights?
   
   Actually, after some point the broker stopped sending the data built up in 
the queue, until the timeout failure. It seems the watermark mechanism 
triggered some bug that prevented the transfer from resuming in 
`ChunkedWriteHandler`.

