Hi gurus,
I have the following issue. I need to send requests to webservice from camel
on timer events.
As timer consumer uses synchronous event firing , only one request can be sent
at the same time. What I’d like is use to threads DSL to process multiple timer
events at the same time.
Here is an example:
from("timer:start?period=100&delay=100&repeatCount=0")
.threads(1,
5).maxQueueSize(5).rejectedPolicy(ThreadPoolRejectedPolicy.DiscardOldest)
.to("bean:wscall");
However, usage of threads DSL with original timer consumer does not make sense
because of synchronicity of timer consumer, so I tried to develop asynchronous
timer component that uses asynchronous API to fire events.
Everything works fine except that there is a lot of inflight exchanges when
stopping the route.
Could you please suggest how to avoid such an issue?
=================================================================================================================================================
Below is the unit test that allows to reproduce this unexpected behavior.
package foo.bar;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.Service;
import org.apache.camel.ThreadPoolRejectedPolicy;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultComponent;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.apache.camel.util.AsyncProcessorHelper;
import org.junit.Test;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class CamelRoutingTest extends CamelTestSupport {
@Test
public void route() throws Exception {
Endpoint endpoint =
context().getRoute("timer").getConsumer().getEndpoint();
// make timer fire for several times
TimeUnit.SECONDS.sleep(5);
context().stopRoute("timer", 10, TimeUnit.SECONDS);
int inflightExchanges =
context().getInflightRepository().size(endpoint);
assertTrue("inflightExchanges: " + inflightExchanges, inflightExchanges
== 0);
}
@Override
protected JndiRegistry createRegistry() throws Exception {
JndiRegistry registry = super.createRegistry();
registry.bind("atimer", new AsyncTimerComponent());
return registry;
}
@Override
protected int getShutdownTimeout() {
return 1;
}
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
from("atimer:start?period=100&delay=100&repeatCount=0").id("timer")
.threads(1,
5).maxQueueSize(5).rejectedPolicy(ThreadPoolRejectedPolicy.DiscardOldest)
.log(LoggingLevel.INFO, "org.apache.camel.LOGGER",
"START: ${header." + Exchange.BREADCRUMB_ID +
"}")
.delay(1000)
.log(LoggingLevel.INFO, "org.apache.camel.LOGGER",
"COMPLETE: ${header." + Exchange.BREADCRUMB_ID
+ "}")
.to("mock:result");
}
};
}
public static class AsyncTimerComponent extends DefaultComponent {
private final Map<String, Timer> timers = new HashMap<String, Timer>();
public Timer getTimer(AsyncTimerEndpoint endpoint) {
String key = endpoint.getTimerName();
if (!endpoint.isDaemon()) {
key = "nonDaemon:" + key;
}
Timer answer;
synchronized (timers) {
answer = timers.get(key);
if (answer == null) {
answer = new Timer(endpoint.getTimerName(),
endpoint.isDaemon());
timers.put(key, answer);
}
}
return answer;
}
@Override
protected AsyncTimerEndpoint createEndpoint(String uri, String
remaining, Map<String, Object> parameters) throws Exception {
AsyncTimerEndpoint answer = new AsyncTimerEndpoint(uri, this,
remaining);
setProperties(answer, parameters);
return answer;
}
@Override
protected void doStop() throws Exception {
Collection<Timer> collection = timers.values();
for (Timer timer : collection) {
timer.cancel();
}
timers.clear();
}
}
public static class AsyncTimerEndpoint extends DefaultEndpoint implements
Service {
private String timerName;
private long period = 1000;
private long delay;
private boolean fixedRate;
private boolean daemon = true;
private Timer timer;
private long repeatCount;
public AsyncTimerEndpoint(String fullURI, AsyncTimerComponent
component, String timerName) {
super(fullURI, component);
this.timerName = timerName;
}
public Producer createProducer() throws Exception {
throw new RuntimeCamelException("There is no producer");
}
public AsyncTimerConsumer createConsumer(Processor processor) throws
Exception {
return new AsyncTimerConsumer(this, processor);
}
public void start() throws Exception {
// noop
}
public void stop() throws Exception {
setTimer(null);
}
public String getTimerName() {
if (timerName == null) {
timerName = getEndpointUri();
}
return timerName;
}
public void setTimerName(String timerName) {
this.timerName = timerName;
}
public boolean isDaemon() {
return daemon;
}
public void setDaemon(boolean daemon) {
this.daemon = daemon;
}
public long getDelay() {
return delay;
}
public void setDelay(long delay) {
this.delay = delay;
}
public boolean isFixedRate() {
return fixedRate;
}
public void setFixedRate(boolean fixedRate) {
this.fixedRate = fixedRate;
}
public long getPeriod() {
return period;
}
public void setPeriod(long period) {
this.period = period;
}
public long getRepeatCount() {
return repeatCount;
}
public void setRepeatCount(long repeatCount) {
this.repeatCount = repeatCount;
}
public boolean isSingleton() {
return true;
}
public synchronized Timer getTimer() {
if (timer == null) {
AsyncTimerComponent tc = (AsyncTimerComponent) getComponent();
timer = tc.getTimer(this);
}
return timer;
}
public synchronized void setTimer(Timer timer) {
this.timer = timer;
}
public String getEndpointUri() {
return super.getEndpointUri();
}
}
public static class AsyncTimerConsumer extends DefaultConsumer {
private final AsyncTimerEndpoint endpoint;
private volatile TimerTask task;
public AsyncTimerConsumer(AsyncTimerEndpoint endpoint, Processor
processor) {
super(endpoint, processor);
this.endpoint = endpoint;
}
@Override
protected void doStart() throws Exception {
task = new TimerTask() {
// counter
private final AtomicLong counter = new AtomicLong();
@Override
public void run() {
try {
long count = counter.incrementAndGet();
boolean fire = endpoint.getRepeatCount() <= 0 || count
<= endpoint.getRepeatCount();
if (fire) {
sendTimerExchange(count);
} else {
// no need to fire anymore as we exceeded repeat
count
log.debug("Cancelling {} timer as repeat count
limit reached after {} counts.", endpoint.getTimerName(),
endpoint.getRepeatCount());
cancel();
}
} catch (Throwable e) {
// catch all to avoid the JVM closing the thread and
not firing again
log.warn("Error processing exchange. This exception
will be ignored, to let the timer be able to trigger again.", e);
}
}
};
Timer timer = endpoint.getTimer();
configureTask(task, timer);
}
@Override
protected void doStop() throws Exception {
if (task != null) {
task.cancel();
}
task = null;
}
protected void configureTask(TimerTask task, Timer timer) {
if (endpoint.isFixedRate()) {
timer.scheduleAtFixedRate(task, endpoint.getDelay(),
endpoint.getPeriod());
} else {
if (endpoint.getPeriod() > 0) {
timer.schedule(task, endpoint.getDelay(),
endpoint.getPeriod());
} else {
timer.schedule(task, endpoint.getDelay());
}
}
}
protected void sendTimerExchange(long counter) {
Exchange exchange = endpoint.createExchange();
exchange.setProperty(Exchange.TIMER_COUNTER, counter);
exchange.setProperty(Exchange.TIMER_NAME, endpoint.getTimerName());
exchange.setProperty(Exchange.TIMER_PERIOD, endpoint.getPeriod());
exchange.setProperty(Exchange.TIMER_FIRED_TIME, new Date());
log.trace("Timer {} is firing #{} count", endpoint.getTimerName(),
counter);
try {
AsyncProcessorHelper.process(getAsyncProcessor(), exchange, new
AsyncCallback() {
public void done(boolean doneSync) {
// noop
}
});
// log exception if an exception occurred and was not handled
if (exchange.getException() != null) {
getExceptionHandler().handleException("Error processing
exchange", exchange, exchange.getException());
}
} catch (Exception e) {
getExceptionHandler().handleException("Error processing
exchange", exchange, exchange.getException());
}
}
}
Best Regards,
Sergey Zhemzhitsky
_______________________________________________________
The information contained in this message may be privileged and conf idential
and protected from disclosure. If you are not the original intended recipient,
you are hereby notified that any review, retransmission, dissemination, or
other use of, or taking of any action in reliance upon, this information is
prohibited. If you have received this communication in error, please notify the
sender immediately by replying to this message and delete it from your
computer. Thank you for your cooperation. Troika Dialog, Russia.
If you need assistance please contact our Contact Center (+7495) 258 0500 or
go to www.troika.ru/eng/Contacts/system.wbp