First, a little background on my requirements:

  *   Download large tar.gz files from multiple dynamically defined read-only ftp/sftp sites.
  *   Process the files within the .tar.gz based on the extension of each entry name.
  *   Using Camel 2.19.3.

My solution is to define new routes with download=false that obtain only a list
of unprocessed files. A sample route is:

from("ftp://user@localhost/path?download=false&inProgressRepository=#inProgressRepo&idempotentRepository=#idemRepo&noop=true&readLock=changed&readLockMarkerFile=false&autoCreate=false&stepwise=false";).to("seda:download?size=3&concurrentConsumers=3&blockWhenFull=true&purgeWhenStopping=true")


The file names are sent to a seda queue whose consumer downloads each file with
streamDownload and hands the resulting RemoteFile to the unpack route. The seda
route is defined as:

from("seda:download?size=3&concurrentConsumers=3&blockWhenFull=true&purgeWhenStopping=true")
.process({
    String fileName = exchange.getIn().getHeader(Exchange.FILE_NAME_ONLY, 
String.class);
    CamelContext context = exchange.getContext();
    ConsumerTemplate downloadConsumer = context.createConsumerTemplate();
    Producer unpackProducer = 
context.getRoute("unpack").getEndpoint().createProducer();
    Map<String,Object> parms = new HashMap<>();
    parms.put("fileName", fileName);
    parms.put("runLoggingLevel", "INFO");
    parms.put("consumer.bridgeErrorHandler", "true");
    parms.put("idempotentRepository", "#idemRepo");
    parms.put("noop", "true");
    parms.put("readLock", "changed");
    parms.put("readLockLoggingLevel", "INFO");
    parms.put("readLockMarkerFile", "false");
    parms.put("initialDelay", "0");
    parms.put("autoCreate", "false");
    parms.put("maximumReconnectAttempts", "0");
    parms.put("streamDownload", "true");
    parms.put("stepwise", "false");
    parms.put("throwExceptionOnConnectFailed", "true");
    parms.put("useList", "false");
    downloadConsumer.start();
    Exchange downloadExchange = 
downloadConsumer.receive(URISupport.normalizeUri(URISupport.appendParametersToURI("ftp://user@localhost/path";,
 parms));
    unpackProducer.process(downloadExchange);
    if (downloadExchange.isFailed()) {
      LOGGER.error("unpack failed", downloadExchange.getException());
      exchange.setException(downloadExchange.getException());
    }
    downloadConsumer.doneUoW(downloadExchange);
    downloadConsumer.stop();
}
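
(For clarity, the consumer ends up polling a normalized URI along these lines, with fileName taken from the header — an example name is shown here, and the parameter order may differ after normalization:)

ftp://user@localhost/path?fileName=somefile.tar.gz&runLoggingLevel=INFO&consumer.bridgeErrorHandler=true&idempotentRepository=#idemRepo&noop=true&readLock=changed&readLockLoggingLevel=INFO&readLockMarkerFile=false&initialDelay=0&autoCreate=false&maximumReconnectAttempts=0&streamDownload=true&stepwise=false&throwExceptionOnConnectFailed=true&useList=false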


The unpack route is defined as:

from("direct:unpack").routeId("unpack")
.convertBodyTo(InputStream.class, null)
  .split(new TarSplitter()).streaming()
    .choice()
      .when(header(FILE_NAME).regex(XML_FILTER))
        .unmarshal().jacksonxml(POJO.class)
        .endChoice()
      .when(header(FILE_NAME).regex(XML2_FILTER))
        .unmarshal().jacksonxml(POJO2.class)
        .endChoice()
      .end()
    .end()
  .to("file://...")


First, is this a good solution for supporting concurrent FTP consumers? As far
as I can tell, a new FTPClient instance is created for each download and
processed on the same thread. Is there a better solution?

Second, with the seda queue some files encounter an exception in
GZIPInputStream. If direct is used instead of seda, so that only a single file
is processed at a time, no errors occur. This seems to point to a concurrency
issue. Am I missing something obvious?

Thanks in advance for any help.

Paul
