OK, so I want to route a message from an ActiveMQ queue through mina:tcp and
into another ActiveMQ queue.
I am using this code on one side (A):
    RouteBuilder builder = new RouteBuilder() {
        public void configure() {
            from("mina:tcp://localhost:5500?textline=true").to("activemq:queue:q4");
            from("activemq:queue:q1").to("mina:tcp://localhost:5501?textline=true");
        }
    };
    CamelContext myCamelContext = new DefaultCamelContext();
    ActiveMQComponent activeMQComponent = new ActiveMQComponent();
    activeMQComponent.setBrokerURL("tcp://localhost:61616?jms.useAsyncSend=true");
    myCamelContext.addComponent("activemq", activeMQComponent);
    myCamelContext.addRoutes(builder);
    myCamelContext.start();
And on the other side (B):
    RouteBuilder builder = new RouteBuilder() {
        public void configure() {
            from("mina:tcp://localhost:5501?textline=true").to("activemq:queue:q2");
            from("activemq:queue:q3").to("mina:tcp://localhost:5500?textline=true");
        }
    };
    CamelContext myCamelContext = new DefaultCamelContext();
    ActiveMQComponent activeMQComponent = new ActiveMQComponent();
    activeMQComponent.setBrokerURL("tcp://localhost:61616?jms.useAsyncSend=true");
    myCamelContext.addComponent("activemq", activeMQComponent);
    myCamelContext.addRoutes(builder);
    myCamelContext.start();
Using this code I get the following error on side (B):
org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 20000 millis on the exchange:
    at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:233)
    at org.apache.camel.impl.converter.AsyncProcessorTypeConverter$ProcessorToAsyncProcessorBridge.process(AsyncProcessorTypeConverter.java:43)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:83)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:68)
    at org.apache.camel.processor.DeadLetterChannel.process(DeadLetterChannel.java:195)
    at org.apache.camel.processor.DeadLetterChannel.process(DeadLetterChannel.java:130)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:68)
    at org.apache.camel.processor.interceptor.StreamCachingInterceptor.proceed(StreamCachingInterceptor.java:88)
    at org.apache.camel.processor.interceptor.StreamCachingInterceptor.process(StreamCachingInterceptor.java:83)
    at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:52)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:41)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:65)
    at org.apache.camel.component.mina.MinaConsumer$ReceiveHandler.messageReceived(MinaConsumer.java:110)
    at org.apache.mina.common.support.AbstractIoFilterChain$TailFilter.messageReceived(AbstractIoFilterChain.java:570)
    at org.apache.mina.common.support.AbstractIoFilterChain.callNextMessageReceived(AbstractIoFilterChain.java:299)
    at org.apache.mina.common.support.AbstractIoFilterChain.access$1100(AbstractIoFilterChain.java:53)
    at org.apache.mina.common.support.AbstractIoFilterChain$EntryImpl$1.messageReceived(AbstractIoFilterChain.java:648)
    at org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput.flush(SimpleProtocolDecoderOutput.java:58)
    at org.apache.mina.filter.codec.ProtocolCodecFilter.messageReceived(ProtocolCodecFilter.java:180)
    at org.apache.mina.common.support.AbstractIoFilterChain.callNextMessageReceived(AbstractIoFilterChain.java:299)
    at org.apache.mina.common.support.AbstractIoFilterChain.access$1100(AbstractIoFilterChain.java:53)
    at org.apache.mina.common.support.AbstractIoFilterChain$EntryImpl$1.messageReceived(AbstractIoFilterChain.java:648)
    at org.apache.mina.filter.executor.ExecutorFilter.processEvent(ExecutorFilter.java:220)
    at org.apache.mina.filter.executor.ExecutorFilter$ProcessEventsRunnable.run(ExecutorFilter.java:264)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at org.apache.mina.util.NamePreservingRunnable.run(NamePreservingRunnable.java:51)
    at java.lang.Thread.run(Thread.java:619)
And on side (A):
org.apache.camel.CamelExchangeException: Response Handler had an exception on the exchange:
    at org.apache.camel.component.mina.MinaProducer.process(MinaProducer.java:112)
    at org.apache.camel.impl.converter.AsyncProcessorTypeConverter$ProcessorToAsyncProcessorBridge.process(AsyncProcessorTypeConverter.java:43)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:83)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:68)
    at org.apache.camel.processor.DeadLetterChannel.process(DeadLetterChannel.java:195)
    at org.apache.camel.processor.DeadLetterChannel.process(DeadLetterChannel.java:130)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:68)
    at org.apache.camel.processor.interceptor.StreamCachingInterceptor.proceed(StreamCachingInterceptor.java:88)
    at org.apache.camel.processor.interceptor.StreamCachingInterceptor.process(StreamCachingInterceptor.java:83)
    at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:52)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:41)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:65)
    at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:72)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:531)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:466)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:435)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:322)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:260)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:944)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:875)
    at java.lang.Thread.run(Thread.java:619)
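Both traces look like a sender waiting for a reply that never comes back. As a rough illustration of that behaviour outside Camel, here is a sketch using plain java.net sockets (not MINA and not camel-mina; the class and method names are made up for this example). A line-based client sends one line and then blocks until the peer writes a line back, or gives up after a timeout:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class: plain sockets standing in for a text-line TCP endpoint.
class TextLineReplyDemo {

    /** Starts a one-shot line server on a free port; it answers only if reply is non-null. */
    static ServerSocket startServer(String reply) throws IOException {
        ServerSocket server = new ServerSocket(0);
        Thread worker = new Thread(() -> {
            try (Socket s = server.accept();
                 BufferedReader in = new BufferedReader(
                         new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
                 PrintWriter out = new PrintWriter(
                         new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true)) {
                in.readLine();          // consume the request line
                if (reply != null) {
                    out.println(reply); // request/reply: answer with one line
                } else {
                    in.readLine();      // stay silent until the client disconnects
                }
            } catch (IOException ignored) {
                // the client went away; nothing to clean up in this sketch
            }
        });
        worker.setDaemon(true);
        worker.start();
        return server;
    }

    /** Sends one line and waits up to timeoutMillis for a single reply line. */
    static String request(int port, String line, int timeoutMillis) throws IOException {
        try (Socket s = new Socket(InetAddress.getLoopbackAddress(), port)) {
            s.setSoTimeout(timeoutMillis);
            PrintWriter out = new PrintWriter(
                    new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true);
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
            out.println(line);
            return in.readLine(); // throws SocketTimeoutException when no reply arrives in time
        }
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket replying = startServer("tcp reply")) {
            System.out.println("reply: " + request(replying.getLocalPort(), "hello", 2000));
        }
        try (ServerSocket silent = startServer(null)) {
            try {
                request(silent.getLocalPort(), "hello", 500);
            } catch (SocketTimeoutException e) {
                System.out.println("timed out waiting for reply");
            }
        }
    }
}
```

With the replying server the client gets "tcp reply" back; with the silent server it times out, which is the plain-socket analogue of the timeout on side (B).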
OK, and this is because mina expects a reply, so I have tried using a
processor:
    from("mina:tcp://localhost:5501?textline=true").process(new Processor() {
        public void process(Exchange e) {
            System.out.println("Camel was here: " + e.getIn().getBody());
            // Set an OUT body so that the mina endpoint has a reply to send back
            e.getOut().setBody("tcp reply");
        }
    });
That takes care of the errors, but my routed message gets stuck in the
processor. Adding
    from("mina:tcp://localhost:5501?textline=true").process(new Processor() {
        public void process(Exchange e) {
            System.out.println("Camel was here: " + e.getIn().getBody());
            e.getOut().setBody("tcp reply");
        }
    }).to("activemq:queue:q2");
after the processor gives an error.
What should I do?
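One direction that might be worth trying is to make the whole hop one-way, so that neither the mina endpoint nor the JMS producer waits for a reply. The following is an untested sketch for side (B), not a confirmed fix. It assumes that the camel-mina `sync` option and the `inOnly(...)` DSL method exist in the Camel version in use, so both should be checked against the documentation for that version. The class name is made up.

```java
import org.apache.camel.builder.RouteBuilder;

// Hypothetical one-way variant of the side (B) routes; the class name is illustrative only.
public class OneWaySideBRoutes extends RouteBuilder {
    @Override
    public void configure() {
        // Assumption: sync=false tells the mina consumer not to send a reply over TCP.
        // Assumption: inOnly(...) sends to the queue without waiting for a JMS reply.
        from("mina:tcp://localhost:5501?textline=true&sync=false")
            .inOnly("activemq:queue:q2");

        // Assumption: sync=false tells the mina producer not to wait for a TCP reply.
        from("activemq:queue:q3")
            .to("mina:tcp://localhost:5500?textline=true&sync=false");
    }
}
```

If this works, side (A) would presumably need the same change on its two mina endpoints, and the reply-setting processor would no longer be needed.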
--
View this message in context:
http://www.nabble.com/Sending-a-message-between-two-ActiveMQ-using-mina-tp23780161p23780161.html
Sent from the Camel - Users mailing list archive at Nabble.com.