Hi, Well spotted. A ticket has been created: https://issues.apache.org/jira/browse/CAMEL-4925
2012/1/20 Zhemzhitsky Sergey <[email protected]>: > Hi gurus, > > I have the following issue. I need to send requests to webservice from camel > on timer events. > As timer consumer uses synchronous event firing, only one request can be > sent at the same time. What I’d like is to use the threads DSL to process > multiple timer events at the same time. > > Here is an example: > > from("timer:start?period=100&delay=100&repeatCount=0") > .threads(1, > 5).maxQueueSize(5).rejectedPolicy(ThreadPoolRejectedPolicy.DiscardOldest) > .to("bean:wscall"); > > However, usage of threads DSL with original timer consumer does not make > sense because of synchronicity of timer consumer, so I tried to develop > asynchronous timer component that uses asynchronous API to fire events. > > Everything works fine except that there are a lot of inflight exchanges when > stopping the route. > > Could you please suggest how to avoid such an issue? > > ================================================================================================================================================= > > Below is the unit test that reproduces this unexpected behavior. 
> > > package foo.bar; > > import org.apache.camel.AsyncCallback; > import org.apache.camel.Endpoint; > import org.apache.camel.Exchange; > import org.apache.camel.LoggingLevel; > import org.apache.camel.Processor; > import org.apache.camel.Producer; > import org.apache.camel.RuntimeCamelException; > import org.apache.camel.Service; > import org.apache.camel.ThreadPoolRejectedPolicy; > import org.apache.camel.builder.RouteBuilder; > import org.apache.camel.impl.DefaultComponent; > import org.apache.camel.impl.DefaultConsumer; > import org.apache.camel.impl.DefaultEndpoint; > import org.apache.camel.impl.JndiRegistry; > import org.apache.camel.test.junit4.CamelTestSupport; > import org.apache.camel.util.AsyncProcessorHelper; > import org.junit.Test; > > import java.util.Collection; > import java.util.Date; > import java.util.HashMap; > import java.util.Map; > import java.util.Timer; > import java.util.TimerTask; > import java.util.concurrent.TimeUnit; > import java.util.concurrent.atomic.AtomicLong; > > public class CamelRoutingTest extends CamelTestSupport { > > @Test > public void route() throws Exception { > Endpoint endpoint = > context().getRoute("timer").getConsumer().getEndpoint(); > > // make timer fire for several times > TimeUnit.SECONDS.sleep(5); > > context().stopRoute("timer", 10, TimeUnit.SECONDS); > > int inflightExchanges = > context().getInflightRepository().size(endpoint); > assertTrue("inflightExchanges: " + inflightExchanges, > inflightExchanges == 0); > } > > @Override > protected JndiRegistry createRegistry() throws Exception { > JndiRegistry registry = super.createRegistry(); > registry.bind("atimer", new AsyncTimerComponent()); > return registry; > } > > @Override > protected int getShutdownTimeout() { > return 1; > } > > @Override > protected RouteBuilder createRouteBuilder() throws Exception { > return new RouteBuilder() { > @Override > public void configure() throws Exception { > > 
from("atimer:start?period=100&delay=100&repeatCount=0").id("timer") > .threads(1, > 5).maxQueueSize(5).rejectedPolicy(ThreadPoolRejectedPolicy.DiscardOldest) > .log(LoggingLevel.INFO, "org.apache.camel.LOGGER", > "START: ${header." + Exchange.BREADCRUMB_ID + > "}") > .delay(1000) > .log(LoggingLevel.INFO, "org.apache.camel.LOGGER", > "COMPLETE: ${header." + > Exchange.BREADCRUMB_ID + "}") > .to("mock:result"); > } > }; > } > > public static class AsyncTimerComponent extends DefaultComponent { > private final Map<String, Timer> timers = new HashMap<String, Timer>(); > public Timer getTimer(AsyncTimerEndpoint endpoint) { > String key = endpoint.getTimerName(); > if (!endpoint.isDaemon()) { > key = "nonDaemon:" + key; > } > > Timer answer; > synchronized (timers) { > answer = timers.get(key); > if (answer == null) { > answer = new Timer(endpoint.getTimerName(), > endpoint.isDaemon()); > timers.put(key, answer); > } > } > return answer; > } > > @Override > protected AsyncTimerEndpoint createEndpoint(String uri, String > remaining, Map<String, Object> parameters) throws Exception { > AsyncTimerEndpoint answer = new AsyncTimerEndpoint(uri, this, > remaining); > setProperties(answer, parameters); > return answer; > } > > @Override > protected void doStop() throws Exception { > Collection<Timer> collection = timers.values(); > for (Timer timer : collection) { > timer.cancel(); > } > timers.clear(); > } > } > > public static class AsyncTimerEndpoint extends DefaultEndpoint implements > Service { > private String timerName; > private long period = 1000; > private long delay; > private boolean fixedRate; > private boolean daemon = true; > private Timer timer; > private long repeatCount; > > public AsyncTimerEndpoint(String fullURI, AsyncTimerComponent > component, String timerName) { > super(fullURI, component); > this.timerName = timerName; > } > public Producer createProducer() throws Exception { > throw new RuntimeCamelException("There is no producer"); > } > > public 
AsyncTimerConsumer createConsumer(Processor processor) throws > Exception { > return new AsyncTimerConsumer(this, processor); > } > > public void start() throws Exception { > // noop > } > > public void stop() throws Exception { > setTimer(null); > } > > public String getTimerName() { > if (timerName == null) { > timerName = getEndpointUri(); > } > return timerName; > } > > public void setTimerName(String timerName) { > this.timerName = timerName; > } > > public boolean isDaemon() { > return daemon; > } > > public void setDaemon(boolean daemon) { > this.daemon = daemon; > } > > public long getDelay() { > return delay; > } > > public void setDelay(long delay) { > this.delay = delay; > } > > public boolean isFixedRate() { > return fixedRate; > } > > public void setFixedRate(boolean fixedRate) { > this.fixedRate = fixedRate; > } > > public long getPeriod() { > return period; > } > > public void setPeriod(long period) { > this.period = period; > } > > public long getRepeatCount() { > return repeatCount; > } > > public void setRepeatCount(long repeatCount) { > this.repeatCount = repeatCount; > } > > public boolean isSingleton() { > return true; > } > > public synchronized Timer getTimer() { > if (timer == null) { > AsyncTimerComponent tc = (AsyncTimerComponent) getComponent(); > timer = tc.getTimer(this); > } > return timer; > } > > public synchronized void setTimer(Timer timer) { > this.timer = timer; > } > > public String getEndpointUri() { > return super.getEndpointUri(); > } > } > > public static class AsyncTimerConsumer extends DefaultConsumer { > > private final AsyncTimerEndpoint endpoint; > private volatile TimerTask task; > > public AsyncTimerConsumer(AsyncTimerEndpoint endpoint, Processor > processor) { > super(endpoint, processor); > this.endpoint = endpoint; > } > > @Override > protected void doStart() throws Exception { > task = new TimerTask() { > // counter > private final AtomicLong counter = new AtomicLong(); > > @Override > public void run() { > try { 
> long count = counter.incrementAndGet(); > > boolean fire = endpoint.getRepeatCount() <= 0 || count > <= endpoint.getRepeatCount(); > if (fire) { > sendTimerExchange(count); > } else { > // no need to fire anymore as we exceeded repeat > count > log.debug("Cancelling {} timer as repeat count > limit reached after {} counts.", endpoint.getTimerName(), > endpoint.getRepeatCount()); > cancel(); > } > } catch (Throwable e) { > // catch all to avoid the JVM closing the thread and > not firing again > log.warn("Error processing exchange. This exception > will be ignored, to let the timer be able to trigger again.", e); > } > } > }; > > Timer timer = endpoint.getTimer(); > configureTask(task, timer); > } > > @Override > protected void doStop() throws Exception { > if (task != null) { > task.cancel(); > } > task = null; > } > > protected void configureTask(TimerTask task, Timer timer) { > if (endpoint.isFixedRate()) { > timer.scheduleAtFixedRate(task, endpoint.getDelay(), > endpoint.getPeriod()); > } else { > if (endpoint.getPeriod() > 0) { > timer.schedule(task, endpoint.getDelay(), > endpoint.getPeriod()); > } else { > timer.schedule(task, endpoint.getDelay()); > } > } > } > > protected void sendTimerExchange(long counter) { > Exchange exchange = endpoint.createExchange(); > exchange.setProperty(Exchange.TIMER_COUNTER, counter); > exchange.setProperty(Exchange.TIMER_NAME, endpoint.getTimerName()); > exchange.setProperty(Exchange.TIMER_PERIOD, endpoint.getPeriod()); > exchange.setProperty(Exchange.TIMER_FIRED_TIME, new Date()); > > log.trace("Timer {} is firing #{} count", endpoint.getTimerName(), > counter); > try { > AsyncProcessorHelper.process(getAsyncProcessor(), exchange, > new AsyncCallback() { > public void done(boolean doneSync) { > // noop > } > }); > > // log exception if an exception occurred and was not handled > if (exchange.getException() != null) { > getExceptionHandler().handleException("Error processing > exchange", exchange, exchange.getException()); > 
} > } catch (Exception e) { > getExceptionHandler().handleException("Error processing > exchange", exchange, exchange.getException()); > } > } > > } > > Best Regards, > Sergey Zhemzhitsky > > _______________________________________________________ > > The information contained in this message may be privileged and confidential > and protected from disclosure. If you are not the original intended > recipient, you are hereby notified that any review, retransmission, > dissemination, or other use of, or taking of any action in reliance upon, > this information is prohibited. If you have received this communication in > error, please notify the sender immediately by replying to this message and > delete it from your computer. Thank you for your cooperation. Troika Dialog, > Russia. > If you need assistance please contact our Contact Center (+7495) 258 0500 or > go to www.troika.ru/eng/Contacts/system.wbp > -- Claus Ibsen ----------------- FuseSource Email: [email protected] Web: http://fusesource.com Twitter: davsclaus, fusenews Blog: http://davsclaus.blogspot.com/ Author of Camel in Action: http://www.manning.com/ibsen/
