Hey,

I’ve been spinning my wheels on this and was hoping someone could help me 
understand the mistakes I’m making. Here’s some background:


  *   Using Amazon MQ for ActiveMQ
  *   Publisher (written in Java) publishes messages to my broker over MQTT 
with QoS 1
  *   Subscriber (written in Python) subscribes over MQTT to a wildcard topic 
filter, say `some/topic/+/item`, with QoS 1 and clean session set to false

Let me walk you through my particular case:

  1.  Subscriber S subscribes to topic “some/topic/+/item” using MQTT protocol
     *   QoS 1 and clean session set to false
     *   Granted QoS == 1
  2.  Publisher P publishes message to “some/topic/1/item” using MQTT protocol 
for the first time
     *   QoS 1
  3.  Subscriber S receives message on topic some/topic/1/item
  4.  Subscriber S disconnects
  5.  Publisher P publishes message to “some/topic/1/item” using MQTT protocol 
for the second time
  6.  Publisher P publishes message to “some/topic/2/item” using MQTT protocol 
for the first time
  7.  Subscriber S reconnects and subscribes to “some/topic/+/item”
  8.  Subscriber S receives message on topic some/topic/1/item
  9.  Subscriber S DOES NOT receive message on topic some/topic/2/item
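
To make my expectation in steps 8 and 9 concrete, here is a minimal sketch of how I understand MQTT topic-filter matching to work. The helper name `topic_matches` is my own (it is not a paho or broker API), and it ignores special cases such as topics starting with `$`. Under this reading, both topics match the filter, so I'd expect a session with clean session set to false to have both QoS 1 messages queued for it.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic name matches a subscription filter.

    '+' matches exactly one level; '#' matches all remaining levels.
    Hypothetical helper for illustration only, not a library call.
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: everything from here on matches.
            return True
        if i >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[i]:
            # A literal level that does not match.
            return False
    # Without '#', the filter and the topic need the same number of levels.
    return len(filter_levels) == len(topic_levels)


if __name__ == "__main__":
    for name in ("some/topic/1/item", "some/topic/2/item"):
        print(name, topic_matches("some/topic/+/item", name))
```

Both calls return True, which is why the missing message on some/topic/2/item surprises me.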

This is not the behavior I would expect, and I need to figure out how to 
resolve this before our customers start subscribing to our topics. I need to 
receive all new messages that match the wildcard subscription, including 
messages on topics that did not exist before the subscriber disconnected.

In the UI, I’m seeing this after completing the case above:

NAME               # OF CONSUMERS  MSGS ENQUEUED  MSGS DEQUEUED
some.topic.1.item  1               2              2
some.topic.2.item  0               0              0
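
For anyone mapping the destination names in the UI back to the MQTT topics in my steps: as far as I understand, ActiveMQ's MQTT transport turns `/` into `.`, `+` into `*`, and `#` into `>`. A rough sketch of that mapping follows. The function name `mqtt_to_activemq` is made up for illustration, and it assumes the topic names contain no literal dots.

```python
def mqtt_to_activemq(name: str) -> str:
    """Translate an MQTT topic or filter into ActiveMQ destination syntax.

    Illustrative only: assumes the MQTT name contains no literal '.'.
    """
    table = str.maketrans({"/": ".", "+": "*", "#": ">"})
    return name.translate(table)


if __name__ == "__main__":
    print(mqtt_to_activemq("some/topic/2/item"))
    print(mqtt_to_activemq("some/topic/+/item"))
```

So some/topic/2/item corresponds to the some.topic.2.item row, and my wildcard subscription would correspond to some.topic.*.item on the broker side.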


I’m fairly confident I’m just missing some configuration, but I can’t seem to 
find what configuration that may be. Here’s my config:

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<broker schedulePeriodForDestinationPurge="10000" xmlns="http://activemq.apache.org/schema/core">
  <destinationPolicy>
    <policyMap>
      <policyEntries>
        <policyEntry gcInactiveDestinations="true" inactiveTimoutBeforeGC="600000" topic="&gt;">
          <pendingMessageLimitStrategy>
            <constantPendingMessageLimitStrategy limit="1000"/>
          </pendingMessageLimitStrategy>
        </policyEntry>
        <policyEntry gcInactiveDestinations="true" inactiveTimoutBeforeGC="600000" queue="&gt;"/>
      </policyEntries>
    </policyMap>
  </destinationPolicy>
  <plugins>
    <authorizationPlugin>
      <map>
        <authorizationMap>
          <authorizationEntries>
            <authorizationEntry myAuthEntries... />
          </authorizationEntries>
        </authorizationMap>
      </map>
    </authorizationPlugin>
  </plugins>
</broker>


This is mostly just the defaults that AWS gave me. I tried removing the 
`constantPendingMessageLimitStrategy`, wondering whether thousands of messages 
sent to the advisory topics were causing messages to be dropped from other 
topics. I didn’t think that explanation made much sense and, sure enough, 
removing it didn’t help.

Is there anyone that can lend their expertise and help me resolve this issue?

Thanks so much!

Jonathan Damon
Finished Product Team Captain
