Bob Browning created CAMEL-8088: ----------------------------------- Summary: FTP can wait indefinitely when connection timeout occurs during connect Key: CAMEL-8088 URL: https://issues.apache.org/jira/browse/CAMEL-8088 Project: Camel Issue Type: Bug Components: camel-ftp Affects Versions: 2.13.3 Reporter: Bob Browning Priority: Minor
In our production system we have seen cases where the FTP thread waits for a response indefinitely despite _soTimeout_ having been set on the connection. On investigation, this is due to a condition where a socket is able to connect, yet a firewall or the like then blocks further traffic. This can be overcome by setting the property _ftpClient.defaultTimeout_ to a non-zero value. If no response arrives after the initial socket connection, the socket should be deemed dead; however, this is not the case. When the following exception is thrown during the initial connect to an FTP server, after the socket has connected but whilst awaiting the initial reply, it can leave the RemoteFileProducer in a state where it is connected but not logged in, and no reconnect is attempted. If the soTimeout as set by _ftpClient.defaultTimeout_ is zero, a subsequent command can then wait for a reply indefinitely. {pre} Caused by: java.io.IOException: Timed out waiting for initial connect reply at org.apache.commons.net.ftp.FTP._connectAction_(FTP.java:389) ~[commons-net-3.1.jar:3.1] at org.apache.commons.net.ftp.FTPClient._connectAction_(FTPClient.java:796) ~[commons-net-3.1.jar:3.1] at org.apache.commons.net.SocketClient.connect(SocketClient.java:172) ~[commons-net-3.1.jar:3.1] at org.apache.commons.net.SocketClient.connect(SocketClient.java:192) ~[commons-net-3.1.jar:3.1] at org.apache.camel.component.file.remote.FtpOperations.connect(FtpOperations.java:95) ~[camel-ftp-2.13.1.jar:2.13.1] {pre} The RemoteFileProducer will enter this block as the loggedIn state has not yet been reached; however, the existing broken socket is reused. 
{code} // recover by re-creating operations which should most likely be able to recover if (!loggedIn) { log.debug("Trying to recover connection to: {} with a fresh client.", getEndpoint()); setOperations(getEndpoint().createRemoteFileOperations()); connectIfNecessary(); } {code} Yet the _connectIfNecessary()_ method will return immediately, since the check condition is based on the socket connection and takes no account of whether login was achieved, so the 'dead' socket is reused. {code} protected void connectIfNecessary() throws GenericFileOperationFailedException { // This will be skipped when loggedIn = false and the socket is connected if (!getOperations().isConnected()) { log.debug("Not already connected/logged in. Connecting to: {}", getEndpoint()); RemoteFileConfiguration config = getEndpoint().getConfiguration(); loggedIn = getOperations().connect(config); if (!loggedIn) { return; } log.info("Connected and logged in to: " + getEndpoint()); } } {code} A dirty test that reproduces this blocking condition: {code} package ftp; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.JndiRegistry; import org.apache.camel.test.junit4.CamelTestSupport; import org.apache.commons.net.ftp.FTPClient; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockftpserver.fake.FakeFtpServer; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import java.io.IOException; import java.io.InputStream; import java.net.Socket; import java.net.SocketException; import java.net.SocketTimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import javax.net.SocketFactory; import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class FtpInitialConnectTimeoutTest extends CamelTestSupport { private static final int CONNECT_TIMEOUT = 11223; /** * Create the 
answer for the socket factory that causes a SocketTimeoutException to occur in connect. */ private static class SocketAnswer implements Answer<Socket> { @Override public Socket answer(InvocationOnMock invocation) throws Throwable { final Socket socket = Mockito.spy(new Socket()); final AtomicBoolean timeout = new AtomicBoolean(); try { doAnswer(new Answer<InputStream>() { @Override public InputStream answer(InvocationOnMock invocation) throws Throwable { final InputStream stream = (InputStream) invocation.callRealMethod(); InputStream inputStream = new InputStream() { @Override public int read() throws IOException { if (timeout.get()) { // emulate a timeout occuring in _getReply() throw new SocketTimeoutException(); } return stream.read(); } }; return inputStream; } }).when(socket).getInputStream(); } catch (IOException ignored) { } try { doAnswer(new Answer() { @Override public Object answer(InvocationOnMock invocation) throws Throwable { if ((Integer) invocation.getArguments()[0] == CONNECT_TIMEOUT) { // setting of connect timeout timeout.set(true); } else { // non-connect timeout timeout.set(false); } return invocation.callRealMethod(); } }).when(socket).setSoTimeout(anyInt()); } catch (SocketException e) { throw new RuntimeException(e); } return socket; } } private FakeFtpServer fakeFtpServer; @Override @Before public void setUp() throws Exception { fakeFtpServer = new FakeFtpServer(); fakeFtpServer.setServerControlPort(0); fakeFtpServer.start(); super.setUp(); } @Override @After public void tearDown() throws Exception { super.tearDown(); if (fakeFtpServer != null) { fakeFtpServer.stop(); } } @Test public void testName() throws Exception { sendBody("direct:start", "test"); } private FTPClient mockedClient() throws IOException { FTPClient client = new FTPClient(); client.setSocketFactory(createSocketFactory()); return client; } private SocketFactory createSocketFactory() throws IOException { SocketFactory socketFactory = mock(SocketFactory.class); 
when(socketFactory.createSocket()).thenAnswer(new SocketAnswer()); return socketFactory; } @Override protected JndiRegistry createRegistry() throws Exception { JndiRegistry registry = super.createRegistry(); registry.bind("mocked", mockedClient()); return registry; } @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { from("direct:start") .to("ftp://localhost:" + fakeFtpServer.getServerControlPort() + "?ftpClient=#mocked" + "&soTimeout=1234&" + "connectTimeout=" + CONNECT_TIMEOUT); } }; } } {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)