[GitHub] [activemq-artemis] brusdev commented on a change in pull request #2851: ARTEMIS-2503 Improve wildcards for the roles access match
brusdev commented on a change in pull request #2851: ARTEMIS-2503 Improve wildcards for the roles access match URL: https://github.com/apache/activemq-artemis/pull/2851#discussion_r328431483 ## File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/JMXAccessControlList.java ## @@ -25,12 +25,22 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.apache.activemq.artemis.core.config.WildcardConfiguration; +import org.apache.activemq.artemis.core.settings.HierarchicalRepository; +import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository; + public class JMXAccessControlList { private Access defaultAccess = new Access("*"); - private Map domainAccess = new HashMap<>(); + private HierarchicalRepository domainAccess; private ConcurrentHashMap> whitelist = new ConcurrentHashMap<>(); + public JMXAccessControlList() { + WildcardConfiguration domainAccessWildcardConfiguration = new WildcardConfiguration(); Review comment: The previous implementation of the wildcard for the key attribute (documented here https://github.com/apache/activemq-artemis/blob/master/docs/user-manual/en/management.md) isn't compatible with broker custom wildcard config defined in broker.xml. If wildcard configuration uses the same as broker.xml it could break the compatibility with the previous implementation. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] wy96f commented on issue #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)
wy96f commented on issue #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN) URL: https://github.com/apache/activemq-artemis/pull/2845#issuecomment-535323295 @franz1981 I find new problem with -Dio.netty.file.region=false. I generated 48GB files with load generator. In the case of -Dio.netty.file.region=true and master, log(something like this `2019-09-26 11:02:49,348 DEBUG [org.apache.activemq.artemis.core.replication.ReplicationManager] sending 1048576 bytes on file `) showed it took about 7 minutes to transfer files, then synchronization done message sent. However in the case of -Dio.netty.file.region=false, log showed it took about about 40 seconds to transfer files, then sync done message sent. The fact is flow control didn't work and it took 40 seconds to build up PendingWrite in the queue rather than to transfer files. This leads to sync done message timeouts. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)
michaelandrepearce commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN) URL: https://github.com/apache/activemq-artemis/pull/2845#discussion_r328410083 ## File path: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/AbsoluteChunkedNioFile.java ## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.remoting.impl.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.FileRegion; +import io.netty.handler.stream.ChunkedInput; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.FileChannel; + +/** + * A {@link ChunkedInput} that fetches data from a file chunk by chunk using + * NIO {@link FileChannel}. + * + * If your operating system supports + * http://en.wikipedia.org/wiki/Zero-copy;>zero-copy file transfer + * such as {@code sendfile()}, you might want to use {@link FileRegion} instead. + */ +class AbsoluteChunkedNioFile implements ChunkedInput { Review comment: I see your upstream fix to netty was merged. As i understand this is a shadow of that as you put in pr comment. As thats merged, and they have fairly frequent release. Lets wait for them to release, and then remove this class before merging. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #2851: ARTEMIS-2503 Improve wildcards for the roles access match
michaelandrepearce commented on a change in pull request #2851: ARTEMIS-2503 Improve wildcards for the roles access match URL: https://github.com/apache/activemq-artemis/pull/2851#discussion_r328408327 ## File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/JMXAccessControlList.java ## @@ -25,12 +25,22 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.apache.activemq.artemis.core.config.WildcardConfiguration; +import org.apache.activemq.artemis.core.settings.HierarchicalRepository; +import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository; + public class JMXAccessControlList { private Access defaultAccess = new Access("*"); - private Map domainAccess = new HashMap<>(); + private HierarchicalRepository domainAccess; private ConcurrentHashMap> whitelist = new ConcurrentHashMap<>(); + public JMXAccessControlList() { + WildcardConfiguration domainAccessWildcardConfiguration = new WildcardConfiguration(); Review comment: Wildcard configuration should use the same as broker.xml e.g. if on broker custom wildcard config, then the same customisation should be supported here. E.g someone may use different separators in broker.xml for example. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #2851: ARTEMIS-2503 Improve wildcards for the roles access match
michaelandrepearce commented on a change in pull request #2851: ARTEMIS-2503 Improve wildcards for the roles access match URL: https://github.com/apache/activemq-artemis/pull/2851#discussion_r328408327 ## File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/JMXAccessControlList.java ## @@ -25,12 +25,22 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.apache.activemq.artemis.core.config.WildcardConfiguration; +import org.apache.activemq.artemis.core.settings.HierarchicalRepository; +import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository; + public class JMXAccessControlList { private Access defaultAccess = new Access("*"); - private Map domainAccess = new HashMap<>(); + private HierarchicalRepository domainAccess; private ConcurrentHashMap> whitelist = new ConcurrentHashMap<>(); + public JMXAccessControlList() { + WildcardConfiguration domainAccessWildcardConfiguration = new WildcardConfiguration(); Review comment: Wildcard configuration should use the same as broker.xml e.g. if on broker custom wildcard config, then the same customisation should be supported here This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #2851: ARTEMIS-2503 Improve wildcards for the roles access match
michaelandrepearce commented on a change in pull request #2851: ARTEMIS-2503 Improve wildcards for the roles access match URL: https://github.com/apache/activemq-artemis/pull/2851#discussion_r328408327 ## File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/JMXAccessControlList.java ## @@ -25,12 +25,22 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.apache.activemq.artemis.core.config.WildcardConfiguration; +import org.apache.activemq.artemis.core.settings.HierarchicalRepository; +import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository; + public class JMXAccessControlList { private Access defaultAccess = new Access("*"); - private Map domainAccess = new HashMap<>(); + private HierarchicalRepository domainAccess; private ConcurrentHashMap> whitelist = new ConcurrentHashMap<>(); + public JMXAccessControlList() { + WildcardConfiguration domainAccessWildcardConfiguration = new WildcardConfiguration(); Review comment: Wildcard configuration should use the same as broker.xml This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] michaelandrepearce commented on issue #2852: ARTEMIS-2505: Fix wiring of the max-size-bytes-reject-threshold address-setting
michaelandrepearce commented on issue #2852: ARTEMIS-2505: Fix wiring of the max-size-bytes-reject-threshold address-setting URL: https://github.com/apache/activemq-artemis/pull/2852#issuecomment-535296921 Can you squash the commits This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses
michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses URL: https://github.com/apache/activemq-artemis/pull/2850#discussion_r328407373 ## File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java ## @@ -467,6 +478,83 @@ private boolean internalAddressInfo(AddressInfo addressInfo, boolean reload) thr } } + private void registerRepositoryListenerForRetroactiveAddress(SimpleString name) { + HierarchicalRepositoryChangeListener repositoryChangeListener = () -> { + String prefix = server.getConfiguration().getInternalNamingPrefix(); + String address = ResourceNames.decomposeRetroactiveResourceName(prefix, name.toString(), ResourceNames.ADDRESS); + AddressSettings settings = addressSettingsRepository.getMatch(address); + Queue internalQueue = server.locateQueue(ResourceNames.getRetroactiveResourceName(prefix, SimpleString.toSimpleString(address), ResourceNames.QUEUE)); + if (internalQueue != null && internalQueue.getRingSize() != settings.getRetroactiveMessageCount()) { +internalQueue.setRingSize(settings.getRetroactiveMessageCount()); + } + }; + addressSettingsRepository.registerListener(repositoryChangeListener); + server.getAddressInfo(name).setRepositoryChangeListener(repositoryChangeListener); + } + + private void createRetroactiveResources(final SimpleString retroactiveAddressName, final long retroactiveMessageCount, final boolean reload) throws Exception { + String prefix = server.getConfiguration().getInternalNamingPrefix(); + final SimpleString internalAddressName = ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, ResourceNames.ADDRESS); + final SimpleString internalQueueName = ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, ResourceNames.QUEUE); + final SimpleString internalDivertName = ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, ResourceNames.DIVERT); + + if (!reload) { + AddressInfo addressInfo = new AddressInfo(internalAddressName).addRoutingType(RoutingType.ANYCAST).setInternal(true); + addAddressInfo(addressInfo); + server.createQueue(internalAddressName, +RoutingType.ANYCAST, +internalQueueName, +null, +null, +true, +false, +false, +false, +false, +0, +false, +false, +false, +0, +null, +false, +null, +false, +0, +0L, +false, +0L, +0L, +false, +retroactiveMessageCount); + } + server.deployDivert(new DivertConfiguration() + .setName(internalDivertName.toString()) + .setAddress(retroactiveAddressName.toString()) + .setExclusive(false) + .setForwardingAddress(internalAddressName.toString()) + .setRoutingType(ComponentConfigurationRoutingType.STRIP)); + } + + private void removeRetroactiveResources(SimpleString address) throws Exception { + String prefix = server.getConfiguration().getInternalNamingPrefix(); + + SimpleString internalDivertName = ResourceNames.getRetroactiveResourceName(prefix, address, ResourceNames.DIVERT); Review comment: The more i think on this the more strongly i think we shouldnt have the divert and separate address, simply should be queue under the existing address. Both performance wise and also from adaptability for being more configurable in future. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses
michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses URL: https://github.com/apache/activemq-artemis/pull/2850#discussion_r328407373 ## File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java ## @@ -467,6 +478,83 @@ private boolean internalAddressInfo(AddressInfo addressInfo, boolean reload) thr } } + private void registerRepositoryListenerForRetroactiveAddress(SimpleString name) { + HierarchicalRepositoryChangeListener repositoryChangeListener = () -> { + String prefix = server.getConfiguration().getInternalNamingPrefix(); + String address = ResourceNames.decomposeRetroactiveResourceName(prefix, name.toString(), ResourceNames.ADDRESS); + AddressSettings settings = addressSettingsRepository.getMatch(address); + Queue internalQueue = server.locateQueue(ResourceNames.getRetroactiveResourceName(prefix, SimpleString.toSimpleString(address), ResourceNames.QUEUE)); + if (internalQueue != null && internalQueue.getRingSize() != settings.getRetroactiveMessageCount()) { +internalQueue.setRingSize(settings.getRetroactiveMessageCount()); + } + }; + addressSettingsRepository.registerListener(repositoryChangeListener); + server.getAddressInfo(name).setRepositoryChangeListener(repositoryChangeListener); + } + + private void createRetroactiveResources(final SimpleString retroactiveAddressName, final long retroactiveMessageCount, final boolean reload) throws Exception { + String prefix = server.getConfiguration().getInternalNamingPrefix(); + final SimpleString internalAddressName = ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, ResourceNames.ADDRESS); + final SimpleString internalQueueName = ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, ResourceNames.QUEUE); + final SimpleString internalDivertName = ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, ResourceNames.DIVERT); + + if (!reload) { + AddressInfo addressInfo = new AddressInfo(internalAddressName).addRoutingType(RoutingType.ANYCAST).setInternal(true); + addAddressInfo(addressInfo); + server.createQueue(internalAddressName, +RoutingType.ANYCAST, +internalQueueName, +null, +null, +true, +false, +false, +false, +false, +0, +false, +false, +false, +0, +null, +false, +null, +false, +0, +0L, +false, +0L, +0L, +false, +retroactiveMessageCount); + } + server.deployDivert(new DivertConfiguration() + .setName(internalDivertName.toString()) + .setAddress(retroactiveAddressName.toString()) + .setExclusive(false) + .setForwardingAddress(internalAddressName.toString()) + .setRoutingType(ComponentConfigurationRoutingType.STRIP)); + } + + private void removeRetroactiveResources(SimpleString address) throws Exception { + String prefix = server.getConfiguration().getInternalNamingPrefix(); + + SimpleString internalDivertName = ResourceNames.getRetroactiveResourceName(prefix, address, ResourceNames.DIVERT); Review comment: The more i think on this the more strongly i think we shouldnt have the divert and separate address, simply should be queue under the existing address. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] michaelandrepearce edited a comment on issue #2850: ARTEMIS-2504 implement retroactive addresses
michaelandrepearce edited a comment on issue #2850: ARTEMIS-2504 implement retroactive addresses URL: https://github.com/apache/activemq-artemis/pull/2850#issuecomment-535292298 This feature would be more powerful if support for a timebased or bytes size retention, e.g. retro active for last hour etc. I realise that this may not be possible on this initial feature pr but this should impact the way we think about how we design the configuration so its most flexible for future expansion once the abilty for a queue to be constrained (ring or ttl) by byte size or time. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] michaelandrepearce commented on issue #2850: ARTEMIS-2504 implement retroactive addresses
michaelandrepearce commented on issue #2850: ARTEMIS-2504 implement retroactive addresses URL: https://github.com/apache/activemq-artemis/pull/2850#issuecomment-535292298 This feature would be more powerful if support for a timebased or bytes size retention, e.g. retro active for last hour etc. I realise that this may not be possible on this initial feature pr but this should impact the way we think about how we design the configuration so its most flexible for future expansion once the abilty for a queue to be ringqueue by byte size or time. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses
michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses URL: https://github.com/apache/activemq-artemis/pull/2850#discussion_r328403475 ## File path: docs/user-manual/en/retroactive-addresses.md ## @@ -0,0 +1,73 @@ +# Retroactive Addresses + +A "retroactive" address is an address that will preserve messages sent to it +for queues which will be created on it in the future. This can be useful in, +for example, publish-subscribe use cases where clients want to receive the +messages sent to the address *before* they they actually connected and created +their multicast "subscription" queue. Typically messages sent to an address +before a queue was created on it would simply be unavailable to those queues, +but with a retroactive address a fixed number of messages can be preserved by +the broker and automatically copied into queues subsequently created on the +address. This works for both anycast and multicast queues. + +## Internal Retroactive Resources + +To implement this functionality the broker will create 3 internal resources for +each retroactive address: + +1. A non-exclusive [divert](#diverts) to grab the messages from the retroactive + address. +2. An address to receive the messages from the divert. +3. A [ring queue](#ring-queues) to hold the messages sent to the address by the + divert. The general caveats for ring queues still apply here. See [the + chapter on ring queues](#ring-queues) for more details. + +These resources are important to be aware of as they will show up in the web +console and other management or metric views. They will be named according to +the following pattern: + +``` +..(divert|address|queue).retro +``` + +For example, if an address named `myAddress` had a `retroactive-message-count` +of 10 then resources with these names would be created by default: + +1. A divert on `myAddress` named `$.artemis.internal.myAddress.divert.retro` +2. An address named `$.artemis.internal.myAddress.address.retro` +3. A queue on the address from step #2 named + `$.artemis.internal.myAddress.queue.retro` with a `ring-size` of 10. + +This pattern is important to note as it allows one to configure address-settings +if necessary. + +> Note: +> +> Changing the broker's `internal-naming-prefix` once these retroactive +> resources are created will break the retroactive functionality. +> + +## Configuration + +To configure an address to be "retroactive" simply configure the +`retroactive-message-count` `address-setting` to reflect the number of messages +you want the broker to preserve, e.g.: + + +```xml + + + 100 + + +``` + +The value for `retroactive-message-count` can be updated at runtime either via Review comment: This suggests it updates dynamically. It isnt entirely true as will only update the default for new addresses, existing addresses where the retro queue is created already it doesnt update. This really is acting as a default value, not the explicit. If you needed to change at runtime and existing you would need to find the internal queue and update it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses
michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses URL: https://github.com/apache/activemq-artemis/pull/2850#discussion_r328402743 ## File path: artemis-server/src/main/resources/schema/artemis-configuration.xsd ## @@ -3404,6 +3404,14 @@ + + Review comment: Can prefix default. In future or even in this pr it would be good to be able to explicitly set/control this at an address level. Thus this would just be a default thats then applied if none is set at that level This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses
michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses URL: https://github.com/apache/activemq-artemis/pull/2850#discussion_r328402189 ## File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java ## @@ -467,6 +478,83 @@ private boolean internalAddressInfo(AddressInfo addressInfo, boolean reload) thr } } + private void registerRepositoryListenerForRetroactiveAddress(SimpleString name) { + HierarchicalRepositoryChangeListener repositoryChangeListener = () -> { + String prefix = server.getConfiguration().getInternalNamingPrefix(); + String address = ResourceNames.decomposeRetroactiveResourceName(prefix, name.toString(), ResourceNames.ADDRESS); + AddressSettings settings = addressSettingsRepository.getMatch(address); + Queue internalQueue = server.locateQueue(ResourceNames.getRetroactiveResourceName(prefix, SimpleString.toSimpleString(address), ResourceNames.QUEUE)); + if (internalQueue != null && internalQueue.getRingSize() != settings.getRetroactiveMessageCount()) { +internalQueue.setRingSize(settings.getRetroactiveMessageCount()); + } + }; + addressSettingsRepository.registerListener(repositoryChangeListener); + server.getAddressInfo(name).setRepositoryChangeListener(repositoryChangeListener); + } + + private void createRetroactiveResources(final SimpleString retroactiveAddressName, final long retroactiveMessageCount, final boolean reload) throws Exception { + String prefix = server.getConfiguration().getInternalNamingPrefix(); + final SimpleString internalAddressName = ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, ResourceNames.ADDRESS); + final SimpleString internalQueueName = ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, ResourceNames.QUEUE); + final SimpleString internalDivertName = ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, ResourceNames.DIVERT); + + if (!reload) { + AddressInfo addressInfo = new AddressInfo(internalAddressName).addRoutingType(RoutingType.ANYCAST).setInternal(true); + addAddressInfo(addressInfo); + server.createQueue(internalAddressName, +RoutingType.ANYCAST, Review comment: Should message groups be disabled? So theyre not tracked as not needed in the retro queue and would avoid issues if people have very large numbers of groups. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] michaelandrepearce commented on issue #2850: ARTEMIS-2504 implement retroactive addresses
michaelandrepearce commented on issue #2850: ARTEMIS-2504 implement retroactive addresses URL: https://github.com/apache/activemq-artemis/pull/2850#issuecomment-535288242 On the retroactive queue, would want a flag to tell us its a retro queue, thats then exposed to jmx so that metrics systems can get the tag, and so alerting rules on queue depths can ignore these queues automatically based on that flag. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses
michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses URL: https://github.com/apache/activemq-artemis/pull/2850#discussion_r328401160 ## File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java ## @@ -1170,6 +1172,10 @@ public SecuritySettingPlugin run() { addressSettings.setDefaultConsumerWindowSize(XMLUtil.parseInt(child)); } else if (DEFAULT_RING_SIZE.equalsIgnoreCase(name)) { addressSettings.setDefaultRingSize(XMLUtil.parseLong(child)); + } else if (RETROACTIVE_MESSAGE_COUNT.equalsIgnoreCase(name)) { +long retroactiveMessageCount = XMLUtil.parseLong(child); +Validators.GE_ZERO.validate(DEFAULT_ADDRESS_ROUTING_TYPE, retroactiveMessageCount); Review comment: Default address routing type in the validator for msg count? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses
michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses URL: https://github.com/apache/activemq-artemis/pull/2850#discussion_r328400809 ## File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java ## @@ -467,6 +478,83 @@ private boolean internalAddressInfo(AddressInfo addressInfo, boolean reload) thr } } + private void registerRepositoryListenerForRetroactiveAddress(SimpleString name) { + HierarchicalRepositoryChangeListener repositoryChangeListener = () -> { + String prefix = server.getConfiguration().getInternalNamingPrefix(); + String address = ResourceNames.decomposeRetroactiveResourceName(prefix, name.toString(), ResourceNames.ADDRESS); + AddressSettings settings = addressSettingsRepository.getMatch(address); + Queue internalQueue = server.locateQueue(ResourceNames.getRetroactiveResourceName(prefix, SimpleString.toSimpleString(address), ResourceNames.QUEUE)); + if (internalQueue != null && internalQueue.getRingSize() != settings.getRetroactiveMessageCount()) { +internalQueue.setRingSize(settings.getRetroactiveMessageCount()); + } + }; + addressSettingsRepository.registerListener(repositoryChangeListener); + server.getAddressInfo(name).setRepositoryChangeListener(repositoryChangeListener); + } + + private void createRetroactiveResources(final SimpleString retroactiveAddressName, final long retroactiveMessageCount, final boolean reload) throws Exception { + String prefix = server.getConfiguration().getInternalNamingPrefix(); + final SimpleString internalAddressName = ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, ResourceNames.ADDRESS); + final SimpleString internalQueueName = ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, ResourceNames.QUEUE); + final SimpleString internalDivertName = ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, ResourceNames.DIVERT); + + if (!reload) { + AddressInfo addressInfo = new AddressInfo(internalAddressName).addRoutingType(RoutingType.ANYCAST).setInternal(true); + addAddressInfo(addressInfo); + server.createQueue(internalAddressName, +RoutingType.ANYCAST, +internalQueueName, +null, +null, +true, +false, +false, +false, +false, +0, +false, +false, +false, +0, +null, +false, +null, +false, +0, +0L, +false, +0L, +0L, +false, +retroactiveMessageCount); + } + server.deployDivert(new DivertConfiguration() + .setName(internalDivertName.toString()) + .setAddress(retroactiveAddressName.toString()) + .setExclusive(false) + .setForwardingAddress(internalAddressName.toString()) + .setRoutingType(ComponentConfigurationRoutingType.STRIP)); Review comment: It may mean a retro anycast and a retro multicast queue is required to preserve that This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses
michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses URL: https://github.com/apache/activemq-artemis/pull/2850#discussion_r328400809 ## File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java ## @@ -467,6 +478,83 @@ private boolean internalAddressInfo(AddressInfo addressInfo, boolean reload) thr } } + private void registerRepositoryListenerForRetroactiveAddress(SimpleString name) { + HierarchicalRepositoryChangeListener repositoryChangeListener = () -> { + String prefix = server.getConfiguration().getInternalNamingPrefix(); + String address = ResourceNames.decomposeRetroactiveResourceName(prefix, name.toString(), ResourceNames.ADDRESS); + AddressSettings settings = addressSettingsRepository.getMatch(address); + Queue internalQueue = server.locateQueue(ResourceNames.getRetroactiveResourceName(prefix, SimpleString.toSimpleString(address), ResourceNames.QUEUE)); + if (internalQueue != null && internalQueue.getRingSize() != settings.getRetroactiveMessageCount()) { +internalQueue.setRingSize(settings.getRetroactiveMessageCount()); + } + }; + addressSettingsRepository.registerListener(repositoryChangeListener); + server.getAddressInfo(name).setRepositoryChangeListener(repositoryChangeListener); + } + + private void createRetroactiveResources(final SimpleString retroactiveAddressName, final long retroactiveMessageCount, final boolean reload) throws Exception { + String prefix = server.getConfiguration().getInternalNamingPrefix(); + final SimpleString internalAddressName = ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, ResourceNames.ADDRESS); + final SimpleString internalQueueName = ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, ResourceNames.QUEUE); + final SimpleString internalDivertName = ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, ResourceNames.DIVERT); + + if (!reload) { + AddressInfo addressInfo = new AddressInfo(internalAddressName).addRoutingType(RoutingType.ANYCAST).setInternal(true); + addAddressInfo(addressInfo); + server.createQueue(internalAddressName, +RoutingType.ANYCAST, +internalQueueName, +null, +null, +true, +false, +false, +false, +false, +0, +false, +false, +false, +0, +null, +false, +null, +false, +0, +0L, +false, +0L, +0L, +false, +retroactiveMessageCount); + } + server.deployDivert(new DivertConfiguration() + .setName(internalDivertName.toString()) + .setAddress(retroactiveAddressName.toString()) + .setExclusive(false) + .setForwardingAddress(internalAddressName.toString()) + .setRoutingType(ComponentConfigurationRoutingType.STRIP)); Review comment: It may mean in a retro anycast and a retro multicast queue is required to preserve that This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses
michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses URL: https://github.com/apache/activemq-artemis/pull/2850#discussion_r328400519 ## File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java ## @@ -467,6 +478,83 @@ private boolean internalAddressInfo(AddressInfo addressInfo, boolean reload) thr } } + private void registerRepositoryListenerForRetroactiveAddress(SimpleString name) { + HierarchicalRepositoryChangeListener repositoryChangeListener = () -> { + String prefix = server.getConfiguration().getInternalNamingPrefix(); + String address = ResourceNames.decomposeRetroactiveResourceName(prefix, name.toString(), ResourceNames.ADDRESS); + AddressSettings settings = addressSettingsRepository.getMatch(address); + Queue internalQueue = server.locateQueue(ResourceNames.getRetroactiveResourceName(prefix, SimpleString.toSimpleString(address), ResourceNames.QUEUE)); + if (internalQueue != null && internalQueue.getRingSize() != settings.getRetroactiveMessageCount()) { +internalQueue.setRingSize(settings.getRetroactiveMessageCount()); + } + }; + addressSettingsRepository.registerListener(repositoryChangeListener); + server.getAddressInfo(name).setRepositoryChangeListener(repositoryChangeListener); + } + + private void createRetroactiveResources(final SimpleString retroactiveAddressName, final long retroactiveMessageCount, final boolean reload) throws Exception { + String prefix = server.getConfiguration().getInternalNamingPrefix(); + final SimpleString internalAddressName = ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, ResourceNames.ADDRESS); + final SimpleString internalQueueName = ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, ResourceNames.QUEUE); + final SimpleString internalDivertName = ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, ResourceNames.DIVERT); + + if (!reload) { + AddressInfo addressInfo = new AddressInfo(internalAddressName).addRoutingType(RoutingType.ANYCAST).setInternal(true); + addAddressInfo(addressInfo); + server.createQueue(internalAddressName, +RoutingType.ANYCAST, +internalQueueName, +null, +null, +true, +false, +false, +false, +false, +0, +false, +false, +false, +0, +null, +false, +null, +false, +0, +0L, +false, +0L, +0L, +false, +retroactiveMessageCount); + } + server.deployDivert(new DivertConfiguration() + .setName(internalDivertName.toString()) + .setAddress(retroactiveAddressName.toString()) + .setExclusive(false) + .setForwardingAddress(internalAddressName.toString()) + .setRoutingType(ComponentConfigurationRoutingType.STRIP)); Review comment: I dont think we should be stripping. We need to ensure anycast messages only end up in anycast queues and like wise multicast. When theyre consumed by a retroactive consumer This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses
michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses URL: https://github.com/apache/activemq-artemis/pull/2850#discussion_r328400519 ## File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java ## @@ -467,6 +478,83 @@ private boolean internalAddressInfo(AddressInfo addressInfo, boolean reload) thr } } + private void registerRepositoryListenerForRetroactiveAddress(SimpleString name) { + HierarchicalRepositoryChangeListener repositoryChangeListener = () -> { + String prefix = server.getConfiguration().getInternalNamingPrefix(); + String address = ResourceNames.decomposeRetroactiveResourceName(prefix, name.toString(), ResourceNames.ADDRESS); + AddressSettings settings = addressSettingsRepository.getMatch(address); + Queue internalQueue = server.locateQueue(ResourceNames.getRetroactiveResourceName(prefix, SimpleString.toSimpleString(address), ResourceNames.QUEUE)); + if (internalQueue != null && internalQueue.getRingSize() != settings.getRetroactiveMessageCount()) { +internalQueue.setRingSize(settings.getRetroactiveMessageCount()); + } + }; + addressSettingsRepository.registerListener(repositoryChangeListener); + server.getAddressInfo(name).setRepositoryChangeListener(repositoryChangeListener); + } + + private void createRetroactiveResources(final SimpleString retroactiveAddressName, final long retroactiveMessageCount, final boolean reload) throws Exception { + String prefix = server.getConfiguration().getInternalNamingPrefix(); + final SimpleString internalAddressName = ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, ResourceNames.ADDRESS); + final SimpleString internalQueueName = ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, ResourceNames.QUEUE); + final SimpleString internalDivertName = ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, ResourceNames.DIVERT); + + if (!reload) { + AddressInfo addressInfo = new AddressInfo(internalAddressName).addRoutingType(RoutingType.ANYCAST).setInternal(true); + addAddressInfo(addressInfo); + server.createQueue(internalAddressName, +RoutingType.ANYCAST, +internalQueueName, +null, +null, +true, +false, +false, +false, +false, +0, +false, +false, +false, +0, +null, +false, +null, +false, +0, +0L, +false, +0L, +0L, +false, +retroactiveMessageCount); + } + server.deployDivert(new DivertConfiguration() + .setName(internalDivertName.toString()) + .setAddress(retroactiveAddressName.toString()) + .setExclusive(false) + .setForwardingAddress(internalAddressName.toString()) + .setRoutingType(ComponentConfigurationRoutingType.STRIP)); Review comment: I dont think we should be stripping. We need to ensure anycast messages only end up in anycast queues and like wise multicast. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses
michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses URL: https://github.com/apache/activemq-artemis/pull/2850#discussion_r328399892 ## File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RetroactiveAddressTest.java ## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.server; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; +import java.util.UUID; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.api.core.management.ResourceNames; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.Before; +import org.junit.Test; + +/** + * A simple test-case used for documentation purposes. + */ +public class RetroactiveAddressTest extends ActiveMQTestBase { + + protected ActiveMQServer server; + + protected ClientSession session; + + protected ClientSessionFactory sf; + + protected ServerLocator locator; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + server = createServer(true, createDefaultInVMConfig()); + server.getConfiguration().setThreadPoolMaxSize(200); + server.start(); + locator = createInVMNonHALocator(); + sf = createSessionFactory(locator); + session = addClientSession(sf.createSession(false, true, true)); + } + + @Test + public void testRetroactiveResourceCreation() throws Exception { + final SimpleString addressName = SimpleString.toSimpleString("myAddress"); + final SimpleString divertAddress = ResourceNames.getRetroactiveResourceName(server.getConfiguration().getInternalNamingPrefix(), addressName, ResourceNames.ADDRESS); + final SimpleString divertQueue = ResourceNames.getRetroactiveResourceName(server.getConfiguration().getInternalNamingPrefix(), addressName, ResourceNames.QUEUE); + final SimpleString divert = ResourceNames.getRetroactiveResourceName(server.getConfiguration().getInternalNamingPrefix(), addressName, ResourceNames.DIVERT); + server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(10)); + server.addAddressInfo(new AddressInfo(addressName)); + assertNotNull(server.getAddressInfo(divertAddress)); + assertNotNull(server.locateQueue(divertQueue)); + assertNotNull(server.getPostOffice().getBinding(divert)); + } + + @Test + public void testRetroactiveResourceRemoval() throws Exception { + final SimpleString addressName = SimpleString.toSimpleString("myAddress"); + final SimpleString divertAddress = ResourceNames.getRetroactiveResourceName(server.getConfiguration().getInternalNamingPrefix(), addressName, ResourceNames.ADDRESS); + final SimpleString divertQueue = ResourceNames.getRetroactiveResourceName(server.getConfiguration().getInternalNamingPrefix(), addressName, ResourceNames.QUEUE); + final SimpleString divert = ResourceNames.getRetroactiveResourceName(server.getConfiguration().getInternalNamingPrefix(), addressName, ResourceNames.DIVERT); +
[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses
michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses URL: https://github.com/apache/activemq-artemis/pull/2850#discussion_r328398850 ## File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java ## @@ -467,6 +478,83 @@ private boolean internalAddressInfo(AddressInfo addressInfo, boolean reload) thr } } + private void registerRepositoryListenerForRetroactiveAddress(SimpleString name) { + HierarchicalRepositoryChangeListener repositoryChangeListener = () -> { + String prefix = server.getConfiguration().getInternalNamingPrefix(); + String address = ResourceNames.decomposeRetroactiveResourceName(prefix, name.toString(), ResourceNames.ADDRESS); + AddressSettings settings = addressSettingsRepository.getMatch(address); + Queue internalQueue = server.locateQueue(ResourceNames.getRetroactiveResourceName(prefix, SimpleString.toSimpleString(address), ResourceNames.QUEUE)); + if (internalQueue != null && internalQueue.getRingSize() != settings.getRetroactiveMessageCount()) { +internalQueue.setRingSize(settings.getRetroactiveMessageCount()); + } + }; + addressSettingsRepository.registerListener(repositoryChangeListener); + server.getAddressInfo(name).setRepositoryChangeListener(repositoryChangeListener); + } + + private void createRetroactiveResources(final SimpleString retroactiveAddressName, final long retroactiveMessageCount, final boolean reload) throws Exception { + String prefix = server.getConfiguration().getInternalNamingPrefix(); + final SimpleString internalAddressName = ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, ResourceNames.ADDRESS); + final SimpleString internalQueueName = ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, ResourceNames.QUEUE); + final SimpleString internalDivertName = ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, ResourceNames.DIVERT); + + if (!reload) { + AddressInfo addressInfo = new AddressInfo(internalAddressName).addRoutingType(RoutingType.ANYCAST).setInternal(true); + addAddressInfo(addressInfo); + server.createQueue(internalAddressName, +RoutingType.ANYCAST, +internalQueueName, +null, +null, +true, +false, +false, +false, +false, +0, +false, +false, +false, +0, +null, +false, +null, +false, +0, +0L, +false, +0L, +0L, +false, +retroactiveMessageCount); + } + server.deployDivert(new DivertConfiguration() + .setName(internalDivertName.toString()) + .setAddress(retroactiveAddressName.toString()) + .setExclusive(false) + .setForwardingAddress(internalAddressName.toString()) + .setRoutingType(ComponentConfigurationRoutingType.STRIP)); + } + + private void removeRetroactiveResources(SimpleString address) throws Exception { + String prefix = server.getConfiguration().getInternalNamingPrefix(); + + SimpleString internalDivertName = ResourceNames.getRetroactiveResourceName(prefix, address, ResourceNames.DIVERT); Review comment: Why divert and separate address is needed. Why not have the retro queue under the original address? With having separate address will mean message has to be copied rather than it simply being a message reference to existing message. Also adds more risk in what if divert or other address dont clean up cleanly etc. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses
michaelandrepearce commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses URL: https://github.com/apache/activemq-artemis/pull/2850#discussion_r328398850 ## File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java ## @@ -467,6 +478,83 @@ private boolean internalAddressInfo(AddressInfo addressInfo, boolean reload) thr } } + private void registerRepositoryListenerForRetroactiveAddress(SimpleString name) { + HierarchicalRepositoryChangeListener repositoryChangeListener = () -> { + String prefix = server.getConfiguration().getInternalNamingPrefix(); + String address = ResourceNames.decomposeRetroactiveResourceName(prefix, name.toString(), ResourceNames.ADDRESS); + AddressSettings settings = addressSettingsRepository.getMatch(address); + Queue internalQueue = server.locateQueue(ResourceNames.getRetroactiveResourceName(prefix, SimpleString.toSimpleString(address), ResourceNames.QUEUE)); + if (internalQueue != null && internalQueue.getRingSize() != settings.getRetroactiveMessageCount()) { +internalQueue.setRingSize(settings.getRetroactiveMessageCount()); + } + }; + addressSettingsRepository.registerListener(repositoryChangeListener); + server.getAddressInfo(name).setRepositoryChangeListener(repositoryChangeListener); + } + + private void createRetroactiveResources(final SimpleString retroactiveAddressName, final long retroactiveMessageCount, final boolean reload) throws Exception { + String prefix = server.getConfiguration().getInternalNamingPrefix(); + final SimpleString internalAddressName = ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, ResourceNames.ADDRESS); + final SimpleString internalQueueName = ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, ResourceNames.QUEUE); + final SimpleString internalDivertName = ResourceNames.getRetroactiveResourceName(prefix, retroactiveAddressName, ResourceNames.DIVERT); + + if (!reload) { + AddressInfo addressInfo = new AddressInfo(internalAddressName).addRoutingType(RoutingType.ANYCAST).setInternal(true); + addAddressInfo(addressInfo); + server.createQueue(internalAddressName, +RoutingType.ANYCAST, +internalQueueName, +null, +null, +true, +false, +false, +false, +false, +0, +false, +false, +false, +0, +null, +false, +null, +false, +0, +0L, +false, +0L, +0L, +false, +retroactiveMessageCount); + } + server.deployDivert(new DivertConfiguration() + .setName(internalDivertName.toString()) + .setAddress(retroactiveAddressName.toString()) + .setExclusive(false) + .setForwardingAddress(internalAddressName.toString()) + .setRoutingType(ComponentConfigurationRoutingType.STRIP)); + } + + private void removeRetroactiveResources(SimpleString address) throws Exception { + String prefix = server.getConfiguration().getInternalNamingPrefix(); + + SimpleString internalDivertName = ResourceNames.getRetroactiveResourceName(prefix, address, ResourceNames.DIVERT); Review comment: Why divert and separate address is needed. Why not have the retro queue under the original address? With having separate address will mean message has to be copied rather than it simply being a message reference to existing message. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] michaelandrepearce edited a comment on issue #2850: ARTEMIS-2504 implement retroactive addresses
michaelandrepearce edited a comment on issue #2850: ARTEMIS-2504 implement retroactive addresses URL: https://github.com/apache/activemq-artemis/pull/2850#issuecomment-535272144 A few bits as havent fully explored the feature just yet. How does someone control if the retroactive address will behave when full, e.g. page, block, drop? How or where can someone configure the naming of a retroactive address so they can control their names to avoid conflict or for security settings. How can we configure this setting differently per address. For a specific address it would be good to be able to configure the retroactive queue fully, eg at the address level define a queue (like we can others) and then specify the queue to use. E.g. for cases where retroactive you may want LVQ behaviour or a filter For queues created for retroactive, if there is a very high number of message groups and someone is using buckets so that the group map doesnt explode in size, how is this being avoided in the retroactive queue? Is routing type being preserved. E.g. if a new multicast queue only messages sent to the address as multicast get delivered to it. And like wise anycast. Otherwise will cause issues. As normally any anycast message wouldnt have been delivered to multicast queue. But now it could possibly, if its not honoured. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] michaelandrepearce edited a comment on issue #2850: ARTEMIS-2504 implement retroactive addresses
michaelandrepearce edited a comment on issue #2850: ARTEMIS-2504 implement retroactive addresses URL: https://github.com/apache/activemq-artemis/pull/2850#issuecomment-535272144 A few bits as havent fully explored the feature just yet. How does someone control if the retroactive address will behave when full, e.g. page, block, drop? How or where can someone configure the naming of a retroactive address so they can control their names to avoid conflict or for security settings. How can we configure this setting differently per address. For a specific address it would be good to be able to configure the retroactive queue fully, eg at the address level define a queue (like we can others) and then specify the queue to use. E.g. for cases where retroactive you may want LVQ behaviour or a filter For queues created for retroactive, if there is a very high number of message groups and someone is using buckets so that the group map doesnt explode in size, how is this being avoided in the retroactive queue? Is routing type being preserved. E.g. if a new multicast queue only messages sent to the address as multicast get delivered to it. And like wise anycast. Otherwise will cause issues. As normally any anycast message wouldnt have been delivered to multicast queur. But now it could possibly, if its not honoured. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] michaelandrepearce edited a comment on issue #2850: ARTEMIS-2504 implement retroactive addresses
michaelandrepearce edited a comment on issue #2850: ARTEMIS-2504 implement retroactive addresses URL: https://github.com/apache/activemq-artemis/pull/2850#issuecomment-535272144 A few bits as havent fully explored the feature just yet. How does someone control if the retroactive address will behave when full, e.g. page, block, drop? How or where can someone configure the naming of a retroactive address so they can control their names to avoid conflict or for security settings. How can we configure this setting differently per address. For a specific address it would be good to be able to configure the retroactive queue fully, eg at the address level define a queue (like we can others) and then specify the queue to use. E.g. for cases where retroactive you may want LVQ behaviour or a filter For queues created for retroactive, if there is a very high number of message groups and someone is using buckets so that the group map doesnt explode in size, how is this being avoided in the retroactive queue? Is routing type being preserved. E.g. if a new multicast queue only messages sent to the address as multicast get delivered to it. And like wise anycast. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] michaelandrepearce edited a comment on issue #2850: ARTEMIS-2504 implement retroactive addresses
michaelandrepearce edited a comment on issue #2850: ARTEMIS-2504 implement retroactive addresses URL: https://github.com/apache/activemq-artemis/pull/2850#issuecomment-535272144 A few bits as havent fully explored the feature just yet. How does someone control if the retroactive address will behave when full, e.g. page, block, drop? How or where can someone configure the naming of a retroactive address so they can control their names to avoid conflict or for security settings. How can we configure this setting differently per address. For a specific address it would be good to be able to configure the retroactive queue fully, eg at the address level define a queue (like we can others) and then specify the queue to use. E.g. for cases where retroactive you may want LVQ behaviour or a filter For queues created for retroactive, if there is a very high number of message groups and someone is using buckets so that the group map doesnt explode in size, how is this being avoided in the retroactive queue? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] michaelandrepearce commented on issue #2850: ARTEMIS-2504 implement retroactive addresses
michaelandrepearce commented on issue #2850: ARTEMIS-2504 implement retroactive addresses URL: https://github.com/apache/activemq-artemis/pull/2850#issuecomment-535272144 A few bits as havent fully explored the feature just yet. How does someone control if the retroactive address will behave when full, e.g. page, block, drop? How or where can someone configure the naming of a retroactive address so they can control their names to avoid conflict or for security settings. For a specific address it would be good to be able to configure the retroactive queue fully, eg at the address level define a queue (like we can others) and then specify the queue to use. E.g. for cases where retroactive you may want LVQ behaviour. For queues created for retroactive, if there is a very high number of message groups and someone is using buckets so that the group map doesnt explode in size, how is this being avoided in the retroactive queue? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)
franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN) URL: https://github.com/apache/activemq-artemis/pull/2845#discussion_r328133175 ## File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java ## @@ -560,49 +590,52 @@ private void sendLargeFile(AbstractJournalStorageManager.JournalContent content, if (!file.isOpen()) { file.open(); } - int size = 32 * 1024; + final int size = 1024 * 1024; + long fileSize = file.size(); int flowControlSize = 10; int packetsSent = 0; FlushAction action = new FlushAction(); + long offset = 0; + RandomAccessFile raf = null; + FileChannel fileChannel = null; try { - try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) { - -// We can afford having a single buffer here for this entire loop -// because sendReplicatePacket will encode the packet as a NettyBuffer -// through ActiveMQBuffer class leaving this buffer free to be reused on the next copy -while (true) { - final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size); - buffer.clear(); - ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer(); - final int bytesRead = channel.read(byteBuffer); - int toSend = bytesRead; - if (bytesRead > 0) { - if (bytesRead >= maxBytesToSend) { - toSend = (int) maxBytesToSend; - maxBytesToSend = 0; - } else { - maxBytesToSend = maxBytesToSend - bytesRead; - } - } - logger.debug("sending " + buffer.writerIndex() + " bytes on file " + file.getFileName()); - // sending -1 or 0 bytes will close the file at the backup - // We cannot simply send everything of a file through the executor, - // otherwise we would run out of memory. - // so we don't use the executor here - sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true); - packetsSent++; - - if (packetsSent % flowControlSize == 0) { - flushReplicationStream(action); + raf = new RandomAccessFile(file.getJavaFile(), "r"); Review comment: Still searching for a reproducer to test https://github.com/apache/activemq-artemis/pull/2845/commits/9853633d3b4c2aacf37d1b402712b0a9938a827c This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] k-wall commented on issue #2852: ARTEMIS-2505: Fix wiring of the max-size-bytes-reject-threshold address-setting
k-wall commented on issue #2852: ARTEMIS-2505: Fix wiring of the max-size-bytes-reject-threshold address-setting URL: https://github.com/apache/activemq-artemis/pull/2852#issuecomment-535052799 @franz1981 unit test added as requested. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] gaohoward opened a new pull request #2853: ARTEMIS-2506 MQTT doesn't cleanup underlying connection for bad clients
gaohoward opened a new pull request #2853: ARTEMIS-2506 MQTT doesn't cleanup underlying connection for bad clients URL: https://github.com/apache/activemq-artemis/pull/2853 When a bad MQTT clients drop its connection without proper closing it the broker doesn't close the underlying physical connection. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)
franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN) URL: https://github.com/apache/activemq-artemis/pull/2845#discussion_r328133175 ## File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java ## @@ -560,49 +590,52 @@ private void sendLargeFile(AbstractJournalStorageManager.JournalContent content, if (!file.isOpen()) { file.open(); } - int size = 32 * 1024; + final int size = 1024 * 1024; + long fileSize = file.size(); int flowControlSize = 10; int packetsSent = 0; FlushAction action = new FlushAction(); + long offset = 0; + RandomAccessFile raf = null; + FileChannel fileChannel = null; try { - try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) { - -// We can afford having a single buffer here for this entire loop -// because sendReplicatePacket will encode the packet as a NettyBuffer -// through ActiveMQBuffer class leaving this buffer free to be reused on the next copy -while (true) { - final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size); - buffer.clear(); - ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer(); - final int bytesRead = channel.read(byteBuffer); - int toSend = bytesRead; - if (bytesRead > 0) { - if (bytesRead >= maxBytesToSend) { - toSend = (int) maxBytesToSend; - maxBytesToSend = 0; - } else { - maxBytesToSend = maxBytesToSend - bytesRead; - } - } - logger.debug("sending " + buffer.writerIndex() + " bytes on file " + file.getFileName()); - // sending -1 or 0 bytes will close the file at the backup - // We cannot simply send everything of a file through the executor, - // otherwise we would run out of memory. - // so we don't use the executor here - sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true); - packetsSent++; - - if (packetsSent % flowControlSize == 0) { - flushReplicationStream(action); + raf = new RandomAccessFile(file.getJavaFile(), "r"); Review comment: Still searchign for a reproducer to test https://github.com/apache/activemq-artemis/pull/2845/commits/9853633d3b4c2aacf37d1b402712b0a9938a827c This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] k-wall opened a new pull request #2852: ARTEMIS-2505: Fix wiring of the max-size-bytes-reject-threshold address-setting
k-wall opened a new pull request #2852: ARTEMIS-2505: Fix wiring of the max-size-bytes-reject-threshold address-setting URL: https://github.com/apache/activemq-artemis/pull/2852 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)
franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN) URL: https://github.com/apache/activemq-artemis/pull/2845#discussion_r328101333 ## File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java ## @@ -560,49 +590,52 @@ private void sendLargeFile(AbstractJournalStorageManager.JournalContent content, if (!file.isOpen()) { file.open(); } - int size = 32 * 1024; + final int size = 1024 * 1024; + long fileSize = file.size(); int flowControlSize = 10; int packetsSent = 0; FlushAction action = new FlushAction(); + long offset = 0; + RandomAccessFile raf = null; + FileChannel fileChannel = null; try { - try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) { - -// We can afford having a single buffer here for this entire loop -// because sendReplicatePacket will encode the packet as a NettyBuffer -// through ActiveMQBuffer class leaving this buffer free to be reused on the next copy -while (true) { - final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size); - buffer.clear(); - ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer(); - final int bytesRead = channel.read(byteBuffer); - int toSend = bytesRead; - if (bytesRead > 0) { - if (bytesRead >= maxBytesToSend) { - toSend = (int) maxBytesToSend; - maxBytesToSend = 0; - } else { - maxBytesToSend = maxBytesToSend - bytesRead; - } - } - logger.debug("sending " + buffer.writerIndex() + " bytes on file " + file.getFileName()); - // sending -1 or 0 bytes will close the file at the backup - // We cannot simply send everything of a file through the executor, - // otherwise we would run out of memory. - // so we don't use the executor here - sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true); - packetsSent++; - - if (packetsSent % flowControlSize == 0) { - flushReplicationStream(action); + raf = new RandomAccessFile(file.getJavaFile(), "r"); Review comment: > How about not opening raf and new ReplicationSyncFileMessage(content, pageStore, id, null, null, offset, toSend) when file size is 0 so we don't take care to release it? I like it, effectively is wasted effort to open/close it for nothing... I wil manage the 2 things in 2 separate commits: 1) to address the 0 length transferts 2) to simplify flow control: given that is a concurrent backpressure doesn't need to be super-precise, slowing down the system for no reasons This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] wy96f commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)
wy96f commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN) URL: https://github.com/apache/activemq-artemis/pull/2845#discussion_r328099516 ## File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java ## @@ -560,49 +590,52 @@ private void sendLargeFile(AbstractJournalStorageManager.JournalContent content, if (!file.isOpen()) { file.open(); } - int size = 32 * 1024; + final int size = 1024 * 1024; + long fileSize = file.size(); int flowControlSize = 10; int packetsSent = 0; FlushAction action = new FlushAction(); + long offset = 0; + RandomAccessFile raf = null; + FileChannel fileChannel = null; try { - try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) { - -// We can afford having a single buffer here for this entire loop -// because sendReplicatePacket will encode the packet as a NettyBuffer -// through ActiveMQBuffer class leaving this buffer free to be reused on the next copy -while (true) { - final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size); - buffer.clear(); - ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer(); - final int bytesRead = channel.read(byteBuffer); - int toSend = bytesRead; - if (bytesRead > 0) { - if (bytesRead >= maxBytesToSend) { - toSend = (int) maxBytesToSend; - maxBytesToSend = 0; - } else { - maxBytesToSend = maxBytesToSend - bytesRead; - } - } - logger.debug("sending " + buffer.writerIndex() + " bytes on file " + file.getFileName()); - // sending -1 or 0 bytes will close the file at the backup - // We cannot simply send everything of a file through the executor, - // otherwise we would run out of memory. - // so we don't use the executor here - sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true); - packetsSent++; - - if (packetsSent % flowControlSize == 0) { - flushReplicationStream(action); + raf = new RandomAccessFile(file.getJavaFile(), "r"); Review comment: AFAIK 0 bytes file will be created in some cases: 1. In the beginning of `startReplication`, 0 bytes page file will be created. If backup fails and connects to live again, 0 bytes page file will be sent. 2. When all pages are consumed, `PageCursorProviderImpl::cleanupComplete` will be called generating 0 bytes page file, and might be sent to backup later. That works, but we'll add a new send method only used here? How about not opening raf and `new ReplicationSyncFileMessage(content, pageStore, id, null, null, offset, toSend)` when file size is 0 so we don't take care to release it? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] franz1981 commented on issue #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)
franz1981 commented on issue #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN) URL: https://github.com/apache/activemq-artemis/pull/2845#issuecomment-534999649 @wy96f > . So only non file packet accounts for the pending write bytes in channel, and flowControl is not working very well in this case, will this have a negative impact? I see that `DefaultMessageSizeEstimator` for `FileRegion` isn't handled nor `ChannelOutboundBuffer::decrementPendingOutboundBytes` is called afted `sendFile` succeed: it means that our `lazy` backpressure is quite limiting while using `FileRegion`s. For `ChunkedNioFile` is a different story, but equally interesting: `DefaultMessageSizeEstimator` is not able to recognize `ChunkedNioFile` , but `ChannelOutboundBuffer::decrementPendingOutboundBytes` is correctly handling it because `ChunkedWriteHandler` is transparently handling the files as `ByteBuf`s. I think that is time to simplify `blockUntilWritable` to make it more lazy ie to just leverage on Netty `isWritable` instead of trying to calculate exactly the pending bytes :) I'm adding a commit to address that :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] franz1981 edited a comment on issue #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)
franz1981 edited a comment on issue #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN) URL: https://github.com/apache/activemq-artemis/pull/2845#issuecomment-534999649 @wy96f > . So only non file packet accounts for the pending write bytes in channel, and flowControl is not working very well in this case, will this have a negative impact? I see that `DefaultMessageSizeEstimator` for `FileRegion` isn't handled nor `ChannelOutboundBuffer::decrementPendingOutboundBytes` is called afted `sendFile` succeed: it means that our "lazy" backpressure is quite limiting while using `FileRegion`s. For `ChunkedNioFile` is a different story, but equally interesting: `DefaultMessageSizeEstimator` is not able to recognize `ChunkedNioFile` , but `ChannelOutboundBuffer::decrementPendingOutboundBytes` is correctly handling it because `ChunkedWriteHandler` is transparently handling the files as `ByteBuf`s. I think that is time to simplify `blockUntilWritable` to make it more lazy ie to just leverage on Netty `isWritable` instead of trying to calculate exactly the pending bytes :) I'm adding a commit to address that :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)
franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN) URL: https://github.com/apache/activemq-artemis/pull/2845#discussion_r328073825 ## File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java ## @@ -560,49 +590,52 @@ private void sendLargeFile(AbstractJournalStorageManager.JournalContent content, if (!file.isOpen()) { file.open(); } - int size = 32 * 1024; + final int size = 1024 * 1024; + long fileSize = file.size(); int flowControlSize = 10; int packetsSent = 0; FlushAction action = new FlushAction(); + long offset = 0; + RandomAccessFile raf = null; + FileChannel fileChannel = null; try { - try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) { - -// We can afford having a single buffer here for this entire loop -// because sendReplicatePacket will encode the packet as a NettyBuffer -// through ActiveMQBuffer class leaving this buffer free to be reused on the next copy -while (true) { - final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size); - buffer.clear(); - ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer(); - final int bytesRead = channel.read(byteBuffer); - int toSend = bytesRead; - if (bytesRead > 0) { - if (bytesRead >= maxBytesToSend) { - toSend = (int) maxBytesToSend; - maxBytesToSend = 0; - } else { - maxBytesToSend = maxBytesToSend - bytesRead; - } - } - logger.debug("sending " + buffer.writerIndex() + " bytes on file " + file.getFileName()); - // sending -1 or 0 bytes will close the file at the backup - // We cannot simply send everything of a file through the executor, - // otherwise we would run out of memory. - // so we don't use the executor here - sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true); - packetsSent++; - - if (packetsSent % flowControlSize == 0) { - flushReplicationStream(action); + raf = new RandomAccessFile(file.getJavaFile(), "r"); Review comment: I see that : ```java if (syncFileMessage.getFileId() != -1 && syncFileMessage.getDataSize() > 0) { replicatingChannel.send(syncFileMessage, syncFileMessage.getFileChannel(), syncFileMessage.getOffset(), syncFileMessage.getDataSize(), lastChunk ? (Channel.Callback) success -> syncFileMessage.release() : null); ``` So we never send syncFileMessage with file size == 0 and when we will do it with ```java replicatingChannel.send(syncFileMessage); ``` We don't release it when completed. It seems a leak, but is not, because we will likely to call it on the previous iteration and it should be already ready to be release or released upon completion of the previous call. I don't think we never send 0 bytes file sized on the wire AFAIK, but *if* it would happen we're not releasing it correctly: we should add ```java replicatingChannel.send(syncFileMessage, lastChunk ? (Channel.Callback) success -> syncFileMessage.release() : null); ``` It would cause on the normal path (file::size > 0) to have `syncFileMessage.release` called twice ATM (we mark lastChunk == true for the last 2 sent packets), but `RandomAccessFile::close` should be idempotent so...no harm and it would make the zero sized file to work correctly :) wdyt? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)
franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN) URL: https://github.com/apache/activemq-artemis/pull/2845#discussion_r328073825 ## File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java ## @@ -560,49 +590,52 @@ private void sendLargeFile(AbstractJournalStorageManager.JournalContent content, if (!file.isOpen()) { file.open(); } - int size = 32 * 1024; + final int size = 1024 * 1024; + long fileSize = file.size(); int flowControlSize = 10; int packetsSent = 0; FlushAction action = new FlushAction(); + long offset = 0; + RandomAccessFile raf = null; + FileChannel fileChannel = null; try { - try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) { - -// We can afford having a single buffer here for this entire loop -// because sendReplicatePacket will encode the packet as a NettyBuffer -// through ActiveMQBuffer class leaving this buffer free to be reused on the next copy -while (true) { - final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size); - buffer.clear(); - ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer(); - final int bytesRead = channel.read(byteBuffer); - int toSend = bytesRead; - if (bytesRead > 0) { - if (bytesRead >= maxBytesToSend) { - toSend = (int) maxBytesToSend; - maxBytesToSend = 0; - } else { - maxBytesToSend = maxBytesToSend - bytesRead; - } - } - logger.debug("sending " + buffer.writerIndex() + " bytes on file " + file.getFileName()); - // sending -1 or 0 bytes will close the file at the backup - // We cannot simply send everything of a file through the executor, - // otherwise we would run out of memory. - // so we don't use the executor here - sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true); - packetsSent++; - - if (packetsSent % flowControlSize == 0) { - flushReplicationStream(action); + raf = new RandomAccessFile(file.getJavaFile(), "r"); Review comment: I see that : ```java if (syncFileMessage.getFileId() != -1 && syncFileMessage.getDataSize() > 0) { replicatingChannel.send(syncFileMessage, syncFileMessage.getFileChannel(), syncFileMessage.getOffset(), syncFileMessage.getDataSize(), lastChunk ? (Channel.Callback) success -> syncFileMessage.release() : null); ``` So we never send syncFileMessage with file size == 0 and when we will do it with ```java replicatingChannel.send(syncFileMessage); ``` We don't release it when completed. It seems a leak, but is not, because we will likely to call it on the previous iteration and it should be already ready to be release or released upon completion of the previous call. I don't think we never send 0 bytes file sized on the wire AFAIK, but *if* it would happen we're not releasing it correctly: we should add ``java replicatingChannel.send(syncFileMessage, lastChunk ? (Channel.Callback) success -> syncFileMessage.release() : null); ``` It would cause on the normal path (file::size > 0) to have `syncFileMessage.release` called twice ATM (we mark lastChunk == true for the last 2 sent packets), but `RandomAccessFile::close` should be idempotent so...no harm :) wdyt? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)
franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN) URL: https://github.com/apache/activemq-artemis/pull/2845#discussion_r328073825 ## File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java ## @@ -560,49 +590,52 @@ private void sendLargeFile(AbstractJournalStorageManager.JournalContent content, if (!file.isOpen()) { file.open(); } - int size = 32 * 1024; + final int size = 1024 * 1024; + long fileSize = file.size(); int flowControlSize = 10; int packetsSent = 0; FlushAction action = new FlushAction(); + long offset = 0; + RandomAccessFile raf = null; + FileChannel fileChannel = null; try { - try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) { - -// We can afford having a single buffer here for this entire loop -// because sendReplicatePacket will encode the packet as a NettyBuffer -// through ActiveMQBuffer class leaving this buffer free to be reused on the next copy -while (true) { - final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size); - buffer.clear(); - ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer(); - final int bytesRead = channel.read(byteBuffer); - int toSend = bytesRead; - if (bytesRead > 0) { - if (bytesRead >= maxBytesToSend) { - toSend = (int) maxBytesToSend; - maxBytesToSend = 0; - } else { - maxBytesToSend = maxBytesToSend - bytesRead; - } - } - logger.debug("sending " + buffer.writerIndex() + " bytes on file " + file.getFileName()); - // sending -1 or 0 bytes will close the file at the backup - // We cannot simply send everything of a file through the executor, - // otherwise we would run out of memory. - // so we don't use the executor here - sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true); - packetsSent++; - - if (packetsSent % flowControlSize == 0) { - flushReplicationStream(action); + raf = new RandomAccessFile(file.getJavaFile(), "r"); Review comment: I see that : ```java if (syncFileMessage.getFileId() != -1 && syncFileMessage.getDataSize() > 0) { replicatingChannel.send(syncFileMessage, syncFileMessage.getFileChannel(), syncFileMessage.getOffset(), syncFileMessage.getDataSize(), lastChunk ? (Channel.Callback) success -> syncFileMessage.release() : null); ``` So we never send syncFileMessage with file size == 0 and when we will do it with ```java replicatingChannel.send(syncFileMessage); ``` We don't release it when completed. It seems a leak, but is not, because we will likely to call it on the previous iteration and it should be already ready to be release or released upon completion of the previous call. I don't think we never send 0 bytes file sized on the wire AFAIK, but *if* it would happen we're not releasing it correctly: we should add ```java replicatingChannel.send(syncFileMessage, lastChunk ? (Channel.Callback) success -> syncFileMessage.release() : null); ``` It would cause on the normal path (file::size > 0) to have `syncFileMessage.release` called twice ATM (we mark lastChunk == true for the last 2 sent packets), but `RandomAccessFile::close` should be idempotent so...no harm :) wdyt? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)
franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN) URL: https://github.com/apache/activemq-artemis/pull/2845#discussion_r328073825 ## File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java ## @@ -560,49 +590,52 @@ private void sendLargeFile(AbstractJournalStorageManager.JournalContent content, if (!file.isOpen()) { file.open(); } - int size = 32 * 1024; + final int size = 1024 * 1024; + long fileSize = file.size(); int flowControlSize = 10; int packetsSent = 0; FlushAction action = new FlushAction(); + long offset = 0; + RandomAccessFile raf = null; + FileChannel fileChannel = null; try { - try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) { - -// We can afford having a single buffer here for this entire loop -// because sendReplicatePacket will encode the packet as a NettyBuffer -// through ActiveMQBuffer class leaving this buffer free to be reused on the next copy -while (true) { - final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size); - buffer.clear(); - ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer(); - final int bytesRead = channel.read(byteBuffer); - int toSend = bytesRead; - if (bytesRead > 0) { - if (bytesRead >= maxBytesToSend) { - toSend = (int) maxBytesToSend; - maxBytesToSend = 0; - } else { - maxBytesToSend = maxBytesToSend - bytesRead; - } - } - logger.debug("sending " + buffer.writerIndex() + " bytes on file " + file.getFileName()); - // sending -1 or 0 bytes will close the file at the backup - // We cannot simply send everything of a file through the executor, - // otherwise we would run out of memory. - // so we don't use the executor here - sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true); - packetsSent++; - - if (packetsSent % flowControlSize == 0) { - flushReplicationStream(action); + raf = new RandomAccessFile(file.getJavaFile(), "r"); Review comment: I see that : ```java if (syncFileMessage.getFileId() != -1 && syncFileMessage.getDataSize() > 0) { replicatingChannel.send(syncFileMessage, syncFileMessage.getFileChannel(), syncFileMessage.getOffset(), syncFileMessage.getDataSize(), lastChunk ? (Channel.Callback) success -> syncFileMessage.release() : null); ``` So we never send syncFileMessage with file size == 0 and when we will do it with ```java replicatingChannel.send(syncFileMessage); ``` We don't release it. It seems a leak, but is not, because we will likely to call it on the previous iteration and it should be already ready to be release or released upon completion of the previous call. I don't think we never send 0 bytes file sized on the wire AFAIK, but *if* it would happen we're not releasing it correctly: we should add ``java replicatingChannel.send(syncFileMessage, lastChunk ? (Channel.Callback) success -> syncFileMessage.release() : null); ``` It would cause on the normal path (file::size > 0) to have `syncFileMessage.release` called twice ATM (we mark lastChunk == true for the last 2 sent packets), but `RandomAccessFile::close` should be idempotent so...no harm :) wdyt? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)
franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN) URL: https://github.com/apache/activemq-artemis/pull/2845#discussion_r328068540 ## File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java ## @@ -560,49 +590,52 @@ private void sendLargeFile(AbstractJournalStorageManager.JournalContent content, if (!file.isOpen()) { file.open(); } - int size = 32 * 1024; + final int size = 1024 * 1024; + long fileSize = file.size(); int flowControlSize = 10; int packetsSent = 0; FlushAction action = new FlushAction(); + long offset = 0; + RandomAccessFile raf = null; + FileChannel fileChannel = null; try { - try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) { - -// We can afford having a single buffer here for this entire loop -// because sendReplicatePacket will encode the packet as a NettyBuffer -// through ActiveMQBuffer class leaving this buffer free to be reused on the next copy -while (true) { - final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size); - buffer.clear(); - ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer(); - final int bytesRead = channel.read(byteBuffer); - int toSend = bytesRead; - if (bytesRead > 0) { - if (bytesRead >= maxBytesToSend) { - toSend = (int) maxBytesToSend; - maxBytesToSend = 0; - } else { - maxBytesToSend = maxBytesToSend - bytesRead; - } - } - logger.debug("sending " + buffer.writerIndex() + " bytes on file " + file.getFileName()); - // sending -1 or 0 bytes will close the file at the backup - // We cannot simply send everything of a file through the executor, - // otherwise we would run out of memory. - // so we don't use the executor here - sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true); - packetsSent++; - - if (packetsSent % flowControlSize == 0) { - flushReplicationStream(action); + raf = new RandomAccessFile(file.getJavaFile(), "r"); Review comment: Good catch: let me take a look! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] wy96f commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)
wy96f commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN) URL: https://github.com/apache/activemq-artemis/pull/2845#discussion_r328056532 ## File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java ## @@ -560,49 +590,52 @@ private void sendLargeFile(AbstractJournalStorageManager.JournalContent content, if (!file.isOpen()) { file.open(); } - int size = 32 * 1024; + final int size = 1024 * 1024; + long fileSize = file.size(); int flowControlSize = 10; int packetsSent = 0; FlushAction action = new FlushAction(); + long offset = 0; + RandomAccessFile raf = null; + FileChannel fileChannel = null; try { - try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) { - -// We can afford having a single buffer here for this entire loop -// because sendReplicatePacket will encode the packet as a NettyBuffer -// through ActiveMQBuffer class leaving this buffer free to be reused on the next copy -while (true) { - final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size); - buffer.clear(); - ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer(); - final int bytesRead = channel.read(byteBuffer); - int toSend = bytesRead; - if (bytesRead > 0) { - if (bytesRead >= maxBytesToSend) { - toSend = (int) maxBytesToSend; - maxBytesToSend = 0; - } else { - maxBytesToSend = maxBytesToSend - bytesRead; - } - } - logger.debug("sending " + buffer.writerIndex() + " bytes on file " + file.getFileName()); - // sending -1 or 0 bytes will close the file at the backup - // We cannot simply send everything of a file through the executor, - // otherwise we would run out of memory. - // so we don't use the executor here - sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true); - packetsSent++; - - if (packetsSent % flowControlSize == 0) { - flushReplicationStream(action); + raf = new RandomAccessFile(file.getJavaFile(), "r"); Review comment: If we send a 0 size file, we need to close raf as ReplicationSyncFileMessage::release will not be called, correct? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] wy96f commented on a change in pull request #2666: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file
wy96f commented on a change in pull request #2666: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file URL: https://github.com/apache/activemq-artemis/pull/2666#discussion_r328029179 ## File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java ## @@ -551,49 +581,52 @@ private void sendLargeFile(AbstractJournalStorageManager.JournalContent content, if (!file.isOpen()) { file.open(); } - int size = 32 * 1024; + final int size = 1024 * 1024; + long fileSize = file.size(); int flowControlSize = 10; int packetsSent = 0; FlushAction action = new FlushAction(); + long offset = 0; + RandomAccessFile raf = null; + FileChannel fileChannel = null; try { - try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) { - -// We can afford having a single buffer here for this entire loop -// because sendReplicatePacket will encode the packet as a NettyBuffer -// through ActiveMQBuffer class leaving this buffer free to be reused on the next copy -while (true) { - final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size); - buffer.clear(); - ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer(); - final int bytesRead = channel.read(byteBuffer); - int toSend = bytesRead; - if (bytesRead > 0) { - if (bytesRead >= maxBytesToSend) { - toSend = (int) maxBytesToSend; - maxBytesToSend = 0; - } else { - maxBytesToSend = maxBytesToSend - bytesRead; - } - } - logger.debug("sending " + buffer.writerIndex() + " bytes on file " + file.getFileName()); - // sending -1 or 0 bytes will close the file at the backup - // We cannot simply send everything of a file through the executor, - // otherwise we would run out of memory. - // so we don't use the executor here - sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true); - packetsSent++; - - if (packetsSent % flowControlSize == 0) { - flushReplicationStream(action); + raf = new RandomAccessFile(file.getJavaFile(), "r"); Review comment: If we send a 0 size file, we need to close `raf` as ReplicationSyncFileMessage::release will not be called, correct? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] wy96f commented on a change in pull request #2666: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file
wy96f commented on a change in pull request #2666: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file URL: https://github.com/apache/activemq-artemis/pull/2666#discussion_r328029179 ## File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java ## @@ -551,49 +581,52 @@ private void sendLargeFile(AbstractJournalStorageManager.JournalContent content, if (!file.isOpen()) { file.open(); } - int size = 32 * 1024; + final int size = 1024 * 1024; + long fileSize = file.size(); int flowControlSize = 10; int packetsSent = 0; FlushAction action = new FlushAction(); + long offset = 0; + RandomAccessFile raf = null; + FileChannel fileChannel = null; try { - try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) { - -// We can afford having a single buffer here for this entire loop -// because sendReplicatePacket will encode the packet as a NettyBuffer -// through ActiveMQBuffer class leaving this buffer free to be reused on the next copy -while (true) { - final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size); - buffer.clear(); - ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer(); - final int bytesRead = channel.read(byteBuffer); - int toSend = bytesRead; - if (bytesRead > 0) { - if (bytesRead >= maxBytesToSend) { - toSend = (int) maxBytesToSend; - maxBytesToSend = 0; - } else { - maxBytesToSend = maxBytesToSend - bytesRead; - } - } - logger.debug("sending " + buffer.writerIndex() + " bytes on file " + file.getFileName()); - // sending -1 or 0 bytes will close the file at the backup - // We cannot simply send everything of a file through the executor, - // otherwise we would run out of memory. - // so we don't use the executor here - sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true); - packetsSent++; - - if (packetsSent % flowControlSize == 0) { - flushReplicationStream(action); + raf = new RandomAccessFile(file.getJavaFile(), "r"); Review comment: If we send a 0 size file, we need to close `raf` as ReplicationSyncFileMessage::release will not be called, correct? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] k-wall edited a comment on issue #2847: ARTEMIS-2494: [AMQP] Allow Modified disposition to be used signal address full to a sending peer
k-wall edited a comment on issue #2847: ARTEMIS-2494: [AMQP] Allow Modified disposition to be used signal address full to a sending peer URL: https://github.com/apache/activemq-artemis/pull/2847#issuecomment-534963469 My initial patch missed the code path where the address full policy is BLOCK and the paging store signals itself full.Previously reject would be sent unconditionally. Now Modified is sent if enabled/supported on the link as per the other case. There is a small behavioural change in this patch (a25a289). Previously on the BLOCK code path the Reject for an address-full would be sent with a `resource-limit-exceeded `whereas the other code path would send a Reject for an address-full with a condition `failed`. This patch uses `resource-limit-exceeded` consistently. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] k-wall commented on issue #2847: ARTEMIS-2494: [AMQP] Allow Modified disposition to be used signal address full to a sending peer
k-wall commented on issue #2847: ARTEMIS-2494: [AMQP] Allow Modified disposition to be used signal address full to a sending peer URL: https://github.com/apache/activemq-artemis/pull/2847#issuecomment-534963469 My initial patch missed the code path where the address full policy is BLOCK and the paging store signals itself full.Previously reject would be sent unconditionally. Now Modified is sent if enabled/supported on the link as per the other case. There is a small behavioural change in this patch (a25a289). Previously on the BLOCK code path the Reject for an address-full would be sent with a resource-limit-exceeded whereas the other code path would send a Reject for an address-full with a condition ""failed". This patch uses resource-limit-exceeded consistently. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] k-wall commented on a change in pull request #2847: ARTEMIS-2494: [AMQP] Allow Modified disposition to be used signal address full to a sending peer
k-wall commented on a change in pull request #2847: ARTEMIS-2494: [AMQP] Allow Modified disposition to be used signal address full to a sending peer URL: https://github.com/apache/activemq-artemis/pull/2847#discussion_r328047546 ## File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java ## @@ -326,6 +325,37 @@ private void actualDelivery(Delivery delivery, Receiver receiver, ReadableBuffer } } + private DeliveryState determineDeliveryState(final Source source, final boolean useModified, final Exception e) { + Outcome defaultOutcome = getEffectiveDefaultOutcome(source); + if (e instanceof ActiveMQAddressFullException && useModified && + (outcomeSupported(source, Modified.DESCRIPTOR_SYMBOL) || defaultOutcome instanceof Modified)) { + Modified modified = new Modified(); + modified.setDeliveryFailed(true); + modified.setUndeliverableHere(false); Review comment: That should be fixed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] k-wall commented on a change in pull request #2847: ARTEMIS-2494: [AMQP] Allow Modified disposition to be used signal address full to a sending peer
k-wall commented on a change in pull request #2847: ARTEMIS-2494: [AMQP] Allow Modified disposition to be used signal address full to a sending peer URL: https://github.com/apache/activemq-artemis/pull/2847#discussion_r328047809 ## File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java ## @@ -326,6 +325,37 @@ private void actualDelivery(Delivery delivery, Receiver receiver, ReadableBuffer } } + private DeliveryState determineDeliveryState(final Source source, final boolean useModified, final Exception e) { + Outcome defaultOutcome = getEffectiveDefaultOutcome(source); + if (e instanceof ActiveMQAddressFullException && useModified && + (outcomeSupported(source, Modified.DESCRIPTOR_SYMBOL) || defaultOutcome instanceof Modified)) { + Modified modified = new Modified(); + modified.setDeliveryFailed(true); + modified.setUndeliverableHere(false); + return modified; + } else { + if (outcomeSupported(source, Rejected.DESCRIPTOR_SYMBOL) || defaultOutcome instanceof Rejected) { +ErrorCondition condition = new ErrorCondition(); + +// Set condition +if (e instanceof ActiveMQSecurityException) { + condition.setCondition(AmqpError.UNAUTHORIZED_ACCESS); +} else { + condition.setCondition(Symbol.valueOf("failed")); +} +condition.setDescription(e.getMessage()); + +Rejected rejected = new Rejected(); +rejected.setError(condition); +return rejected; + } else if (source.getDefaultOutcome() instanceof DeliveryState) { +return ((DeliveryState) source.getDefaultOutcome()); + } else { +return Accepted.getInstance(); Review comment: I've changed the code to send Rejected in the case where neither outcomes not default-outcome is specified. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] brusdev commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses
brusdev commented on a change in pull request #2850: ARTEMIS-2504 implement retroactive addresses URL: https://github.com/apache/activemq-artemis/pull/2850#discussion_r328041981 ## File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java ## @@ -1971,7 +1988,9 @@ private synchronized int iterQueue(final int flushLimit, if (filter1 == null || filter1.match(ref.getMessage())) { messageAction.actMessage(tx, ref); - iter.remove(); + if (remove) { Review comment: Do we need the same check for scheduledDelivery messages (https://github.com/apache/activemq-artemis/blob/master/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java#L128) and paged messages (https://github.com/apache/activemq-artemis/blob/master/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java#L2002)? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)
franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN) URL: https://github.com/apache/activemq-artemis/pull/2845#discussion_r328011491 ## File path: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java ## @@ -355,12 +373,24 @@ private boolean canWrite(final int requiredCapacity) { return canWrite; } - private Object getFileObject(RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize) { - if (channel.pipeline().get(SslHandler.class) == null) { - return new NonClosingDefaultFileRegion(fileChannel, offset, dataSize); + private Object getFileObject(FileChannel fileChannel, long offset, int dataSize) { + if (USE_FILE_REGION && channel.pipeline().get(SslHandler.class) == null) { Review comment: I'm finishing running the tests, but I believe you can try to run your load generator as well now (fingers crossed!) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)
franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN) URL: https://github.com/apache/activemq-artemis/pull/2845#discussion_r328004191 ## File path: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java ## @@ -355,12 +373,24 @@ private boolean canWrite(final int requiredCapacity) { return canWrite; } - private Object getFileObject(RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize) { - if (channel.pipeline().get(SslHandler.class) == null) { - return new NonClosingDefaultFileRegion(fileChannel, offset, dataSize); + private Object getFileObject(FileChannel fileChannel, long offset, int dataSize) { + if (USE_FILE_REGION && channel.pipeline().get(SslHandler.class) == null) { Review comment: We were missing this https://github.com/apache/activemq-artemis/pull/2845/commits/438c0fb0c69596cbd02f5ec6337c934b331aea96#diff-917ee65648802d0c63faa660eb9f8debR68 to propagate a writeable channel and resume sending This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-nms-amqp] michaelpearce-gain commented on a change in pull request #38: AMQNET-616: Add TCP Keep-Alive support
michaelpearce-gain commented on a change in pull request #38: AMQNET-616: Add TCP Keep-Alive support URL: https://github.com/apache/activemq-nms-amqp/pull/38#discussion_r328002168 ## File path: src/NMS.AMQP/Transport/SecureTransportContext.cs ## @@ -37,8 +37,8 @@ namespace Apache.NMS.AMQP.Transport internal class SecureTransportContext : TransportContext, ISecureTransportContext { -private readonly static List SupportedProtocols; -private readonly static Dictionary SupportedProtocolValues; +private static readonly List SupportedProtocols; Review comment: ah i didnt spot that This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] wy96f commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)
wy96f commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN) URL: https://github.com/apache/activemq-artemis/pull/2845#discussion_r327998358 ## File path: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java ## @@ -355,12 +373,24 @@ private boolean canWrite(final int requiredCapacity) { return canWrite; } - private Object getFileObject(RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize) { - if (channel.pipeline().get(SslHandler.class) == null) { - return new NonClosingDefaultFileRegion(fileChannel, offset, dataSize); + private Object getFileObject(FileChannel fileChannel, long offset, int dataSize) { + if (USE_FILE_REGION && channel.pipeline().get(SslHandler.class) == null) { Review comment: > I've proceed in the investigation and I see that we are failing here: https://github.com/netty/netty/blob/21b7e29ea7c211bb2b889bae3a0c6c5d9f60fb01/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java#L320 > > And we won't get notified anymore on [ChunkedWriteHandler::channelWritabilityChanged](https://github.com/netty/netty/blob/21b7e29ea7c211bb2b889bae3a0c6c5d9f60fb01/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java#L144) > > It means to me that: > > 1. The receiver side has stopped reading data ie effectively the channel is not writeable anymore > 2. The sender side (on Netty) has some bug while marking progress and/or propagate writeability changes https://github.com/netty/netty/blob/21b7e29ea7c211bb2b889bae3a0c6c5d9f60fb01/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java#L320 would not be called as `doFlush` is running in netty executor and channel is still not writable after writing message. Should not be 1 bcs `netstat` showed no backlog. I found `channelWritabilityChanged` was never called during replication. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)
franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN) URL: https://github.com/apache/activemq-artemis/pull/2845#discussion_r327997358 ## File path: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java ## @@ -355,12 +373,24 @@ private boolean canWrite(final int requiredCapacity) { return canWrite; } - private Object getFileObject(RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize) { - if (channel.pipeline().get(SslHandler.class) == null) { - return new NonClosingDefaultFileRegion(fileChannel, offset, dataSize); + private Object getFileObject(FileChannel fileChannel, long offset, int dataSize) { + if (USE_FILE_REGION && channel.pipeline().get(SslHandler.class) == null) { Review comment: I've found the issue: was on our side. I'm going to update this PR with the change ;) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-nms-amqp] HavretGC commented on a change in pull request #38: AMQNET-616: Add TCP Keep-Alive support
HavretGC commented on a change in pull request #38: AMQNET-616: Add TCP Keep-Alive support URL: https://github.com/apache/activemq-nms-amqp/pull/38#discussion_r327987510 ## File path: src/NMS.AMQP/Transport/SecureTransportContext.cs ## @@ -37,8 +37,8 @@ namespace Apache.NMS.AMQP.Transport internal class SecureTransportContext : TransportContext, ISecureTransportContext { -private readonly static List SupportedProtocols; -private readonly static Dictionary SupportedProtocolValues; +private static readonly List SupportedProtocols; Review comment: It was static from the beginning. I've merely rearranged words order to adjust it to current code style. From what I see from the code it's static as it never changes. Some consts that are initialized in static constructor. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)
franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN) URL: https://github.com/apache/activemq-artemis/pull/2845#discussion_r327984141 ## File path: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java ## @@ -355,12 +373,24 @@ private boolean canWrite(final int requiredCapacity) { return canWrite; } - private Object getFileObject(RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize) { - if (channel.pipeline().get(SslHandler.class) == null) { - return new NonClosingDefaultFileRegion(fileChannel, offset, dataSize); + private Object getFileObject(FileChannel fileChannel, long offset, int dataSize) { + if (USE_FILE_REGION && channel.pipeline().get(SslHandler.class) == null) { Review comment: I've proceed in the investigation and I see that we are failing here: https://github.com/netty/netty/blob/21b7e29ea7c211bb2b889bae3a0c6c5d9f60fb01/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java#L320 And we won't get notified anymore on [ChunkedWriteHandler::channelWritabilityChanged](https://github.com/netty/netty/blob/21b7e29ea7c211bb2b889bae3a0c6c5d9f60fb01/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java#L144) It means to me that: 1. The receiver side has stopped reading data ie effectively the channel is not writeable anymore 2. The sender side (on Netty) has some bug while marking progress and/or propagate writeability changes This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)
franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN) URL: https://github.com/apache/activemq-artemis/pull/2845#discussion_r327984141 ## File path: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java ## @@ -355,12 +373,24 @@ private boolean canWrite(final int requiredCapacity) { return canWrite; } - private Object getFileObject(RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize) { - if (channel.pipeline().get(SslHandler.class) == null) { - return new NonClosingDefaultFileRegion(fileChannel, offset, dataSize); + private Object getFileObject(FileChannel fileChannel, long offset, int dataSize) { + if (USE_FILE_REGION && channel.pipeline().get(SslHandler.class) == null) { Review comment: I've proceed in the investigation and I see that we are failing here: https://github.com/netty/netty/blob/21b7e29ea7c211bb2b889bae3a0c6c5d9f60fb01/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java#L320 And we won't get notified anymore on [ChunkedWriteHandler::channelWritabilityChanged](https://github.com/netty/netty/blob/21b7e29ea7c211bb2b889bae3a0c6c5d9f60fb01/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java#L144) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] gemmellr commented on a change in pull request #2847: ARTEMIS-2494: [AMQP] Allow Modified disposition to be used signal address full to a sending peer
gemmellr commented on a change in pull request #2847: ARTEMIS-2494: [AMQP] Allow Modified disposition to be used signal address full to a sending peer URL: https://github.com/apache/activemq-artemis/pull/2847#discussion_r327978612 ## File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java ## @@ -326,6 +325,37 @@ private void actualDelivery(Delivery delivery, Receiver receiver, ReadableBuffer } } + private DeliveryState determineDeliveryState(final Source source, final boolean useModified, final Exception e) { + Outcome defaultOutcome = getEffectiveDefaultOutcome(source); + if (e instanceof ActiveMQAddressFullException && useModified && + (outcomeSupported(source, Modified.DESCRIPTOR_SYMBOL) || defaultOutcome instanceof Modified)) { + Modified modified = new Modified(); + modified.setDeliveryFailed(true); + modified.setUndeliverableHere(false); Review comment: You aren't missing anything, I've somehow completely misread that the second time round, thinking it was being set true...no idea how, read start of one line and end of other? :) I probably wouldn't bother setting it, but don't see harm in doing so. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-nms-amqp] michaelpearce-gain commented on a change in pull request #38: AMQNET-616: Add TCP Keep-Alive support
michaelpearce-gain commented on a change in pull request #38: AMQNET-616: Add TCP Keep-Alive support URL: https://github.com/apache/activemq-nms-amqp/pull/38#discussion_r327971083 ## File path: src/NMS.AMQP/Transport/SecureTransportContext.cs ## @@ -37,8 +37,8 @@ namespace Apache.NMS.AMQP.Transport internal class SecureTransportContext : TransportContext, ISecureTransportContext { -private readonly static List SupportedProtocols; -private readonly static Dictionary SupportedProtocolValues; +private static readonly List SupportedProtocols; Review comment: why change to static? isnt that dangerous as what occurs if multiple CF's or contexts needing different values This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)
franz1981 commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN) URL: https://github.com/apache/activemq-artemis/pull/2845#discussion_r327950181 ## File path: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java ## @@ -355,12 +373,24 @@ private boolean canWrite(final int requiredCapacity) { return canWrite; } - private Object getFileObject(RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize) { - if (channel.pipeline().get(SslHandler.class) == null) { - return new NonClosingDefaultFileRegion(fileChannel, offset, dataSize); + private Object getFileObject(FileChannel fileChannel, long offset, int dataSize) { + if (USE_FILE_REGION && channel.pipeline().get(SslHandler.class) == null) { Review comment: I see, but I've tried to raise the highwatermark value, the outbound TCP buffer and lower the size of chunks to be 32 K getting the same error... I will try to override the back-pressure mechanism we use on NettyConnection making it to return just true... If we will found an error on Netty side I will send a pr there instead This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2844: ARTEMIS-1811 NIO Seq File should use RandomAccessFile with heap buffers
franz1981 commented on a change in pull request #2844: ARTEMIS-1811 NIO Seq File should use RandomAccessFile with heap buffers URL: https://github.com/apache/activemq-artemis/pull/2844#discussion_r327938663 ## File path: artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java ## @@ -160,14 +173,63 @@ public int read(final ByteBuffer bytes) throws Exception { return read(bytes, null); } + //This value has been tuned just to reduce the memory footprint + //of read/write of the whole file size: given that this value + //is > 8192, RandomAccessFile JNI code will use malloc/free instead + //of using a copy on the stack, but it has been proven to NOT be + //a bottleneck + private static final int BUF_SIZE = 2 * 1024 * 1024; + + private static int readRaf(RandomAccessFile raf, byte[] b, int off, int len, int ioSize) throws IOException { + int remaining = len; + int offset = off; + while (remaining > 0) { + final int chunkSize = Math.min(ioSize, remaining); + final int read = raf.read(b, offset, chunkSize); + assert read != 0; + if (read == -1) { +if (len == remaining) { + return -1; +} +break; + } + offset += read; + remaining -= read; + } + return len - remaining; + } + + private static void writeRaf(RandomAccessFile raf, byte[] b, int off, int len, int ioSize) throws IOException { + int remaining = len; + int offset = off; + while (remaining > 0) { + final int chunkSize = Math.min(ioSize, remaining); + raf.write(b, offset, chunkSize); + offset += chunkSize; + remaining -= chunkSize; + } + } + @Override public synchronized int read(final ByteBuffer bytes, final IOCallback callback) throws IOException, ActiveMQIllegalStateException { try { if (channel == null) { throw new ActiveMQIllegalStateException("File " + this.getFileName() + " has a null channel"); } - int bytesRead = channel.read(bytes); + final int bytesRead; + if (bytes.hasArray()) { +if (bytes.remaining() > BUF_SIZE) { + bytesRead = readRaf(rfile, bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining(), BUF_SIZE); +} else { + bytesRead = rfile.read(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining()); Review comment: to save inlining budget: for most of the cases where the first check will save the call on the happy path This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [activemq-artemis] wy96f commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN)
wy96f commented on a change in pull request #2845: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN) URL: https://github.com/apache/activemq-artemis/pull/2845#discussion_r327948683 ## File path: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java ## @@ -355,12 +373,24 @@ private boolean canWrite(final int requiredCapacity) { return canWrite; } - private Object getFileObject(RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize) { - if (channel.pipeline().get(SslHandler.class) == null) { - return new NonClosingDefaultFileRegion(fileChannel, offset, dataSize); + private Object getFileObject(FileChannel fileChannel, long offset, int dataSize) { + if (USE_FILE_REGION && channel.pipeline().get(SslHandler.class) == null) { Review comment: > So it seems that Netty is slowing sending them...can see 2 reasons for that: > > * we are sending too much for real > * the receiving broker is not reading them fast enough > > I cannot find other reasons, other insights? Actually broker was not sending built up data in the queue after some point until timeout failure. It seems watermark mechanism triggered some bug resulting in not resuming transfer in `ChunkedWriteHandler`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services