Perhaps it helps to understand that Topics never store messages themselves
- topic subscriptions store messages.  This includes non-persistent
messages stored in memory.

So if there is a send to a topic that has no subscriptions, whether durable
or non-durable, the message ends up discarded.
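
As a rough illustration, here is a toy Python model of that idea. It is not ActiveMQ code: the `Topic` and `Subscription` classes and all names in it are made up for this sketch, and it assumes the offline durable subscription is attached to topic 1 but not to the newly created topic 2. Whether that is what happens in the real broker is exactly the open question.

```python
class Subscription:
    """Toy stand-in for a topic subscription: it owns the pending messages."""

    def __init__(self, name, durable):
        self.name = name
        self.durable = durable
        self.pending = []


class Topic:
    """Toy stand-in for a topic: it fans messages out and stores nothing."""

    def __init__(self, name):
        self.name = name
        self.subscriptions = []

    def subscribe(self, subscription):
        self.subscriptions.append(subscription)

    def send(self, message):
        # Each subscription gets its own copy; the topic keeps none.
        for subscription in self.subscriptions:
            subscription.pending.append(message)
        # Zero means no subscription stored the message, so it is discarded.
        return len(self.subscriptions)


topic_1 = Topic("some.topic.1.item")
topic_2 = Topic("some.topic.2.item")

# Assumption for this sketch: the durable subscription is known to topic 1 only.
offline_sub = Subscription("doesntmatter", durable=True)
topic_1.subscribe(offline_sub)

stored_1 = topic_1.send("second message on topic 1")
stored_2 = topic_2.send("first message on topic 2")

print(stored_1, stored_2, offline_sub.pending)
```

In this model the message sent to topic 2 is gone as soon as `send` returns, because nothing was subscribed to hold it. That would match a topic showing 0 consumers.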

Art


On Wed, Aug 16, 2023 at 7:56 AM John Damon <john.da...@freightverify.com>
wrote:

> Hey everyone,
>
>
>
> This issue persists.  Any ideas? I’m thinking this may be a bug with
> ActiveMQ (or at least the authorization plugin). Otherwise, there must be
> something wrong with my `<authorizationPlugin/>` section but I can’t find
> any documentation indicating what I may be doing wrong.
>
>
>
> Thanks again,
>
>
>
> - Jonathan
>
>
>
> *From: *John Damon <john.da...@freightverify.com>
> *Date: *Tuesday, August 8, 2023 at 4:27 PM
> *To: *users@activemq.apache.org <users@activemq.apache.org>
> *Subject: *Re: Not receiving messages after subscribed to topic with QoS
> 1 and clean session set to false?
>
> Hey everyone,
>
>
>
> I replicated this issue on my local machine, and I’ll provide all details
> I can. I think it’s an issue with the authorization plugin.
>
>
>
> Just to restate: the issue is that my subscriber does not receive *some*
> missed messages upon reconnecting.
>
>
>
> *activemq.xml*
>
> <!--
>
>     Licensed to the Apache Software Foundation (ASF) under one or more
>
>     contributor license agreements.  See the NOTICE file distributed with
>
>     this work for additional information regarding copyright ownership.
>
>     The ASF licenses this file to You under the Apache License, Version 2.0
>
>     (the "License"); you may not use this file except in compliance with
>
>     the License.  You may obtain a copy of the License at
>
>
>
>     http://www.apache.org/licenses/LICENSE-2.0
>
>
>
>     Unless required by applicable law or agreed to in writing, software
>
>     distributed under the License is distributed on an "AS IS" BASIS,
>
>     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
>
>     See the License for the specific language governing permissions and
>
>     limitations under the License.
>
> -->
>
> <!-- START SNIPPET: example -->
>
> <beans
>
>   xmlns="http://www.springframework.org/schema/beans"
>
>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>
>   xsi:schemaLocation="http://www.springframework.org/schema/beans
> http://www.springframework.org/schema/beans/spring-beans.xsd
>
>   http://activemq.apache.org/schema/core
> http://activemq.apache.org/schema/core/activemq-core.xsd">
>
>
>
>     <!-- Allows us to use system properties as variables in this
> configuration file -->
>
>     <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>
>         <property name="locations">
>
>             <value>file:${activemq.conf}/credentials.properties</value>
>
>         </property>
>
>     </bean>
>
>
>
>     <!--
>
>         The <broker> element is used to configure the ActiveMQ broker.
>
>     -->
>
>     <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
>
>
>
>         <destinationPolicy>
>
>             <policyMap>
>
>               <policyEntries>
>
>                 <policyEntry topic=">" >
>
>                     <!-- The constantPendingMessageLimitStrategy is used
> to prevent
>
>                          slow topic consumers to block producers and
> affect other consumers
>
>                          by limiting the number of messages that are
> retained
>
>                          For more information, see:
>
>
>
>
> http://activemq.apache.org/slow-consumer-handling.html
>
>
>
>                     -->
>
>                   <pendingMessageLimitStrategy>
>
>                     <constantPendingMessageLimitStrategy limit="1000"/>
>
>                   </pendingMessageLimitStrategy>
>
>                 </policyEntry>
>
>               </policyEntries>
>
>             </policyMap>
>
>         </destinationPolicy>
>
>
>
>         <!--
>
>             The managementContext is used to configure how ActiveMQ is
> exposed in
>
>             JMX. By default, ActiveMQ uses the MBean server that is
> started by
>
>             the JVM. For more information, see:
>
>
>
>             http://activemq.apache.org/jmx.html
>
>         -->
>
>         <managementContext>
>
>             <managementContext createConnector="false"/>
>
>         </managementContext>
>
>
>
>         <!--
>
>             Configure message persistence for the broker. The default
> persistence
>
>             mechanism is the KahaDB store (identified by the kahaDB tag).
>
>             For more information, see:
>
>
>
>             http://activemq.apache.org/persistence.html
>
>         -->
>
>         <persistenceAdapter>
>
>             <kahaDB directory="${activemq.data}/kahadb"/>
>
>         </persistenceAdapter>
>
>
>
>           <!--
>
>             The systemUsage controls the maximum amount of space the
> broker will
>
>             use before disabling caching and/or slowing down producers.
> For more information, see:
>
>             http://activemq.apache.org/producer-flow-control.html
>
>           -->
>
>           <systemUsage>
>
>             <systemUsage>
>
>                 <memoryUsage>
>
>                     <memoryUsage percentOfJvmHeap="70" />
>
>                 </memoryUsage>
>
>                 <storeUsage>
>
>                     <storeUsage limit="100 gb"/>
>
>                 </storeUsage>
>
>                 <tempUsage>
>
>                     <tempUsage limit="50 gb"/>
>
>                 </tempUsage>
>
>             </systemUsage>
>
>         </systemUsage>
>
>
>
>         <!--
>
>             The transport connectors expose ActiveMQ over a given protocol
> to
>
>             clients and other brokers. For more information, see:
>
>
>
>             http://activemq.apache.org/configuring-transports.html
>
>         -->
>
>         <transportConnectors>
>
>             <!-- DOS protection, limit concurrent connections to 1000 and
> frame size to 100MB -->
>
>             <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>
>             <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>
>             <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>
>             <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>
>             <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>
>         </transportConnectors>
>
>
>
>         <!-- destroy the spring context on shutdown to stop jetty -->
>
>         <shutdownHooks>
>
>             <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
>
>         </shutdownHooks>
>
>
>
>         <plugins>
>
>             <jaasAuthenticationPlugin configuration="activemq" />
>
>             <authorizationPlugin>
>
>                 <map>
>
>                     <authorizationMap>
>
>                         <authorizationEntries>
>
>                             <authorizationEntry admin="*" read="*" topic="ActiveMQ.Advisory.>" write="*"/>
>
>                             <authorizationEntry admin="producers" read="consumers" topic="some.topic.>" write="producers"/>
>
>                         </authorizationEntries>
>
>                     </authorizationMap>
>
>                 </map>
>
>             </authorizationPlugin>
>
>         </plugins>
>
>
>
>     </broker>
>
>
>
>     <!--
>
>         Enable web consoles, REST and Ajax APIs and demos
>
>         The web consoles requires by default login, you can disable this
> in the jetty.xml file
>
>
>
>         Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
>
>     -->
>
>     <import resource="jetty.xml"/>
>
>
>
> </beans>
>
> <!-- END SNIPPET: example -->
>
>
>
>
>
> *user.properties*
>
> admin=admin
>
> producer=producer
>
> consumer=consumer
>
>
>
> *groups.properties*
>
> admins=admin
>
> producers=producer
>
> consumers=consumer
>
>
>
>
>
> *My python subscriber:*
>
> import paho.mqtt.client as mqtt
>
>
>
> topic = "some/topic/+/item"
>
> local_broker = "localhost"
>
> local_port = 1883
>
>
>
> def on_connect(client, userdata, flags, rc):
>
>     print("Connected: " + str(rc))
>
>     client.subscribe(topic, qos=1)
>
>
>
> def on_subscribe(client, userdata, mid, granted_qos):
>
>     print("Subscribed: " + str(mid) + " " + str(granted_qos))
>
>
>
> def on_message(client, userdata, msg):
>
>     print("Received message: " + str(msg.payload.decode()))
>
>
>
> client = mqtt.Client("doesntmatter", clean_session=False)
>
> client.on_connect = on_connect
>
> client.on_message = on_message
>
> client.on_subscribe = on_subscribe
>
>
>
> client.username_pw_set("consumer", "consumer")  # local
>
> client.connect(local_broker, local_port)
>
> client.loop_forever()
>
>
>
> *My python publisher:*
>
> import paho.mqtt.client as paho
>
> import random, string
>
>
>
> def randomword(length):
>
>     letters = string.ascii_lowercase
>
>     return "".join(random.choice(letters) for i in range(length))
>
>
>
> local_broker = "localhost"
>
> local_port = 1883
>
> topic = f"some/topic/{randomword(7)}/item"
>
>
>
> def on_connect(client, userdata, flags, rc):
>
>     print("Connected: " + str(rc))
>
>
>
> def on_publish(client, userdata, result):  # create function for callback
>
>     print(f"data published: {result}")
>
>     print(client)
>
>     print(userdata)
>
>     print(result)
>
>     pass
>
>
>
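> client1 = paho.Client("whatever")
>
> client1.on_connect = on_connect
>
> client1.on_publish = on_publish
>
> client1.username_pw_set("producer", "producer")
>
> client1.connect(local_broker, local_port)  # local
>
> client1.loop_start()  # run the network loop so the QoS 1 handshake can complete
>
> ret = client1.publish(
>
>     topic,
>
>     '{"message": "From publisher!"}',
>
>     qos=1,
>
> )
>
> ret.wait_for_publish()  # block until the broker acknowledges the message
>
> client1.loop_stop()
>
> client1.disconnect()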
>
>
>
>
>
> *Steps to replicate:*
>
>    1. Run subscriber script
>    2. Run publisher script
>    3. Stop subscriber script
>    4. Run publisher script
>    5. Run subscriber script
>    6. Subscriber does not receive published message even though it has
>    QoS == 1 and clean_session set to False!
>
>
>
> *What I have noticed:*
>
>    - If you comment out the whole <plugins/> block from `activemq.xml`,
>    it works as expected.
>    - It also works if you send the message on a topic that already
>    exists
>       - When you run step 4, publish a message to the same topic you
>       published to in step 2 (remove the random string logic). On step 5,
>       the subscriber will receive the message.
>
>
>
> I’m assuming something is funky with my authorization entries.
>
>
>
> Thanks everyone, hoping someone can help me figure this out.
>
>
>
> *Jonathan Damon *Finished Product Team Captain
>
>
>
> *From: *Justin Bertram <jbert...@apache.org>
> *Date: *Tuesday, August 1, 2023 at 4:50 PM
> *To: *users@activemq.apache.org <users@activemq.apache.org>
> *Subject: *Re: Not receiving messages after subscribed to topic with QoS
> 1 and clean session set to false?
>
> You could potentially reproduce the problem on a locally running community
> release of ActiveMQ "Classic." That wouldn't be too hard, I think. Then
> you'd know for sure the problem wasn't introduced via AmazonMQ, and the
> community would be more able to help.
>
>
> Justin
>
> On Tue, Aug 1, 2023 at 9:25 AM John Damon <john.da...@freightverify.com>
> wrote:
>
> > Hey,
> >
> >
> >
> > I’ve been spinning my wheels on this and was hoping someone could help me
> > understand the mistakes I’m making. Here’s some background:
> >
> >
> >
> >    - Using AmazonMQ for ActiveMQ
> >    - Publisher is publishing messages to my broker using MQTT protocol
> >    using QoS 1 (written in java)
> >    - Subscriber is subscribed to topics using MQTT protocol with
> >    wildcard, say `some/topic/+/item`, with QoS 1 and clean session set to
> >    false (written in python)
> >
> >
> >
> > Let me walk you through my particular case:
> >
> >    1. Subscriber S subscribes to topic “some/topic/+/item” using MQTT
> >    protocol
> >       1. QoS 1 and clean session set to false
> >       2. Granted QoS == 1
> >    2. Publisher P publishes message to “some/topic/1/item” using MQTT
> >    protocol for the first time
> >       1. QoS 1
> >    3. Subscriber S receives message on topic some/topic/1/item
> >    4. Subscriber S disconnects
> >    5. Publisher P publishes message to “some/topic/1/item” using MQTT
> >    protocol for the second time
> >    6. Publisher P publishes message to “some/topic/2/item” using MQTT
> >    protocol for the first time
> >    7. Subscriber S reconnects and subscribes to “some/topic/+/item”
> >    8. Subscriber S receives message on topic some/topic/1/item
> >    9. Subscriber S DOES NOT receive message on topic some/topic/2/item
> >
> >
> >
> > This is not the behavior I would expect, and I need to figure out how to
> > resolve this before our customers start subscribing to our topics. I need
> > to receive all new messages that match the subscription with wildcards,
> > even on topics that don’t exist prior to the subscriber disconnecting.
> >
> >
> >
> > In the UI, I’m seeing this after completing the case above:
> >
> >
> >
> > NAME               # OF CONSUMERS  MSGS ENQUEUED  MSGS DEQUEUED
> > some.topic.1.item  1               2              2
> > some.topic.2.item  0               0              0
> >
> >
> >
> >
> >
> > I’m fairly confident I’m just missing some configuration, but I can’t seem
> > to find what configuration that may be. Here’s my config:
> >
> >
> >
> > <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
> >
> > <broker schedulePeriodForDestinationPurge="10000" xmlns="http://activemq.apache.org/schema/core">
> >
> >   <destinationPolicy>
> >
> >     <policyMap>
> >
> >       <policyEntries>
> >
> >         <policyEntry gcInactiveDestinations="true" inactiveTimoutBeforeGC="600000" topic="&gt;">
> >
> >           <pendingMessageLimitStrategy>
> >
> >             <constantPendingMessageLimitStrategy limit="1000"/>
> >
> >           </pendingMessageLimitStrategy>
> >
> >         </policyEntry>
> >
> >         <policyEntry gcInactiveDestinations="true" inactiveTimoutBeforeGC="600000" queue="&gt;"/>
> >
> >       </policyEntries>
> >
> >     </policyMap>
> >
> >   </destinationPolicy>
> >
> >   <plugins>
> >
> >     <authorizationPlugin>
> >
> >       <map>
> >
> >         <authorizationMap>
> >
> >           <authorizationEntries>
> >
> >             <authorizationEntry myAuthEntries... />
> >
> >           </authorizationEntries>
> >
> >         </authorizationMap>
> >
> >       </map>
> >
> >     </authorizationPlugin>
> >
> >   </plugins>
> >
> > </broker>
> >
> >
> >
> >
> >
> > This is mostly just the defaults that AWS gave me. I tried removing the
> > `constantPendingMessageLimitStrategy`, as I was wondering whether
> > thousands of messages sent to the advisory topics were causing messages
> > to be deleted from other topics. I didn’t think that made much sense and,
> > sure enough, removing it didn’t help.
> >
> >
> >
> > Is there anyone that can lend their expertise and help me resolve this
> > issue?
> >
> >
> >
> > Thanks so much!
> >
> >
> >
> > *Jonathan Damon *Finished Product Team Captain
> >
>
