Hello, all! I have a working SFTP binding component. Please let me know what I need to change to commit it.
Here is the sample poller configuration. Sender is available as well: <sftp:poller service="sample:processSftpFile" targetService="sample:logPollableSshFile" endpoint="soap" uri="sftp://user:[EMAIL PROTECTED]/sftp_dir"> <sftp:clientPool> <ref bean="sshConnectionPool"/> </sftp:clientPool> </sftp:poller> /** * A polling endpoint which looks for a file or files in a directory * and sends the files into the JBI bus as messages, deleting the files * by default when they are processed. * * @org.apache.xbean.XBean element="poller" * * @version $Revision: 468487 $ */ public class SftpPollerEndpoint extends PollingEndpoint implements SftpEndpointType { private KeyedObjectPool sshClientPool; private JschPoolableKey sshServerInfo; private FileFilter filter; private boolean deleteFile = true; private boolean recursive = true; private FileMarshaler marshaler = new DefaultFileMarshaler(); private LockManager lockManager; private URI uri; public SftpPollerEndpoint() { } /** * @param serviceUnit * @param service * @param endpoint */ public SftpPollerEndpoint(ServiceUnit serviceUnit, QName service, String endpoint) { super(serviceUnit, service, endpoint); } /** * @param component * @param endpoint */ public SftpPollerEndpoint(DefaultComponent component, ServiceEndpoint endpoint) { super(component, endpoint); } /* (non-Javadoc) * @see org.apache.servicemix.common.endpoints.PollingEndpoint#poll() */ public void poll() throws Exception { pollFileOrDirectory(getWorkingPath()); } /* (non-Javadoc) * @see org.apache.servicemix.common.endpoints.ConsumerEndpoint#validate() */ public void validate() throws DeploymentException { super.validate(); if (uri == null && (getSshServerInfo() == null || getSshServerInfo().getHost() == null)) { throw new DeploymentException("Property uri or SSH Server Information must be configured"); } if (uri != null && getSshServerInfo() != null && getSshServerInfo().getHost() != null) { throw new DeploymentException("Properties uri and SSH Server Information can not be configured at the same time"); } } /* (non-Javadoc) * @see org.apache.servicemix.common.endpoints.PollingEndpoint#start() */ public void start() throws Exception { if (lockManager == null) { lockManager = createLockManager(); } if (uri != null) { String host = uri.getHost(); int port = uri.getPort(); String password = ""; String username = ""; if (uri.getUserInfo() != null) { String[] infos = uri.getUserInfo().split(":"); username = infos[0]; if (infos.length > 1) { password = infos[1]; } } sshServerInfo = new JschPoolableKey(host, port, username, password); } else { String str = "sftp://" + sshServerInfo.getHost(); if (sshServerInfo.getPort() >= 0) { str += ":" + sshServerInfo.getPort(); } str += "/"; uri = new URI(str); } super.start(); } /** * @return */ protected LockManager createLockManager() { return new SimpleLockManager(); } /** * @return */ private String getWorkingPath() { return (uri != null && uri.getPath() != null) ? uri.getPath() : "."; } // Properties //------------------------------------------------------------------------- /** * @return the clientPool */ public KeyedObjectPool getClientPool() { return sshClientPool; } /** * @param clientPool the clientPool to set */ public void setClientPool(KeyedObjectPool clientPool) { this.sshClientPool = clientPool; } /** * @return the sshServerInfo */ public JschPoolableKey getSshServerInfo() { return sshServerInfo; } /** * @param sshServerInfo the sshServerInfo to set */ public void setSshServerInfo(JschPoolableKey sshServerInfo) { this.sshServerInfo = sshServerInfo; } /** * @return the uri */ public URI getUri() { return uri; } /** * @param uri the uri to set */ public void setUri(URI uri) { this.uri = uri; } public FileFilter getFilter() { return filter; } /** * Sets the optional filter to choose which files to process */ public void setFilter(FileFilter filter) { this.filter = filter; } /** * Returns whether or not we should delete the file when its processed */ public boolean isDeleteFile() { return deleteFile; } /** * @param deleteFile */ public void setDeleteFile(boolean deleteFile) { this.deleteFile = deleteFile; } /** * @return */ public boolean isRecursive() { return recursive; } /** * @param recursive */ public void setRecursive(boolean recursive) { this.recursive = recursive; } /** * @return */ public FileMarshaler getMarshaler() { return marshaler; } /** * @param marshaler */ public void setMarshaler(FileMarshaler marshaler) { this.marshaler = marshaler; } // Implementation methods //------------------------------------------------------------------------- /** * @param fileOrDirectory * @throws Exception */ protected void pollFileOrDirectory(String fileOrDirectory) throws Exception { JschSessionWrapper sftp = borrowClient(); try { logger.debug("Polling directory " + fileOrDirectory); Channel channel = sftp.getSession().openChannel("sftp"); channel.connect(); ChannelSftp sftpChannel = (ChannelSftp)channel; pollFileOrDirectory(sftpChannel, fileOrDirectory, true); } finally { returnClient(sftp); } } /** * @param sftpChannel * @param fileOrDirectory * @param processDir * @throws Exception */ protected void pollFileOrDirectory(ChannelSftp sftpChannel, String fileOrDirectory, boolean processDir) throws Exception { Vector files = sftpChannel.ls(fileOrDirectory); if(files != null && !files.isEmpty()){ for(int i=0; i < files.size(); i++){ LsEntry entry = (LsEntry)files.elementAt(i); SftpATTRS attrs = entry.getAttrs(); String name = entry.getLongname(); // Ignore "." and ".." if (name.equals(".") || name.equals("..")) { continue; } String file = fileOrDirectory + "/" + name; if (!attrs.isDir()) { // This is a file, process it. if (getFilter() == null || getFilter().accept(new File(file))) { pollFile(file); // Process the file. } } else if (processDir) { // Only process directories if processDir is true logger.debug("Polling directory " + file); pollFileOrDirectory(sftpChannel, file, isRecursive()); } else { logger.debug("Skipping directory " + file); } } } } /** * @param file */ protected void pollFile(final String file) { logger.debug("Scheduling file " + file + " for processing"); getExecutor().execute(new Runnable() { public void run() { final Lock lock = lockManager.getLock(file); if (lock.tryLock()) { boolean unlock = true; try { unlock = processFileAndDelete(file); } finally { if (unlock) { lock.unlock(); } } } } }); } /** * @param file * @return */ protected boolean processFileAndDelete(String file) { JschSessionWrapper sftp = null; boolean unlock = true; try { sftp = borrowClient(); Channel channel = sftp.getSession().openChannel("sftp"); channel.connect(); ChannelSftp sftpChannel = (ChannelSftp)channel; // Process the file. If processing fails, an exception should be thrown. logger.debug("Processing file " + file); processFile(sftpChannel, file); // Processing is succesfull, we should not unlock until the file has been deleted. unlock = false; if (isDeleteFile()) { sftpChannel.rm(file); unlock = true; } } catch (Exception e) { logger.error("Failed to process file: " + file + ". Reason: " + e, e); } finally { returnClient(sftp); } return unlock; } /** * @param sftpChannel * @param file * @throws Exception */ protected void processFile(ChannelSftp sftpChannel, String file) throws Exception { InputStream in = sftpChannel.get(file); InOnly exchange = getExchangeFactory().createInOnlyExchange(); configureExchangeTarget(exchange); NormalizedMessage message = exchange.createMessage(); exchange.setInMessage(message); marshaler.readMessage(exchange, message, in, file); sendSync(exchange); in.close(); if (exchange.getStatus() == ExchangeStatus.ERROR) { Exception e = exchange.getError(); if (e == null) { e = new JBIException("Unkown error"); } throw e; } } /* (non-Javadoc) * @see org.apache.servicemix.common.endpoints.ConsumerEndpoint#getLocationURI() */ public String getLocationURI() { return uri.toString(); } /* (non-Javadoc) * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange) */ public void process(MessageExchange exchange) throws Exception { // Do nothing. In our case, this method should never be called // as we only send synchronous InOnly exchange } /** * Retrieve ssh session from a pool. * * @return * @throws JBIException */ protected JschSessionWrapper borrowClient() throws JBIException { JschSessionWrapper session = null; try { session = (JschSessionWrapper)getClientPool().borrowObject(sshServerInfo); } catch (Exception e) { throw new JBIException("Failed to retrieve Jsch session from pool.", e); } if (session != null && session.getSession().isConnected()) { logger.info("Connected to jsch session at " + session.getSession().getUserName() + "@" + session.getSession().getHost() + ":" + session.getSession().getPort()); } return session; } /** * @param client */ protected void returnClient(JschSessionWrapper client) { if (client != null) { try { getClientPool().returnObject(client, sshServerInfo); } catch (Exception e) { logger.error("Failed to return client to pool: " + e, e); } } } } -- View this message in context: http://www.nabble.com/SFTP-binding-component-with-jcraft-tf3953802s12049.html#a11218136 Sent from the ServiceMix - User mailing list archive at Nabble.com.
