I’m trying to use the ActiveMQ Broker Camel Component to add some JMS user 
properties to messages as they arrive at the broker.  I’m using a wildcard on 
the from so I can apply the same logic to a set of topics or queues, but I 
can’t seem to send the message back to the original queue.

Without wildcards, it works fine - something like this:
<beans xmlns="http://www.springframework.org/schema/beans";
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
       xsi:schemaLocation="
            http://camel.apache.org/schema/spring 
http://camel.apache.org/schema/spring/camel-spring.xsd
            http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans.xsd";>

    <camelContext id="audit-enrichment" 
xmlns="http://camel.apache.org/schema/spring";>
        <route id="in-audit-to-file">
            <from uri="broker://queue:in.adt.epic"/>
            <setHeader headerName="MyCustomHeader">
                <constant>MyHeaderValue</constant>
            </setHeader>
            <to uri="broker://queue:in.adt.epic" />
        </route>

    </camelContext>

</beans>

However when I put in the wildcards, I get and IllegalStateException.  The 
configuration I’m trying looks like this
<beans xmlns="http://www.springframework.org/schema/beans";
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
       xsi:schemaLocation="
            http://camel.apache.org/schema/spring 
http://camel.apache.org/schema/spring/camel-spring.xsd
            http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans.xsd";>

    <camelContext id="audit-enrichment" 
xmlns="http://camel.apache.org/schema/spring";>
        <route id="in-audit-to-file">
            <from uri="broker://queue:in.adt.*"/>
            <setHeader headerName="MyCustomHeader">
                <constant>MyHeaderValue</constant>
            </setHeader>
            <recipientList>
                <simple>broker://${header[JMSDestination]}</simple>
            </recipientList>
        </route>

    </camelContext>

</beans>

And here’s the exception I get

2016-04-28 12:47:43,721 | ERROR | Failed delivery for (MessageId: 
ID-macpro-local-53588-1461869253207-0-2 on ExchangeId: 
ID-macpro-local-53588-1461869253207-0-3). Exhausted after delivery attempt: 1 
caught: java.lang.IllegalStateException: Not the original message from the 
broker Message: Enter some text here for the message body...

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                             
                                           Elapsed (ms)
[in-audit-to-file  ] [in-audit-to-file  ] [broker://queue:in.adt.*              
                                         ] [        23]
[in-audit-to-file  ] [setHeader1        ] [setHeader[MyCustomHeader]            
                                         ] [         3]
[in-audit-to-file  ] [recipientList1    ] 
[recipientList[simple{broker://${header[JMSDestination]}}]                     
] [        18]

Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
        Id                  ID-macpro-local-53588-1461869253207-0-3
        ExchangePattern     InOnly
        Headers             
{breadcrumbId=ID:macpro.local-53587-1461869252118-4:1:1:1:1, 
CamelRedelivered=false, CamelRedeliveryCounter=0, JMSCorrelationID=, 
JMSCorrelationIDAsBytes=, JMSDeliveryMode=1, 
JMSDestination=queue://in.adt.epic, JMSExpiration=0, 
JMSMessageID=ID:macpro.local-53587-1461869252118-4:1:1:1:1, JMSPriority=0, 
JMSRedelivered=false, JMSReplyTo=null, JMSTimestamp=1461869263684, JMSType=, 
JMSXGroupID=null, JMSXUserID=null, MyCustomHeader=MyHeaderValue}
        BodyType            String
        Body                Enter some text here for the message body...
]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
 | org.apache.camel.processor.DefaultErrorHandler | ActiveMQ VMTransport: 
vm://localhost#1
java.lang.IllegalStateException: Not the original message from the broker 
Message: Enter some text here for the message body...
        at 
org.apache.activemq.camel.component.broker.BrokerProducer.checkOriginalMessage(BrokerProducer.java:95)[activemq-camel-5.13.2.jar:5.13.2]
        at 
org.apache.activemq.camel.component.broker.BrokerProducer.getMessage(BrokerProducer.java:73)[activemq-camel-5.13.2.jar:5.13.2]
        at 
org.apache.activemq.camel.component.broker.BrokerProducer.processInOnly(BrokerProducer.java:55)[activemq-camel-5.13.2.jar:5.13.2]
        at 
org.apache.activemq.camel.component.broker.BrokerProducer.process(BrokerProducer.java:43)[activemq-camel-5.13.2.jar:5.13.2]
        at 
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:460)[camel-core-2.16.2.jar:2.16.2]
        at 
org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:190)[camel-core-2.16.2.jar:2.16.2]
        at 
org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:668)[camel-core-2.16.2.jar:2.16.2]
        at 
org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:596)[camel-core-2.16.2.jar:2.16.2]
        at 
org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:237)[camel-core-2.16.2.jar:2.16.2]
        at 
org.apache.camel.processor.RecipientList.sendToRecipientList(RecipientList.java:178)[camel-core-2.16.2.jar:2.16.2]
        at 
org.apache.camel.processor.RecipientList.process(RecipientList.java:131)[camel-core-2.16.2.jar:2.16.2]
        at 
org.apache.camel.processor.Pipeline.process(Pipeline.java:121)[camel-core-2.16.2.jar:2.16.2]
        at 
org.apache.camel.processor.Pipeline.process(Pipeline.java:83)[camel-core-2.16.2.jar:2.16.2]
        at 
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)[camel-core-2.16.2.jar:2.16.2]
        at 
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:460)[camel-core-2.16.2.jar:2.16.2]
        at 
org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:190)[camel-core-2.16.2.jar:2.16.2]
        at 
org.apache.camel.processor.Pipeline.process(Pipeline.java:121)[camel-core-2.16.2.jar:2.16.2]
        at 
org.apache.camel.processor.Pipeline.process(Pipeline.java:83)[camel-core-2.16.2.jar:2.16.2]
        at 
org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:190)[camel-core-2.16.2.jar:2.16.2]
        at 
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109)[camel-core-2.16.2.jar:2.16.2]
        at 
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:87)[camel-core-2.16.2.jar:2.16.2]
        at 
org.apache.activemq.camel.component.broker.BrokerConsumer.intercept(BrokerConsumer.java:56)[activemq-camel-5.13.2.jar:5.13.2]
        at 
org.apache.activemq.broker.inteceptor.MessageInterceptorFilter.send(MessageInterceptorFilter.java:109)[activemq-broker-5.13.2.jar:5.13.2]
        at 
org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:158)[activemq-broker-5.13.2.jar:5.13.2]
        at 
org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:546)[activemq-broker-5.13.2.jar:5.13.2]
        at 
org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:768)[activemq-client-5.13.2.jar:5.13.2]
        at 
org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:338)[activemq-broker-5.13.2.jar:5.13.2]
        at 
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:188)[activemq-broker-5.13.2.jar:5.13.2]
        at 
org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)[activemq-client-5.13.2.jar:5.13.2]
        at 
org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)[activemq-client-5.13.2.jar:5.13.2]
        at 
org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:271)[activemq-broker-5.13.2.jar:5.13.2]
        at 
org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:112)[activemq-client-5.13.2.jar:5.13.2]
        at 
org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:42)[activemq-client-5.13.2.jar:5.13.2]
2016-04-28 12:47:43,727 | WARN  | Error processing intercepted message: 
ActiveMQTextMessage {commandId = 5, responseRequired = false, messageId = 
ID:macpro.local-53587-1461869252118-4:1:1:1:1, originalDestination = null, 
originalTransactionId = null, producerId = 
ID:macpro.local-53587-1461869252118-4:1:1:1, destination = queue://in.adt.epic, 
transactionId = null, expiration = 0, timestamp = 1461869263684, arrival = 0, 
brokerInTime = 0, brokerOutTime = 0, correlationId = , replyTo = null, 
persistent = false, type = , priority = 0, groupID = null, groupSequence = 0, 
targetConsumerId = null, compressed = false, userID = null, content = null, 
marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size 
= 0, properties = null, readOnlyProperties = true, readOnlyBody = true, 
droppable = false, jmsXGroupFirstForConsumer = false, text = Enter some text 
here for the message body...}. 
Exchange[ID-macpro-local-53588-1461869253207-0-1][BrokerJmsMessage[JMSMessageID:
 ID:macpro.local-53587-1461869252118-4:1:1:1:1]. Caused by: 
[java.lang.IllegalStateException - Not the original message from the broker 
Message: Enter some text here for the message body...] | 
org.apache.activemq.camel.component.broker.BrokerConsumer | ActiveMQ 
VMTransport: vm://localhost#1

Is there a way to accomplish what I’m after?



Reply via email to