I’m trying to use the ActiveMQ Broker Camel Component to add some JMS user properties to messages as they arrive at the broker. I’m using a wildcard on the from so I can apply the same logic to a set of topics or queues, but I can’t seem to send the message back to the original queue.
Without wildcards, it works fine - something like this:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
         http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
  <camelContext id="audit-enrichment" xmlns="http://camel.apache.org/schema/spring">
    <route id="in-audit-to-file">
      <from uri="broker://queue:in.adt.epic"/>
      <setHeader headerName="MyCustomHeader">
        <constant>MyHeaderValue</constant>
      </setHeader>
      <to uri="broker://queue:in.adt.epic" />
    </route>
  </camelContext>
</beans>

However, when I put in the wildcards, I get an IllegalStateException. The configuration I’m trying looks like this:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
         http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
  <camelContext id="audit-enrichment" xmlns="http://camel.apache.org/schema/spring">
    <route id="in-audit-to-file">
      <from uri="broker://queue:in.adt.*"/>
      <setHeader headerName="MyCustomHeader">
        <constant>MyHeaderValue</constant>
      </setHeader>
      <recipientList>
        <simple>broker://${header[JMSDestination]}</simple>
      </recipientList>
    </route>
  </camelContext>
</beans>

And here’s the exception I get:

2016-04-28 12:47:43,721 | ERROR | Failed delivery for (MessageId: ID-macpro-local-53588-1461869253207-0-2 on ExchangeId: ID-macpro-local-53588-1461869253207-0-3). Exhausted after delivery attempt: 1 caught: java.lang.IllegalStateException: Not the original message from the broker Message: Enter some text here for the message body...

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId             ProcessorId         Processor                                                    Elapsed (ms)
[in-audit-to-file ] [in-audit-to-file ] [broker://queue:in.adt.*                                   ] [ 23]
[in-audit-to-file ] [setHeader1       ] [setHeader[MyCustomHeader]                                 ] [  3]
[in-audit-to-file ] [recipientList1   ] [recipientList[simple{broker://${header[JMSDestination]}}] ] [ 18]

Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
    Id                  ID-macpro-local-53588-1461869253207-0-3
    ExchangePattern     InOnly
    Headers             {breadcrumbId=ID:macpro.local-53587-1461869252118-4:1:1:1:1, CamelRedelivered=false, CamelRedeliveryCounter=0, JMSCorrelationID=, JMSCorrelationIDAsBytes=, JMSDeliveryMode=1, JMSDestination=queue://in.adt.epic, JMSExpiration=0, JMSMessageID=ID:macpro.local-53587-1461869252118-4:1:1:1:1, JMSPriority=0, JMSRedelivered=false, JMSReplyTo=null, JMSTimestamp=1461869263684, JMSType=, JMSXGroupID=null, JMSXUserID=null, MyCustomHeader=MyHeaderValue}
    BodyType            String
    Body                Enter some text here for the message body...
]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
| org.apache.camel.processor.DefaultErrorHandler | ActiveMQ VMTransport: vm://localhost#1
java.lang.IllegalStateException: Not the original message from the broker Message: Enter some text here for the message body...
    at org.apache.activemq.camel.component.broker.BrokerProducer.checkOriginalMessage(BrokerProducer.java:95)[activemq-camel-5.13.2.jar:5.13.2]
    at org.apache.activemq.camel.component.broker.BrokerProducer.getMessage(BrokerProducer.java:73)[activemq-camel-5.13.2.jar:5.13.2]
    at org.apache.activemq.camel.component.broker.BrokerProducer.processInOnly(BrokerProducer.java:55)[activemq-camel-5.13.2.jar:5.13.2]
    at org.apache.activemq.camel.component.broker.BrokerProducer.process(BrokerProducer.java:43)[activemq-camel-5.13.2.jar:5.13.2]
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:460)[camel-core-2.16.2.jar:2.16.2]
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:190)[camel-core-2.16.2.jar:2.16.2]
    at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:668)[camel-core-2.16.2.jar:2.16.2]
    at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:596)[camel-core-2.16.2.jar:2.16.2]
    at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:237)[camel-core-2.16.2.jar:2.16.2]
    at org.apache.camel.processor.RecipientList.sendToRecipientList(RecipientList.java:178)[camel-core-2.16.2.jar:2.16.2]
    at org.apache.camel.processor.RecipientList.process(RecipientList.java:131)[camel-core-2.16.2.jar:2.16.2]
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:121)[camel-core-2.16.2.jar:2.16.2]
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)[camel-core-2.16.2.jar:2.16.2]
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)[camel-core-2.16.2.jar:2.16.2]
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:460)[camel-core-2.16.2.jar:2.16.2]
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:190)[camel-core-2.16.2.jar:2.16.2]
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:121)[camel-core-2.16.2.jar:2.16.2]
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)[camel-core-2.16.2.jar:2.16.2]
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:190)[camel-core-2.16.2.jar:2.16.2]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109)[camel-core-2.16.2.jar:2.16.2]
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:87)[camel-core-2.16.2.jar:2.16.2]
    at org.apache.activemq.camel.component.broker.BrokerConsumer.intercept(BrokerConsumer.java:56)[activemq-camel-5.13.2.jar:5.13.2]
    at org.apache.activemq.broker.inteceptor.MessageInterceptorFilter.send(MessageInterceptorFilter.java:109)[activemq-broker-5.13.2.jar:5.13.2]
    at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:158)[activemq-broker-5.13.2.jar:5.13.2]
    at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:546)[activemq-broker-5.13.2.jar:5.13.2]
    at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:768)[activemq-client-5.13.2.jar:5.13.2]
    at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:338)[activemq-broker-5.13.2.jar:5.13.2]
    at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:188)[activemq-broker-5.13.2.jar:5.13.2]
    at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)[activemq-client-5.13.2.jar:5.13.2]
    at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)[activemq-client-5.13.2.jar:5.13.2]
    at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:271)[activemq-broker-5.13.2.jar:5.13.2]
    at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:112)[activemq-client-5.13.2.jar:5.13.2]
    at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:42)[activemq-client-5.13.2.jar:5.13.2]

2016-04-28 12:47:43,727 | WARN | Error processing intercepted message: ActiveMQTextMessage {commandId = 5, responseRequired = false, messageId = ID:macpro.local-53587-1461869252118-4:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:macpro.local-53587-1461869252118-4:1:1:1, destination = queue://in.adt.epic, transactionId = null, expiration = 0, timestamp = 1461869263684, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = , replyTo = null, persistent = false, type = , priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = Enter some text here for the message body...}. Exchange[ID-macpro-local-53588-1461869253207-0-1][BrokerJmsMessage[JMSMessageID: ID:macpro.local-53587-1461869252118-4:1:1:1:1]. Caused by: [java.lang.IllegalStateException - Not the original message from the broker Message: Enter some text here for the message body...] | org.apache.activemq.camel.component.broker.BrokerConsumer | ActiveMQ VMTransport: vm://localhost#1

Is there a way to accomplish what I’m after?
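
For anyone trying to reproduce this, a minimal diagnostic sketch (not a fix) is to log the endpoint URI that the recipient list is about to resolve. The `<log>` step below is an addition for illustration and is not part of the configuration above; everything else is the wildcard route unchanged. Going by the JMSDestination header in the exchange dump (queue://in.adt.epic), the expression would be expected to resolve to broker://queue://in.adt.epic, which is not the same form as the broker://queue:in.adt.epic URI used in the working route. Whether that difference has anything to do with the exception is not established here.

```xml
<!-- Sketch only: the <log> step is a diagnostic addition, not part of the original route -->
<route id="in-audit-to-file">
  <from uri="broker://queue:in.adt.*"/>
  <setHeader headerName="MyCustomHeader">
    <constant>MyHeaderValue</constant>
  </setHeader>
  <!-- Log the endpoint URI that the recipient list below will resolve -->
  <log message="Resolved recipient: broker://${header[JMSDestination]}"/>
  <recipientList>
    <simple>broker://${header[JMSDestination]}</simple>
  </recipientList>
</route>
```

The route fragment goes inside the existing camelContext element; the logged line should appear in the broker log just before the failed delivery.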