The likely cause of the problem is the timing of the match against the subscription pattern - does the second topic exist when the subscription is created?
I know how this operates for JMS, and composites work that way - when the subscription to a composite is created, then the list of actual destinations is resolved.

Art

On Tue, Aug 8, 2023 at 1:28 PM John Damon <john.da...@freightverify.com> wrote:

> Hey everyone,
>
> I replicated this issue on my local machine, and I’ll provide all details
> I can. I think it’s an issue with the authorization plugin.
>
> Just to restate: the issue is that my subscriber does not receive *some*
> missed messages upon reconnecting.
>
> *activemq.xml*
>
> <!--
>     Licensed to the Apache Software Foundation (ASF) under one or more
>     contributor license agreements. See the NOTICE file distributed with
>     this work for additional information regarding copyright ownership.
>     The ASF licenses this file to You under the Apache License, Version 2.0
>     (the "License"); you may not use this file except in compliance with
>     the License. You may obtain a copy of the License at
>
>     http://www.apache.org/licenses/LICENSE-2.0
>
>     Unless required by applicable law or agreed to in writing, software
>     distributed under the License is distributed on an "AS IS" BASIS,
>     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>     See the License for the specific language governing permissions and
>     limitations under the License.
> -->
> <!-- START SNIPPET: example -->
> <beans
>   xmlns="http://www.springframework.org/schema/beans"
>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>   xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
>   http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
>
>     <!-- Allows us to use system properties as variables in this configuration file -->
>     <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>         <property name="locations">
>             <value>file:${activemq.conf}/credentials.properties</value>
>         </property>
>     </bean>
>
>     <!--
>         The <broker> element is used to configure the ActiveMQ broker.
>     -->
>     <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
>
>         <destinationPolicy>
>             <policyMap>
>                 <policyEntries>
>                     <policyEntry topic=">" >
>                         <!-- The constantPendingMessageLimitStrategy is used to prevent
>                              slow topic consumers to block producers and affect other consumers
>                              by limiting the number of messages that are retained
>                              For more information, see:
>
>                              http://activemq.apache.org/slow-consumer-handling.html
>
>                         -->
>                         <pendingMessageLimitStrategy>
>                             <constantPendingMessageLimitStrategy limit="1000"/>
>                         </pendingMessageLimitStrategy>
>                     </policyEntry>
>                 </policyEntries>
>             </policyMap>
>         </destinationPolicy>
>
>         <!--
>             The managementContext is used to configure how ActiveMQ is exposed in
>             JMX. By default, ActiveMQ uses the MBean server that is started by
>             the JVM. For more information, see:
>
>             http://activemq.apache.org/jmx.html
>         -->
>         <managementContext>
>             <managementContext createConnector="false"/>
>         </managementContext>
>
>         <!--
>             Configure message persistence for the broker.
>             The default persistence
>             mechanism is the KahaDB store (identified by the kahaDB tag).
>             For more information, see:
>
>             http://activemq.apache.org/persistence.html
>         -->
>         <persistenceAdapter>
>             <kahaDB directory="${activemq.data}/kahadb"/>
>         </persistenceAdapter>
>
>         <!--
>             The systemUsage controls the maximum amount of space the broker will
>             use before disabling caching and/or slowing down producers. For more information, see:
>             http://activemq.apache.org/producer-flow-control.html
>         -->
>         <systemUsage>
>             <systemUsage>
>                 <memoryUsage>
>                     <memoryUsage percentOfJvmHeap="70" />
>                 </memoryUsage>
>                 <storeUsage>
>                     <storeUsage limit="100 gb"/>
>                 </storeUsage>
>                 <tempUsage>
>                     <tempUsage limit="50 gb"/>
>                 </tempUsage>
>             </systemUsage>
>         </systemUsage>
>
>         <!--
>             The transport connectors expose ActiveMQ over a given protocol to
>             clients and other brokers. For more information, see:
>
>             http://activemq.apache.org/configuring-transports.html
>         -->
>         <transportConnectors>
>             <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
>             <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>             <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>             <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>             <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>             <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>         </transportConnectors>
>
>         <!-- destroy the spring context on shutdown to stop jetty -->
>         <shutdownHooks>
>             <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
>         </shutdownHooks>
>
>         <plugins>
>             <jaasAuthenticationPlugin configuration="activemq" />
>             <authorizationPlugin>
>                 <map>
>                     <authorizationMap>
>                         <authorizationEntries>
>                             <authorizationEntry admin="*" read="*" topic="ActiveMQ.Advisory.>" write="*"/>
>                             <authorizationEntry admin="producers" read="consumers" topic="some.topic.>" write="producers"/>
>                         </authorizationEntries>
>                     </authorizationMap>
>                 </map>
>             </authorizationPlugin>
>         </plugins>
>
>     </broker>
>
>     <!--
>         Enable web consoles, REST and Ajax APIs and demos
>         The web consoles requires by default login, you can disable this in the jetty.xml file
>
>         Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
>     -->
>     <import resource="jetty.xml"/>
>
> </beans>
> <!-- END SNIPPET: example -->
>
> *user.properties*
>
> admin=admin
> producer=producer
> consumer=consumer
>
> *groups.properties*
>
> admins=admin
> producers=producer
> consumers=consumer
>
> *My python subscriber:*
>
> import paho.mqtt.client as mqtt
>
> topic = "some/topic/+/item"
> local_broker = "localhost"
> local_port = 1883
>
> def on_connect(client, userdata, flags, rc):
>     print("Connected: " + str(rc))
>     client.subscribe(topic, qos=1)
>
> def on_subscribe(client, userdata, mid, granted_qos):
>     print("Subscribed: " + str(mid) + " " + str(granted_qos))
>
> def on_message(client, userdata, msg):
>     print("Received message: " + str(msg.payload.decode()))
>
> client = mqtt.Client("doesntmatter", clean_session=False)
> client.on_connect = on_connect
> client.on_message = on_message
> client.on_subscribe = on_subscribe
>
> client.username_pw_set("consumer", "consumer") # local
> client.connect(local_broker, local_port)
> client.loop_forever()
>
> *My python publisher:*
>
> import paho.mqtt.client as paho
> import random, string
>
> def randomword(length):
>     letters = string.ascii_lowercase
>     return "".join(random.choice(letters) for i in range(length))
>
> local_broker = "localhost"
> local_port = 1883
> topic = f"some/topic/{randomword(7)}/item"
>
> def on_connect(client, userdata, flags, rc):
>     print("Connected: " + str(rc))
>
> def on_publish(client, userdata, result): # create function for callback
>     print(f"data published: {result}")
>     print(client)
>     print(userdata)
>     print(result)
>     pass
>
> client1 = paho.Client("whatever")
> client1.on_publish = on_publish
> client1.username_pw_set("producer", "producer")
> client1.connect(local_broker, local_port) # local
> ret = client1.publish(
>     topic,
>     '{"message": "From publisher!"}',
>     qos=1,
> )
>
> *Steps to replicate:*
>
> 1. Run subscriber script
> 2. Run publisher script
> 3. Stop subscriber script
> 4. Run publisher script
> 5. Run subscriber script
> 6. Subscriber does not receive the published message even though it has
>    QoS == 1 and clean_session set to False!
>
> *What I have noticed:*
>
> - If you comment out the whole <plugins/> block from `activemq.xml`,
>   it works as expected.
> - It also works if you send the message on a topic that already exists.
> - For example, in step 4, publish a message to the same topic that was
>   published to in step 2 (remove the random string logic). In step 5, the
>   subscriber will receive the message.
>
> I’m assuming something is funky with my authorization entries.
>
> Thanks everyone, hoping someone can help me figure this out.
>
> *Jonathan Damon *Finished Product Team Captain
>
> *From: *Justin Bertram <jbert...@apache.org>
> *Date: *Tuesday, August 1, 2023 at 4:50 PM
> *To: *users@activemq.apache.org <users@activemq.apache.org>
> *Subject: *Re: Not receiving messages after subscribed to topic with QoS 1 and clean session set to false?
>
> You could potentially reproduce the problem on a locally running community
> release of ActiveMQ "Classic." That wouldn't be too hard, I think. Then
> you'd know for sure the problem wasn't introduced via AmazonMQ, and the
> community would be more able to help.
>
>
> Justin
>
> On Tue, Aug 1, 2023 at 9:25 AM John Damon <john.da...@freightverify.com> wrote:
>
> > Hey,
> >
> > I’ve been spinning my wheels on this and was hoping someone could help me
> > understand the mistakes I’m making. Here’s some background:
> >
> > - Using AmazonMQ for ActiveMQ
> > - Publisher is publishing messages to my broker using MQTT protocol
> >   using QoS 1 (written in java)
> > - Subscriber is subscribed to topics using MQTT protocol with
> >   wildcard, say `some/topic/+/item`, with QoS 1 and clean session set to
> >   false (written in python)
> >
> > Let me walk you through my particular case:
> >
> > 1. Subscriber S subscribes to topic “some/topic/+/item” using MQTT
> >    protocol
> >    1. QoS 1 and clean session set to false
> >    2. Granted QoS == 1
> > 2. Publisher P publishes message to “some/topic/1/item” using MQTT
> >    protocol for the first time
> >    1. QoS 1
> > 3. Subscriber S receives message on topic some/topic/1/item
> > 4. Subscriber S disconnects
> > 5. Publisher P publishes message to “some/topic/1/item” using MQTT
> >    protocol for the second time
> > 6. Publisher P publishes message to “some/topic/2/item” using MQTT
> >    protocol for the first time
> > 7. Subscriber S reconnects and subscribes to “some/topic/+/item”
> > 8. Subscriber S receives message on topic some/topic/1/item
> > 9. Subscriber S DOES NOT receive message on topic some/topic/2/item
> >
> > This is not the behavior I would expect, and I need to figure out how to
> > resolve this before our customers start subscribing to our topics. I need
I need > > to receive all new messages that match the subscription with wildcards, > > even topics that don’t exist prior the subscriber disconnecting. > > > > > > > > In the UI, I’m seeing this after completing the case above: > > > > > > > > NAME > > > > # OF CONSUMERS > > > > MSGS ENQUEUED > > > > MSGS DEQUEUED > > > > some.topic.1.item > > > > 1 > > > > 2 > > > > 2 > > > > some.topic.2.item > > > > 0 > > > > 0 > > > > 0 > > > > > > > > > > > > I’m fairly confident I’m just missing some configuration, but I can’t > seem > > to find what configuration that may be. Here’s my config: > > > > > > > > <?xml version="1.0" encoding="UTF-8" standalone="yes"?> > > > > <broker schedulePeriodForDestinationPurge="10000" xmlns= > > http://activemq.apache.org/schema/core> > > > > <destinationPolicy> > > > > <policyMap> > > > > <policyEntries> > > > > <policyEntry gcInactiveDestinations="true" > inactiveTimoutBeforeGC= > > "600000" topic=">"> > > > > <pendingMessageLimitStrategy> > > > > <constantPendingMessageLimitStrategy limit="1000"/> > > > > </pendingMessageLimitStrategy> > > > > </policyEntry> > > > > <policyEntry gcInactiveDestinations="true" > inactiveTimoutBeforeGC= > > "600000" queue=">"/> > > > > </policyEntries> > > > > </policyMap> > > > > </destinationPolicy> > > > > <plugins> > > > > <authorizationPlugin> > > > > <map> > > > > <authorizationMap> > > > > <authorizationEntries> > > > > <authorizationEntry myAuthEntries... /> > > > > </authorizationEntries> > > > > </authorizationMap> > > > > </map> > > > > </authorizationPlugin> > > > > </plugins> > > > > </broker> > > > > > > > > > > > > This is mostly just the defaults that AWS gave me. I tried removing the > > `constantPendingMessageLimitStrategy`, as I was wondering if we were > > sending thousands of messages to the advisory topics that was causing > > messages to get deleted from other topics. I didn’t think this made any > > sense and, sure enough, it didn’t work. 
> >
> > Is there anyone that can lend their expertise and help me resolve this
> > issue?
> >
> > Thanks so much!
> >
> > *Jonathan Damon *Finished Product Team Captain
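
To make the timing question at the top of this reply concrete, here is a minimal toy model in Python. It is only a sketch of the hypothesis, not ActiveMQ's implementation: `ToyBroker`, `matches`, `replay_scenario`, and the `rematch_while_offline` switch are all hypothetical names invented for illustration. The assumption being modelled is that an offline durable subscription only retains messages for destinations it was already attached to, so a topic created while the subscriber is away is never matched against the wildcard.

```python
def matches(topic_filter, topic):
    """Match an MQTT topic against a filter; only the '+' wildcard is handled."""
    f_levels = topic_filter.split("/")
    t_levels = topic.split("/")
    return len(f_levels) == len(t_levels) and all(
        f == "+" or f == t for f, t in zip(f_levels, t_levels)
    )


class ToyBroker:
    """Hypothetical broker model; not how ActiveMQ is actually written."""

    def __init__(self, rematch_while_offline):
        # True: every publish is matched against the wildcard, even for
        # offline subscribers. False: offline subscribers only keep messages
        # for topics they were attached to before disconnecting.
        self.rematch_while_offline = rematch_while_offline
        self.subs = {}      # client_id -> {"filter", "attached", "pending"}
        self.online = set()

    def connect(self, client_id, topic_filter):
        """Connect (or reconnect) and return the messages held while offline."""
        sub = self.subs.setdefault(
            client_id, {"filter": topic_filter, "attached": set(), "pending": []}
        )
        self.online.add(client_id)
        pending, sub["pending"] = sub["pending"], []
        return pending

    def disconnect(self, client_id):
        self.online.discard(client_id)

    def publish(self, topic, payload):
        for client_id, sub in self.subs.items():
            if not matches(sub["filter"], topic):
                continue
            if client_id in self.online:
                # Live subscriber: attach to the topic (delivery not modelled).
                sub["attached"].add(topic)
            elif topic in sub["attached"] or self.rematch_while_offline:
                sub["pending"].append((topic, payload))


def replay_scenario(rematch_while_offline):
    """Replay the nine-step case from the original message."""
    broker = ToyBroker(rematch_while_offline)
    broker.connect("S", "some/topic/+/item")
    broker.publish("some/topic/1/item", "first")
    broker.disconnect("S")
    broker.publish("some/topic/1/item", "second")
    broker.publish("some/topic/2/item", "third")
    return broker.connect("S", "some/topic/+/item")


if __name__ == "__main__":
    print(replay_scenario(rematch_while_offline=False))
    print(replay_scenario(rematch_while_offline=True))
```

With `rematch_while_offline=False` the reconnecting subscriber gets back only the second message on `some/topic/1/item`, which is the behaviour reported in the thread; with `True` it also gets the message on `some/topic/2/item`, which is what the poster expected. Whether the real broker behaves like the first case, and whether the authorization plugin plays a part, would still need to be confirmed against a running broker.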