[ https://issues.apache.org/jira/browse/CAMEL-12244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Claus Ibsen resolved CAMEL-12244.
---------------------------------
    Resolution: Fixed

Thanks for reporting and diving into the source code and finding the problem.

> RemoteFileProducer stopped instead of being released to the pool when "interceptSendToEndpoint" is used
> -------------------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-12244
>                 URL: https://issues.apache.org/jira/browse/CAMEL-12244
>             Project: Camel
>          Issue Type: Bug
>    Affects Versions: 2.19.0
>            Reporter: Krzysztof Szafrański
>            Assignee: Claus Ibsen
>            Priority: Major
>             Fix For: 2.21.0
>
>
> In our application we're using an SFTP producer with "fileExist=Move" and a
> specific "moveExisting" expression. I encountered a problem where this would
> sometimes work, and sometimes not (i.e. there would be no ".archived" file).
> Upon further investigation I found the problem and it seems to be a bug in
> Camel.
> Our SFTP endpoint looks like this:
> {code:none}
> sftp://...:.../...?username=...&privateKeyPassphrase=...&privateKeyFile=...&useUserKnownHostsFile=false&jschLoggingLevel=ERROR&fileExist=Move&moveExisting=${file:name}.archived${date:now:yyyyMMddHHmmssSSS}
> {code}
> We also have an interceptor:
> {code:none}
> route.interceptSendToEndpoint("sftp://.*").process(exchange ->
>     LOG.info("Sending file {} to {}", ...));
> {code}
> As I discovered, using the interceptor wraps the RemoteFileProducer with
> InterceptSendToEndpoint. This however changes the behavior of the
> ProducerCache:
> {code}
> public boolean doInAsyncProducer(...) {
>     ...
>     return producerCallback.doInAsyncProducer(producer, asyncProcessor, exchange, pattern, doneSync -> {
>         ...
>         if (producer instanceof ServicePoolAware) {
>             // release back to the pool
>             pool.release(endpoint, producer);
>         } else if (!producer.isSingleton()) {
>             // stop and shutdown non-singleton producers as we should not leak resources
>             try {
>                 ServiceHelper.stopAndShutdownService(producer);
>             } catch (Exception e) {
>                 ...
>             }
>         }
>         ...
>     });
>     ...
> }
> {code}
> RemoteFileProducer implements ServicePoolAware so it would normally go back
> to the pool, but InterceptSendToEndpoint _does not_. As a result, our
> producers keep getting stopped (note that RemoteFileProducer#isSingleton
> always returns false).
> What's more, somehow they _are_ being reused and in the end we run into
> situations, where one thread is closing a producer, while another thread is
> trying to write with it.
> I set up some breakpoints that log the thread name and
> System#identityHashCode of the producer:
> {code}
> 2018-02-08 15:05:25.070 TRACE o.a.c.c.file.remote.RemoteFileProducer : Starting producer: RemoteFileProducer[...]
> 2018-02-08 15:05:25.073 TRACE o.a.c.c.file.remote.RemoteFileProducer : Processing file: [my_file] for exchange: ...
> 2018-02-08 15:05:25.073 DEBUG o.a.c.c.file.remote.RemoteFileProducer : Not already connected/logged in. Connecting to: ...
> doStop(), time: 1518098725112, thread [Camel (camel-1) thread #35 - CamelInvocationHandler], producer: 889747012
>     at org.apache.camel.component.file.remote.RemoteFileProducer.doStop(RemoteFileProducer.java:175)
>     at org.apache.camel.support.ServiceSupport.stop(ServiceSupport.java:102)
>     at org.apache.camel.util.ServiceHelper.stopService(ServiceHelper.java:142)
>     at org.apache.camel.impl.InterceptSendToEndpoint$1.stop(InterceptSendToEndpoint.java:196)
>     at org.apache.camel.support.ServiceSupport.shutdown(ServiceSupport.java:164)
>     at org.apache.camel.util.ServiceHelper.stopAndShutdownService(ServiceHelper.java:211)
>     at org.apache.camel.impl.ProducerCache.lambda$doInAsyncProducer$2(ProducerCache.java:450)
>     at org.apache.camel.processor.SendProcessor$2$1.done(SendProcessor.java:178)
>     at org.apache.camel.impl.InterceptSendToEndpoint$1.process(InterceptSendToEndpoint.java:171)
>     at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:173)
>     at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:436)
>     at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:168)
>     at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
>     at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
>     at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
>     at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:110)
>     at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
>     at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
>     at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
>     at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
>     at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
>     at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
>     at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:695)
>     at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:623)
>     at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:247)
>     at org.apache.camel.processor.Splitter.process(Splitter.java:114)
>     at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
>     at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
>     at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
>     at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:110)
>     at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
>     at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
>     at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
>     at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
>     at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:47)
>     at org.apache.camel.impl.DeferProducer.process(DeferProducer.java:72)
>     at org.apache.camel.component.bean.AbstractCamelInvocationHandler$1.call(AbstractCamelInvocationHandler.java:192)
>     at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
>     at java.util.concurrent.FutureTask.run(FutureTask.java)
>     at java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java)
>     at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
>     at java.util.concurrent.FutureTask.run(FutureTask.java)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
> 2018-02-08 15:05:25.966 DEBUG o.a.c.c.file.remote.RemoteFileProducer : Connected and logged in to: ...
> 2018-02-08 15:05:25.966 DEBUG o.a.c.c.file.remote.RemoteFileProducer : Disconnecting from: ...
> 2018-02-08 15:05:25.973 TRACE o.a.c.c.file.remote.RemoteFileProducer : About to write [my_file] to [...] from exchange [...]
> 2018-02-08 15:05:25.974 TRACE o.a.c.c.file.remote.RemoteFileProducer : Stopping producer: RemoteFileProducer[...]
> 2018-02-08 15:05:25.974 DEBUG o.a.c.c.file.remote.RemoteFileProducer : Starting
> 2018-02-08 15:05:25.974 TRACE o.a.c.c.file.remote.RemoteFileProducer : Starting producer: RemoteFileProducer[...]
> 2018-02-08 15:05:25.977 TRACE o.a.c.c.file.remote.RemoteFileProducer : Processing file: [another_file] for exchange: Exchange[...]
> 2018-02-08 15:05:25.977 DEBUG o.a.c.c.file.remote.RemoteFileProducer : Not already connected/logged in. Connecting to: ...
> handleFailedWrite(), time: 1518098726072, thread [Camel (camel-1) thread #37 - CamelInvocationHandler], producer: 889747012
>     at org.apache.camel.component.file.remote.RemoteFileProducer.handleFailedWrite(RemoteFileProducer.java:81)
>     at org.apache.camel.component.file.GenericFileProducer.processExchange(GenericFileProducer.java:227)
>     at org.apache.camel.component.file.remote.RemoteFileProducer.process(RemoteFileProducer.java:58)
>     at org.apache.camel.impl.InterceptSendToEndpoint$1.process(InterceptSendToEndpoint.java:167)
>     at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:173)
>     at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:436)
>     at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:168)
>     at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
>     at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
>     at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
>     at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:110)
>     at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
>     at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
>     at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
>     at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
>     at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
>     at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
>     at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:695)
>     at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:623)
>     at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:247)
>     at org.apache.camel.processor.Splitter.process(Splitter.java:114)
>     at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
>     at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
>     at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
>     at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:110)
>     at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
>     at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
>     at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
>     at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
>     at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:47)
>     at org.apache.camel.impl.DeferProducer.process(DeferProducer.java:72)
>     at org.apache.camel.component.bean.AbstractCamelInvocationHandler$1.call(AbstractCamelInvocationHandler.java:192)
>     at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
>     at java.util.concurrent.FutureTask.run(FutureTask.java)
>     at java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java)
>     at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
>     at java.util.concurrent.FutureTask.run(FutureTask.java)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.camel.component.file.GenericFileOperationFailedException: Cannot change directory to: [my_directory]
>     at org.apache.camel.component.file.remote.SftpOperations.doChangeDirectory(SftpOperations.java:596)
>     at org.apache.camel.component.file.remote.SftpOperations.changeCurrentDirectory(SftpOperations.java:584)
>     at org.apache.camel.component.file.remote.SftpOperations.storeFile(SftpOperations.java:830)
>     at org.apache.camel.component.file.GenericFileProducer.writeFile(GenericFileProducer.java:277)
>     at org.apache.camel.component.file.GenericFileProducer.processExchange(GenericFileProducer.java:165)
>     ... 39 more
> Caused by: 4:
>     at com.jcraft.jsch.ChannelSftp.cd(ChannelSftp.java:359)
>     at org.apache.camel.component.file.remote.SftpOperations.doChangeDirectory(SftpOperations.java:594)
>     ... 43 more
> Caused by: java.io.IOException: Pipe closed
>     at java.io.PipedInputStream.read(PipedInputStream.java:307)
>     at com.jcraft.jsch.Channel$MyPipedInputStream.updateReadSide(Channel.java:362)
>     at com.jcraft.jsch.ChannelSftp.cd(ChannelSftp.java:337)
>     ... 44 more
> 2018-02-08 15:05:26.186 DEBUG o.a.c.c.file.remote.RemoteFileProducer : Exception occurred during stopping: Cannot change directory to: [my_directory]
> {code}
> So thread #35 stopped the producer, while thread #37 was trying to use it.
> One more ugly thing about it is that when SftpOperations fail due to a closed
> pipe, by the time we get to RemoteFileProducer#handleFailedWrite:
> {code}
> public void handleFailedWrite(...) throws Exception {
>     ...
>     if (isStopping() || isStopped()) {
>         // if we are stopping then ignore any exception during a poll
>         log.debug("Exception occurred during stopping: " + exception.getMessage());
>     } else {
>         log.warn("Writing file failed with: " + exception.getMessage());
>         ...
>         throw exception;
>     }
> }
> {code}
> the producer is already stopped, *so the exception is logged on DEBUG and not
> rethrown*.
> Note that I'm writing multiple files in parallel (three in my case), I'm
> using this to send data to the route ending in the SFTP endpoint:
> {code}
> @Produce(uri = "direct:myDir")
> private MyDir myDir;
> ...
> myDir.sendAsync(...)
> {code}
> where
> {code}
> public interface MyDir {
>     Future<?> sendAsync(...);
> }
> {code}
> We're using Camel 2.19.0, but as far as I can tell from the GitHub
> repository, the issue is most likely present in the current version too.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
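
The mechanism described in the quoted report, a wrapper that hides a marker interface from an `instanceof` check, can be reproduced without Camel. The following is only a minimal sketch under stated assumptions, not Camel's code: `Producer`, `PoolAware`, `PooledProducer`, `InterceptingProducer` and `dispose` are hypothetical stand-ins for the producer interface, `ServicePoolAware`, `RemoteFileProducer`, the producer created by `InterceptSendToEndpoint`, and the release-or-stop branch quoted from `ProducerCache`. The `isSingleton()` check is left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;

public class WrapperPoolDemo {

    /** Hypothetical stand-in for a producer that can be stopped. */
    interface Producer {
        void stop();
    }

    /** Hypothetical stand-in for the ServicePoolAware marker interface. */
    interface PoolAware {
    }

    /** Plays the role of RemoteFileProducer: pool-aware, so it should be released. */
    static class PooledProducer implements Producer, PoolAware {
        boolean stopped;

        @Override
        public void stop() {
            stopped = true;
        }
    }

    /** Plays the role of the intercepting wrapper: delegates, but lacks the marker. */
    static class InterceptingProducer implements Producer {
        final Producer delegate;

        InterceptingProducer(Producer delegate) {
            this.delegate = delegate;
        }

        @Override
        public void stop() {
            // Stopping the wrapper also stops the wrapped producer.
            delegate.stop();
        }
    }

    /** Simplified release-or-stop decision: only the outermost type is inspected. */
    static String dispose(Producer producer, List<Producer> pool) {
        if (producer instanceof PoolAware) {
            pool.add(producer);
            return "released";
        }
        producer.stop();
        return "stopped";
    }

    public static void main(String[] args) {
        List<Producer> pool = new ArrayList<>();
        PooledProducer direct = new PooledProducer();
        PooledProducer wrapped = new PooledProducer();

        System.out.println(dispose(direct, pool));                            // released
        System.out.println(dispose(new InterceptingProducer(wrapped), pool)); // stopped
        System.out.println(wrapped.stopped);                                  // true
    }
}
```

In this sketch the pool-aware producer is stopped as soon as it is wrapped, because the check sees only the wrapper's type. How the actual fix in 2.21.0 addresses this is not stated in the message above.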