Hi gurus,

I have the following issue: I need to send requests to a web service from Camel
on timer events.
Because the timer consumer fires events synchronously, only one request can be
in progress at a time. What I'd like is to use the threads DSL to process
multiple timer events concurrently.

Here is an example:

from("timer:start?period=100&delay=100&repeatCount=0")
    .threads(1, 5).maxQueueSize(5).rejectedPolicy(ThreadPoolRejectedPolicy.DiscardOldest)
    .to("bean:wscall");
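
For readers less familiar with the DiscardOldest policy used in the route above, here is a minimal plain-JDK sketch of what a bounded queue with that policy does. It is an illustration only, not Camel's implementation: the class name DiscardOldestSketch and the method fire are hypothetical, and the pool is deliberately shrunk to one thread with a queue of two (instead of 1..5 threads and a queue of 5) so the outcome is deterministic.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Camel: shows which tasks survive
// when a bounded queue overflows under the JDK DiscardOldestPolicy.
class DiscardOldestSketch {

    /** Submits `events` tasks to a single-thread pool and returns the ids that actually ran. */
    static List<Integer> fire(int events, int queueSize) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(queueSize),
                new ThreadPoolExecutor.DiscardOldestPolicy());

        List<Integer> processed = new CopyOnWriteArrayList<Integer>();
        CountDownLatch firstStarted = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        for (int i = 1; i <= events; i++) {
            final int id = i;
            pool.execute(() -> {
                if (id == 1) {
                    // The first task keeps the only worker busy so later tasks pile up in the queue.
                    firstStarted.countDown();
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                processed.add(id);
            });
            if (id == 1) {
                firstStarted.await(); // make sure the worker is occupied before queueing more
            }
        }

        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        // Tasks 2 and 3 are queued first and then dropped to make room for 4 and 5.
        System.out.println(fire(5, 2)); // prints [1, 4, 5]
    }
}
```

The dropped tasks are never executed at all, which matters for anything that counts on a task running to completion.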

However, using the threads DSL with the original timer consumer does not make
sense, because the timer consumer is synchronous. So I tried to develop an
asynchronous timer component that uses the asynchronous API to fire events.
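
To illustrate the idea of a timer that hands each tick off to worker threads instead of processing it on the timer thread, here is a minimal plain-JDK sketch. It is not the Camel component from the unit test in this message: AsyncTimerSketch and fireConcurrently are hypothetical names, and a plain ExecutorService stands in for Camel's asynchronous processor. Each handler waits until all handlers are running, which can only succeed if the ticks are processed concurrently.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a java.util.Timer whose task only submits work
// to a pool, so a slow handler does not delay the next tick.
class AsyncTimerSketch {

    /** Returns true if `events` ticks were all being processed at the same time. */
    static boolean fireConcurrently(final int events, long periodMillis) throws InterruptedException {
        final ExecutorService workers = Executors.newFixedThreadPool(events);
        final CountDownLatch allRunning = new CountDownLatch(events);
        final CountDownLatch allDone = new CountDownLatch(events);
        final AtomicInteger fired = new AtomicInteger();

        Timer timer = new Timer("sketch-timer", true);
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                if (fired.incrementAndGet() > events) {
                    cancel(); // enough ticks, stop firing
                    return;
                }
                // Hand off and return immediately; the timer thread never waits for the handler.
                workers.execute(() -> {
                    allRunning.countDown();
                    try {
                        allRunning.await(); // blocks until every handler has started
                        allDone.countDown();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        }, 0, periodMillis);

        boolean ok = allDone.await(5, TimeUnit.SECONDS);
        timer.cancel();
        workers.shutdownNow();
        return ok;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(fireConcurrently(3, 10));
    }
}
```

If the handler were run directly inside the TimerTask, the first tick would block forever in this sketch, because no second tick could start while the first one was still being processed.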

Everything works fine, except that a lot of inflight exchanges are left over
when the route is stopped.

Could you please suggest how to avoid such an issue?
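
One possible contributor, stated here only as an assumption that nothing in this message confirms, is that a task dropped by a discard-oldest policy never runs and therefore never signals completion. The plain-JDK sketch below shows that effect with a simple counter and a rejection handler that notifies the dropped task. All names (InflightSketch, TrackedTask, NotifyingDiscardOldest, inflightAfter) are hypothetical and none of this is Camel API.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class InflightSketch {

    /** Hypothetical stand-in for an exchange: counted as in flight until done() is called. */
    static final class TrackedTask implements Runnable {
        private final AtomicInteger inflight;
        private final Runnable body;
        private final AtomicBoolean finished = new AtomicBoolean();

        TrackedTask(AtomicInteger inflight, Runnable body) {
            this.inflight = inflight;
            this.body = body;
            inflight.incrementAndGet();
        }

        public void run() {
            try {
                body.run();
            } finally {
                done();
            }
        }

        void done() {
            if (finished.compareAndSet(false, true)) {
                inflight.decrementAndGet();
            }
        }
    }

    /** Behaves like DiscardOldestPolicy, but marks the dropped task as done. */
    static final class NotifyingDiscardOldest implements RejectedExecutionHandler {
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (e.isShutdown()) {
                if (r instanceof TrackedTask) ((TrackedTask) r).done();
                return;
            }
            Runnable dropped = e.getQueue().poll();
            if (dropped instanceof TrackedTask) ((TrackedTask) dropped).done();
            e.execute(r);
        }
    }

    /** Submits `events` tasks to a 1-thread pool with a queue of 2 and returns the leftover count. */
    static int inflightAfter(RejectedExecutionHandler policy, int events) throws InterruptedException {
        AtomicInteger inflight = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2), policy);
        CountDownLatch firstStarted = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        // The first task occupies the only worker so the queue overflows deterministically.
        pool.execute(new TrackedTask(inflight, () -> {
            firstStarted.countDown();
            try {
                release.await();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }));
        firstStarted.await();
        for (int i = 1; i < events; i++) {
            pool.execute(new TrackedTask(inflight, () -> { }));
        }

        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return inflight.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(inflightAfter(new ThreadPoolExecutor.DiscardOldestPolicy(), 5)); // prints 2
        System.out.println(inflightAfter(new NotifyingDiscardOldest(), 5));                 // prints 0
    }
}
```

Whether the same mechanism explains the leftover inflight exchanges in the Camel route would need to be checked against Camel's own handling of rejected tasks.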

=================================================================================================================================================

Below is a unit test that reproduces this unexpected behavior.


package foo.bar;

import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.Service;
import org.apache.camel.ThreadPoolRejectedPolicy;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultComponent;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.apache.camel.util.AsyncProcessorHelper;
import org.junit.Test;

import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class CamelRoutingTest extends CamelTestSupport {

    @Test
    public void route() throws Exception {
        Endpoint endpoint = context().getRoute("timer").getConsumer().getEndpoint();

        // let the timer fire several times
        TimeUnit.SECONDS.sleep(5);

        context().stopRoute("timer", 10, TimeUnit.SECONDS);

        int inflightExchanges = context().getInflightRepository().size(endpoint);
        assertTrue("inflightExchanges: " + inflightExchanges, inflightExchanges == 0);
    }

    @Override
    protected JndiRegistry createRegistry() throws Exception {
        JndiRegistry registry = super.createRegistry();
        registry.bind("atimer", new AsyncTimerComponent());
        return registry;
    }

    @Override
    protected int getShutdownTimeout() {
        return 1;
    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                
                from("atimer:start?period=100&delay=100&repeatCount=0").id("timer")
                        .threads(1, 5).maxQueueSize(5).rejectedPolicy(ThreadPoolRejectedPolicy.DiscardOldest)
                            .log(LoggingLevel.INFO, "org.apache.camel.LOGGER",
                                 "START: ${header." + Exchange.BREADCRUMB_ID + "}")
                            .delay(1000)
                            .log(LoggingLevel.INFO, "org.apache.camel.LOGGER",
                                 "COMPLETE: ${header." + Exchange.BREADCRUMB_ID + "}")
                            .to("mock:result");
            }
        };
    }

    public static class AsyncTimerComponent extends DefaultComponent {
        private final Map<String, Timer> timers = new HashMap<String, Timer>();
        public Timer getTimer(AsyncTimerEndpoint endpoint) {
            String key = endpoint.getTimerName();
            if (!endpoint.isDaemon()) {
                key = "nonDaemon:" + key;
            }

            Timer answer;
            synchronized (timers) {
                answer = timers.get(key);
                if (answer == null) {
                    answer = new Timer(endpoint.getTimerName(), endpoint.isDaemon());
                    timers.put(key, answer);
                }
            }
            return answer;
        }

        @Override
        protected AsyncTimerEndpoint createEndpoint(String uri, String remaining,
                                                    Map<String, Object> parameters) throws Exception {
            AsyncTimerEndpoint answer = new AsyncTimerEndpoint(uri, this, remaining);
            setProperties(answer, parameters);
            return answer;
        }

        @Override
        protected void doStop() throws Exception {
            Collection<Timer> collection = timers.values();
            for (Timer timer : collection) {
                timer.cancel();
            }
            timers.clear();
        }
    }

    public static class AsyncTimerEndpoint extends DefaultEndpoint implements Service {
        private String timerName;
        private long period = 1000;
        private long delay;
        private boolean fixedRate;
        private boolean daemon = true;
        private Timer timer;
        private long repeatCount;

        public AsyncTimerEndpoint(String fullURI, AsyncTimerComponent component, String timerName) {
            super(fullURI, component);
            this.timerName = timerName;
        }
        public Producer createProducer() throws Exception {
            throw new RuntimeCamelException("There is no producer");
        }

        public AsyncTimerConsumer createConsumer(Processor processor) throws Exception {
            return new AsyncTimerConsumer(this, processor);
        }

        public void start() throws Exception {
            // noop
        }

        public void stop() throws Exception {
            setTimer(null);
        }

        public String getTimerName() {
            if (timerName == null) {
                timerName = getEndpointUri();
            }
            return timerName;
        }

        public void setTimerName(String timerName) {
            this.timerName = timerName;
        }

        public boolean isDaemon() {
            return daemon;
        }

        public void setDaemon(boolean daemon) {
            this.daemon = daemon;
        }

        public long getDelay() {
            return delay;
        }

        public void setDelay(long delay) {
            this.delay = delay;
        }

        public boolean isFixedRate() {
            return fixedRate;
        }

        public void setFixedRate(boolean fixedRate) {
            this.fixedRate = fixedRate;
        }

        public long getPeriod() {
            return period;
        }

        public void setPeriod(long period) {
            this.period = period;
        }

        public long getRepeatCount() {
            return repeatCount;
        }

        public void setRepeatCount(long repeatCount) {
            this.repeatCount = repeatCount;
        }

        public boolean isSingleton() {
            return true;
        }

        public synchronized Timer getTimer() {
            if (timer == null) {
                AsyncTimerComponent tc = (AsyncTimerComponent) getComponent();
                timer = tc.getTimer(this);
            }
            return timer;
        }

        public synchronized void setTimer(Timer timer) {
            this.timer = timer;
        }

        public String getEndpointUri() {
            return super.getEndpointUri();
        }
    }

    public static class AsyncTimerConsumer extends DefaultConsumer {

        private final AsyncTimerEndpoint endpoint;
        private volatile TimerTask task;

        public AsyncTimerConsumer(AsyncTimerEndpoint endpoint, Processor processor) {
            super(endpoint, processor);
            this.endpoint = endpoint;
        }

        @Override
        protected void doStart() throws Exception {
            task = new TimerTask() {
                // counter
                private final AtomicLong counter = new AtomicLong();

                @Override
                public void run() {
                    try {
                        long count = counter.incrementAndGet();

                        boolean fire = endpoint.getRepeatCount() <= 0
                                || count <= endpoint.getRepeatCount();
                        if (fire) {
                            sendTimerExchange(count);
                        } else {
                            // no need to fire anymore as we exceeded repeat count
                            log.debug("Cancelling {} timer as repeat count limit reached after {} counts.",
                                      endpoint.getTimerName(), endpoint.getRepeatCount());
                            cancel();
                        }
                    } catch (Throwable e) {
                        // catch all to avoid the JVM closing the thread and not firing again
                        log.warn("Error processing exchange. This exception will be ignored, "
                                 + "to let the timer be able to trigger again.", e);
                    }
                }
            };

            Timer timer = endpoint.getTimer();
            configureTask(task, timer);
        }

        @Override
        protected void doStop() throws Exception {
            if (task != null) {
                task.cancel();
            }
            task = null;
        }

        protected void configureTask(TimerTask task, Timer timer) {
            if (endpoint.isFixedRate()) {
                timer.scheduleAtFixedRate(task, endpoint.getDelay(), endpoint.getPeriod());
            } else {
                if (endpoint.getPeriod() > 0) {
                    timer.schedule(task, endpoint.getDelay(), endpoint.getPeriod());
                } else {
                    timer.schedule(task, endpoint.getDelay());
                }
            }
        }

        protected void sendTimerExchange(long counter) {
            Exchange exchange = endpoint.createExchange();
            exchange.setProperty(Exchange.TIMER_COUNTER, counter);
            exchange.setProperty(Exchange.TIMER_NAME, endpoint.getTimerName());
            exchange.setProperty(Exchange.TIMER_PERIOD, endpoint.getPeriod());
            exchange.setProperty(Exchange.TIMER_FIRED_TIME, new Date());

            log.trace("Timer {} is firing #{} count", endpoint.getTimerName(), counter);
            try {
                AsyncProcessorHelper.process(getAsyncProcessor(), exchange, new AsyncCallback() {
                    public void done(boolean doneSync) {
                        // noop
                    }
                });

                // log exception if an exception occurred and was not handled
                if (exchange.getException() != null) {
                    getExceptionHandler().handleException("Error processing exchange",
                            exchange, exchange.getException());
                }
            } catch (Exception e) {
                // pass the caught exception itself; exchange.getException() may be null here
                getExceptionHandler().handleException("Error processing exchange", exchange, e);
            }
        }

    }
}

Best Regards,
Sergey Zhemzhitsky
