Hi

Well spotted. A ticket has been created:
https://issues.apache.org/jira/browse/CAMEL-4925

2012/1/20 Zhemzhitsky Sergey <[email protected]>:
> Hi gurus,
>
> I have the following issue. I need to send requests to a web service from Camel 
> on timer events.
> As the timer consumer uses synchronous event firing, only one request can be 
> sent at a time. What I’d like is to use the threads DSL to process 
> multiple timer events at the same time.
>
> Here is an example:
>
> from("timer:start?period=100&delay=100&repeatCount=0")
>    .threads(1, 
> 5).maxQueueSize(5).rejectedPolicy(ThreadPoolRejectedPolicy.DiscardOldest)
>        .to("bean:wscall");
>
> However, using the threads DSL with the original timer consumer does not make 
> sense because the timer consumer is synchronous, so I tried to develop 
> an asynchronous timer component that uses the asynchronous API to fire events.
>
> Everything works fine except that there are a lot of inflight exchanges when 
> stopping the route.
>
> Could you please suggest how to avoid such an issue?
>
> =================================================================================================================================================
>
> Below is a unit test that reproduces this unexpected behavior.
>
>
> package foo.bar;
>
> import org.apache.camel.AsyncCallback;
> import org.apache.camel.Endpoint;
> import org.apache.camel.Exchange;
> import org.apache.camel.LoggingLevel;
> import org.apache.camel.Processor;
> import org.apache.camel.Producer;
> import org.apache.camel.RuntimeCamelException;
> import org.apache.camel.Service;
> import org.apache.camel.ThreadPoolRejectedPolicy;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.impl.DefaultComponent;
> import org.apache.camel.impl.DefaultConsumer;
> import org.apache.camel.impl.DefaultEndpoint;
> import org.apache.camel.impl.JndiRegistry;
> import org.apache.camel.test.junit4.CamelTestSupport;
> import org.apache.camel.util.AsyncProcessorHelper;
> import org.junit.Test;
>
> import java.util.Collection;
> import java.util.Date;
> import java.util.HashMap;
> import java.util.Map;
> import java.util.Timer;
> import java.util.TimerTask;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.atomic.AtomicLong;
>
> public class CamelRoutingTest extends CamelTestSupport {
>
>    @Test
>    public void route() throws Exception {
>        Endpoint endpoint = 
> context().getRoute("timer").getConsumer().getEndpoint();
>
>        // make timer fire for several times
>        TimeUnit.SECONDS.sleep(5);
>
>        context().stopRoute("timer", 10, TimeUnit.SECONDS);
>
>        int inflightExchanges = 
> context().getInflightRepository().size(endpoint);
>        assertTrue("inflightExchanges: " + inflightExchanges, 
> inflightExchanges == 0);
>    }
>
>    @Override
>    protected JndiRegistry createRegistry() throws Exception {
>        JndiRegistry registry = super.createRegistry();
>        registry.bind("atimer", new AsyncTimerComponent());
>        return registry;
>    }
>
>    @Override
>    protected int getShutdownTimeout() {
>        return 1;
>    }
>
>    @Override
>    protected RouteBuilder createRouteBuilder() throws Exception {
>        return new RouteBuilder() {
>            @Override
>            public void configure() throws Exception {
>                
> from("atimer:start?period=100&delay=100&repeatCount=0").id("timer")
>                        .threads(1, 
> 5).maxQueueSize(5).rejectedPolicy(ThreadPoolRejectedPolicy.DiscardOldest)
>                            .log(LoggingLevel.INFO, "org.apache.camel.LOGGER",
>                                 "START: ${header." + Exchange.BREADCRUMB_ID + 
> "}")
>                            .delay(1000)
>                            .log(LoggingLevel.INFO, "org.apache.camel.LOGGER",
>                                 "COMPLETE: ${header." + 
> Exchange.BREADCRUMB_ID + "}")
>                            .to("mock:result");
>            }
>        };
>    }
>
>    public static class AsyncTimerComponent extends DefaultComponent {
>        private final Map<String, Timer> timers = new HashMap<String, Timer>();
>        public Timer getTimer(AsyncTimerEndpoint endpoint) {
>            String key = endpoint.getTimerName();
>            if (!endpoint.isDaemon()) {
>                key = "nonDaemon:" + key;
>            }
>
>            Timer answer;
>            synchronized (timers) {
>                answer = timers.get(key);
>                if (answer == null) {
>                    answer = new Timer(endpoint.getTimerName(), 
> endpoint.isDaemon());
>                    timers.put(key, answer);
>                }
>            }
>            return answer;
>        }
>
>        @Override
>        protected AsyncTimerEndpoint createEndpoint(String uri, String 
> remaining, Map<String, Object> parameters) throws Exception {
>            AsyncTimerEndpoint answer = new AsyncTimerEndpoint(uri, this, 
> remaining);
>            setProperties(answer, parameters);
>            return answer;
>        }
>
>        @Override
>        protected void doStop() throws Exception {
>            Collection<Timer> collection = timers.values();
>            for (Timer timer : collection) {
>                timer.cancel();
>            }
>            timers.clear();
>        }
>    }
>
>    public static class AsyncTimerEndpoint extends DefaultEndpoint implements 
> Service {
>        private String timerName;
>        private long period = 1000;
>        private long delay;
>        private boolean fixedRate;
>        private boolean daemon = true;
>        private Timer timer;
>        private long repeatCount;
>
>        public AsyncTimerEndpoint(String fullURI, AsyncTimerComponent 
> component, String timerName) {
>            super(fullURI, component);
>            this.timerName = timerName;
>        }
>        public Producer createProducer() throws Exception {
>            throw new RuntimeCamelException("There is no producer");
>        }
>
>        public AsyncTimerConsumer createConsumer(Processor processor) throws 
> Exception {
>            return new AsyncTimerConsumer(this, processor);
>        }
>
>        public void start() throws Exception {
>            // noop
>        }
>
>        public void stop() throws Exception {
>            setTimer(null);
>        }
>
>        public String getTimerName() {
>            if (timerName == null) {
>                timerName = getEndpointUri();
>            }
>            return timerName;
>        }
>
>        public void setTimerName(String timerName) {
>            this.timerName = timerName;
>        }
>
>        public boolean isDaemon() {
>            return daemon;
>        }
>
>        public void setDaemon(boolean daemon) {
>            this.daemon = daemon;
>        }
>
>        public long getDelay() {
>            return delay;
>        }
>
>        public void setDelay(long delay) {
>            this.delay = delay;
>        }
>
>        public boolean isFixedRate() {
>            return fixedRate;
>        }
>
>        public void setFixedRate(boolean fixedRate) {
>            this.fixedRate = fixedRate;
>        }
>
>        public long getPeriod() {
>            return period;
>        }
>
>        public void setPeriod(long period) {
>            this.period = period;
>        }
>
>        public long getRepeatCount() {
>            return repeatCount;
>        }
>
>        public void setRepeatCount(long repeatCount) {
>            this.repeatCount = repeatCount;
>        }
>
>        public boolean isSingleton() {
>            return true;
>        }
>
>        public synchronized Timer getTimer() {
>            if (timer == null) {
>                AsyncTimerComponent tc = (AsyncTimerComponent) getComponent();
>                timer = tc.getTimer(this);
>            }
>            return timer;
>        }
>
>        public synchronized void setTimer(Timer timer) {
>            this.timer = timer;
>        }
>
>        public String getEndpointUri() {
>            return super.getEndpointUri();
>        }
>    }
>
>    public static class AsyncTimerConsumer extends DefaultConsumer {
>
>        private final AsyncTimerEndpoint endpoint;
>        private volatile TimerTask task;
>
>        public AsyncTimerConsumer(AsyncTimerEndpoint endpoint, Processor 
> processor) {
>            super(endpoint, processor);
>            this.endpoint = endpoint;
>        }
>
>        @Override
>        protected void doStart() throws Exception {
>            task = new TimerTask() {
>                // counter
>                private final AtomicLong counter = new AtomicLong();
>
>                @Override
>                public void run() {
>                    try {
>                        long count = counter.incrementAndGet();
>
>                        boolean fire = endpoint.getRepeatCount() <= 0 || count 
> <= endpoint.getRepeatCount();
>                        if (fire) {
>                            sendTimerExchange(count);
>                        } else {
>                            // no need to fire anymore as we exceeded repeat 
> count
>                            log.debug("Cancelling {} timer as repeat count 
> limit reached after {} counts.", endpoint.getTimerName(), 
> endpoint.getRepeatCount());
>                            cancel();
>                        }
>                    } catch (Throwable e) {
>                        // catch all to avoid the JVM closing the thread and 
> not firing again
>                        log.warn("Error processing exchange. This exception 
> will be ignored, to let the timer be able to trigger again.", e);
>                    }
>                }
>            };
>
>            Timer timer = endpoint.getTimer();
>            configureTask(task, timer);
>        }
>
>        @Override
>        protected void doStop() throws Exception {
>            if (task != null) {
>                task.cancel();
>            }
>            task = null;
>        }
>
>        protected void configureTask(TimerTask task, Timer timer) {
>            if (endpoint.isFixedRate()) {
>                timer.scheduleAtFixedRate(task, endpoint.getDelay(), 
> endpoint.getPeriod());
>            } else {
>                if (endpoint.getPeriod() > 0) {
>                    timer.schedule(task, endpoint.getDelay(), 
> endpoint.getPeriod());
>                } else {
>                    timer.schedule(task, endpoint.getDelay());
>                }
>            }
>        }
>
>        protected void sendTimerExchange(long counter) {
>            Exchange exchange = endpoint.createExchange();
>            exchange.setProperty(Exchange.TIMER_COUNTER, counter);
>            exchange.setProperty(Exchange.TIMER_NAME, endpoint.getTimerName());
>            exchange.setProperty(Exchange.TIMER_PERIOD, endpoint.getPeriod());
>            exchange.setProperty(Exchange.TIMER_FIRED_TIME, new Date());
>
>            log.trace("Timer {} is firing #{} count", endpoint.getTimerName(), 
> counter);
>            try {
>                AsyncProcessorHelper.process(getAsyncProcessor(), exchange, 
> new AsyncCallback() {
>                    public void done(boolean doneSync) {
>                        // noop
>                    }
>                });
>
>                // log exception if an exception occurred and was not handled
>                if (exchange.getException() != null) {
>                    getExceptionHandler().handleException("Error processing 
> exchange", exchange, exchange.getException());
>                }
>            } catch (Exception e) {
>                getExceptionHandler().handleException("Error processing 
> exchange", exchange, exchange.getException());
>            }
>        }
>
>    }
>
> Best Regards,
> Sergey Zhemzhitsky
>
> _______________________________________________________
>
> The information contained in this message may be privileged and confidential 
> and protected from disclosure. If you are not the original intended 
> recipient, you are hereby notified that any review, retransmission, 
> dissemination, or other use of, or taking of any action in reliance upon, 
> this information is prohibited. If you have received this communication in 
> error, please notify the sender immediately by replying to this message and 
> delete it from your computer. Thank you for your cooperation. Troika Dialog, 
> Russia.
> If you need assistance please contact our Contact Center  (+7495) 258 0500 or 
> go to www.troika.ru/eng/Contacts/system.wbp
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: [email protected]
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/

Reply via email to