I am using Camel 2.10.4 and need to use SFTP to fetch files. Since the sftp consumer does not support dynamic properties (connecting to a different host, directory, user, password, etc.), I implemented a custom processor that sets up the SftpComponent, SftpEndpoint and SftpConsumer in what I believe is much the same way as the "sftp://" component does, but instead of parsing the URI for the settings, it gets them from the inbound body, which is of type Map<String, Object>.
It all works, except that after a certain time period I need it to stop polling. I tried to suspend the route it is in, but it is still polling. I tried to stop the route, and it is still polling! How can I get it to stop? Through JMX, I can see that the route is definitely stopped, so how can the SftpConsumer still be polling?

Thanks,
Chris

Here's roughly what my custom Processor looks like (logging and error handling stripped out):

    import java.util.Map;

    import org.apache.camel.CamelContext;
    import org.apache.camel.Exchange;
    import org.apache.camel.Processor;
    import org.apache.camel.ProducerTemplate;
    import org.apache.camel.component.file.remote.SftpComponent;
    import org.apache.camel.component.file.remote.SftpConfiguration;
    import org.apache.camel.component.file.remote.SftpConsumer;
    import org.apache.camel.component.file.remote.SftpEndpoint;
    import org.apache.camel.impl.ScheduledPollConsumer;
    import org.apache.camel.util.EndpointHelper;

    public class SftpDownloader implements Processor {

        protected SftpComponent sftpComponent;
        protected SftpEndpoint sftpEndpoint;
        protected SftpConsumer sftpConsumer;
        protected ProducerTemplate producer;
        protected String toEndpointURI;
        // Passed to configure() below, which does not use it yet
        protected String endpointURIQueryString;

        @SuppressWarnings("unchecked")
        public void process(Exchange exchange) throws Exception {
            // The inbound body carries the connection settings (host, directory, ...)
            Map<String, Object> ftpProps = exchange.getIn().getBody(Map.class);
            CamelContext context = exchange.getContext();

            // Lazily create the producer that forwards downloaded files
            if (this.producer == null) {
                this.producer = context.createProducerTemplate();
                this.producer.setDefaultEndpointUri(this.toEndpointURI);
            }

            configure(context, ftpProps, endpointURIQueryString);
        }

        void configure(CamelContext context, Map<String, Object> parameters, String queryStr) throws Exception {
            String initialURI = String.format("sftp://%s/%s",
                    parameters.get("host"), parameters.get("directory"));
            sftpEndpoint = context.getEndpoint(initialURI, SftpEndpoint.class);
            sftpComponent = (SftpComponent) sftpEndpoint.getComponent();
            SftpConfiguration conf = sftpEndpoint.getConfiguration();

            // set reference properties first as they use # syntax that fools the regular properties setter
            EndpointHelper.setReferenceProperties(context, conf, parameters);
            EndpointHelper.setProperties(context, conf, parameters);
            EndpointHelper.setReferenceProperties(context, sftpEndpoint, parameters);
            EndpointHelper.setProperties(context, sftpEndpoint, parameters);

            // Each polled file is handed to the producer's default endpoint
            sftpConsumer = (SftpConsumer) sftpEndpoint.createConsumer(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    producer.send(exchange);
                }
            });

            // The consumer is created and started by hand, inside the processor
            ((ScheduledPollConsumer) sftpConsumer).setStartScheduler(true);
            sftpConsumer.start();
        }
    }