Hello, all!
I have a working SFTP binding component. Please let me know what I need to
change to commit it.

Here is the sample poller configuration. Sender is available as well:
<sftp:poller service="sample:processSftpFile"
targetService="sample:logPollableSshFile" endpoint="soap"
uri="sftp://user:[EMAIL PROTECTED]/sftp_dir">
                <sftp:clientPool>
                        <ref bean="sshConnectionPool"/>
                </sftp:clientPool>
        </sftp:poller>

/**
 * A polling endpoint which looks for a file or files in a directory
 * and sends the files into the JBI bus as messages, deleting the files
 * by default when they are processed.
 *
 * @org.apache.xbean.XBean element="poller"
 *
 * @version $Revision: 468487 $
 */
public class SftpPollerEndpoint extends PollingEndpoint implements
SftpEndpointType {

    private KeyedObjectPool sshClientPool;
    private JschPoolableKey sshServerInfo;
    private FileFilter filter;
    private boolean deleteFile = true;
    private boolean recursive = true;
    private FileMarshaler marshaler = new DefaultFileMarshaler();
    private LockManager lockManager;
    private URI uri;

    public SftpPollerEndpoint() {
    }

    /**
     * @param serviceUnit
     * @param service
     * @param endpoint
     */
    public SftpPollerEndpoint(ServiceUnit serviceUnit, QName service, String
endpoint) {
        super(serviceUnit, service, endpoint);
    }

    /**
     * @param component
     * @param endpoint
     */
    public SftpPollerEndpoint(DefaultComponent component, ServiceEndpoint
endpoint) {
        super(component, endpoint);
    }

    /* (non-Javadoc)
     * @see org.apache.servicemix.common.endpoints.PollingEndpoint#poll()
     */
    public void poll() throws Exception {
        pollFileOrDirectory(getWorkingPath());
    }

    /* (non-Javadoc)
     * @see
org.apache.servicemix.common.endpoints.ConsumerEndpoint#validate()
     */
    public void validate() throws DeploymentException {
        super.validate();
        if (uri == null && (getSshServerInfo() == null ||
getSshServerInfo().getHost() == null)) {
            throw new DeploymentException("Property uri or SSH Server
Information must be configured");
        }
        if (uri != null && getSshServerInfo() != null &&
getSshServerInfo().getHost() != null) {
            throw new DeploymentException("Properties uri and SSH Server
Information can not be configured at the same time");
        }
    }
    
    /* (non-Javadoc)
     * @see org.apache.servicemix.common.endpoints.PollingEndpoint#start()
     */
    public void start() throws Exception {
        if (lockManager == null) {
            lockManager = createLockManager();
        }
        
        if (uri != null) {
                String host = uri.getHost();
            int port = uri.getPort();
            String password = "";
            String username = "";
            if (uri.getUserInfo() != null) {
                String[] infos = uri.getUserInfo().split(":");
                username = infos[0];
                if (infos.length > 1) {
                    password = infos[1];
                }
            }
            sshServerInfo = new JschPoolableKey(host, port, username,
password);
        } 
        else {
            String str = "sftp://"; + sshServerInfo.getHost();
            if (sshServerInfo.getPort() >= 0) {
                str += ":" + sshServerInfo.getPort();
            }
            str += "/";
            uri = new URI(str);
        }
        super.start();
    }
    
    /**
     * @return
     */
    protected LockManager createLockManager() {
        return new SimpleLockManager();
    }

    /**
     * @return
     */
    private String getWorkingPath() {
        return (uri != null && uri.getPath() != null) ? uri.getPath() : ".";
    }

    // Properties
   
//-------------------------------------------------------------------------
    /**
     * @return the clientPool
     */
    public KeyedObjectPool getClientPool() {
        return sshClientPool;
    }

    /**
     * @param clientPool the clientPool to set
     */
    public void setClientPool(KeyedObjectPool clientPool) {
        this.sshClientPool = clientPool;
    }

    /**
         * @return the sshServerInfo
         */
        public JschPoolableKey getSshServerInfo() {
                return sshServerInfo;
        }

        /**
         * @param sshServerInfo the sshServerInfo to set
         */
        public void setSshServerInfo(JschPoolableKey sshServerInfo) {
                this.sshServerInfo = sshServerInfo;
        }

        /**
     * @return the uri
     */
    public URI getUri() {
        return uri;
    }

    /**
     * @param uri the uri to set
     */
    public void setUri(URI uri) {
        this.uri = uri;
    }

    public FileFilter getFilter() {
        return filter;
    }

    /**
     * Sets the optional filter to choose which files to process
     */
    public void setFilter(FileFilter filter) {
        this.filter = filter;
    }

    /**
     * Returns whether or not we should delete the file when its processed
     */
    public boolean isDeleteFile() {
        return deleteFile;
    }

    /**
     * @param deleteFile
     */
    public void setDeleteFile(boolean deleteFile) {
        this.deleteFile = deleteFile;
    }

    /**
     * @return
     */
    public boolean isRecursive() {
        return recursive;
    }

    /**
     * @param recursive
     */
    public void setRecursive(boolean recursive) {
        this.recursive = recursive;
    }

    /**
     * @return
     */
    public FileMarshaler getMarshaler() {
        return marshaler;
    }

    /**
     * @param marshaler
     */
    public void setMarshaler(FileMarshaler marshaler) {
        this.marshaler = marshaler;
    }

    // Implementation methods
   
//-------------------------------------------------------------------------


    /**
     * @param fileOrDirectory
     * @throws Exception
     */
    protected void pollFileOrDirectory(String fileOrDirectory) throws
Exception {
        JschSessionWrapper sftp = borrowClient();
        try {
            logger.debug("Polling directory " + fileOrDirectory);
            Channel channel = sftp.getSession().openChannel("sftp");
            channel.connect();
            ChannelSftp sftpChannel = (ChannelSftp)channel;
            pollFileOrDirectory(sftpChannel, fileOrDirectory, true);
        }
        finally {
            returnClient(sftp);
        }
    }

    /**
     * @param sftpChannel
     * @param fileOrDirectory
     * @param processDir
     * @throws Exception
     */
    protected void pollFileOrDirectory(ChannelSftp sftpChannel, String
fileOrDirectory, boolean processDir) throws Exception {
        Vector files = sftpChannel.ls(fileOrDirectory);
        if(files != null && !files.isEmpty()){
                for(int i=0; i < files.size(); i++){
                        LsEntry entry = (LsEntry)files.elementAt(i);
                        SftpATTRS attrs = entry.getAttrs();
                        String name = entry.getLongname();
                        
                        // Ignore "." and ".."
                        if (name.equals(".") || name.equals("..")) {
                        continue;
                    }
                    
                        String file = fileOrDirectory + "/" + name;
                    if (!attrs.isDir()) {       // This is a file, process it.
                        if (getFilter() == null || getFilter().accept(new
File(file))) {
                            pollFile(file); // Process the file.
                        }
                    } 
                    else if (processDir) {      // Only process directories if
processDir is true
                        logger.debug("Polling directory " + file);
                        pollFileOrDirectory(sftpChannel, file, isRecursive());
                    } 
                    else {
                        logger.debug("Skipping directory " + file);
                    }
                }
            }
    }

    /**
     * @param file
     */
    protected void pollFile(final String file) {
        logger.debug("Scheduling file " + file + " for processing");
        getExecutor().execute(new Runnable() {
            public void run() {
                final Lock lock = lockManager.getLock(file);
                if (lock.tryLock()) {
                    boolean unlock = true;
                    try {
                        unlock = processFileAndDelete(file);
                    }
                    finally {
                        if (unlock) {
                            lock.unlock();
                        }
                    }
                }
            }
        });
    }

    /**
     * @param file
     * @return
     */
    protected boolean processFileAndDelete(String file) {
        JschSessionWrapper sftp = null;
        boolean unlock = true;
        try {
                sftp = borrowClient();
                Channel channel = sftp.getSession().openChannel("sftp");
            channel.connect();
            ChannelSftp sftpChannel = (ChannelSftp)channel;
            
            // Process the file. If processing fails, an exception should be
thrown.
            logger.debug("Processing file " + file);
            processFile(sftpChannel, file);
            
            // Processing is succesfull, we should not unlock until the file
has been deleted.
            unlock = false;
            if (isDeleteFile()) {
                sftpChannel.rm(file);
                unlock = true;
            }
        }
        catch (Exception e) {
            logger.error("Failed to process file: " + file + ". Reason: " +
e, e);
        } 
        finally {
            returnClient(sftp);
        }
        return unlock;
    }

    /**
     * @param sftpChannel
     * @param file
     * @throws Exception
     */
    protected void processFile(ChannelSftp sftpChannel, String file) throws
Exception {
        InputStream in = sftpChannel.get(file);
        InOnly exchange = getExchangeFactory().createInOnlyExchange();
        configureExchangeTarget(exchange);
        NormalizedMessage message = exchange.createMessage();
        exchange.setInMessage(message);
        marshaler.readMessage(exchange, message, in, file);
        sendSync(exchange);
        in.close();
        if (exchange.getStatus() == ExchangeStatus.ERROR) {
            Exception e = exchange.getError();
            if (e == null) {
                e = new JBIException("Unkown error");
            }
            throw e;
        }
    }

    /* (non-Javadoc)
     * @see
org.apache.servicemix.common.endpoints.ConsumerEndpoint#getLocationURI()
     */
    public String getLocationURI() {
        return uri.toString();
    }

    /* (non-Javadoc)
     * @see
org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
     */
    public void process(MessageExchange exchange) throws Exception {
        // Do nothing. In our case, this method should never be called
        // as we only send synchronous InOnly exchange
    }
    
    /**
     * Retrieve ssh session from a pool.
     * 
     * @return
     * @throws JBIException
     */
    protected JschSessionWrapper borrowClient() throws JBIException {
        JschSessionWrapper session = null;
        try {
            session =
(JschSessionWrapper)getClientPool().borrowObject(sshServerInfo);
        }
        catch (Exception e) {
            throw new JBIException("Failed to retrieve Jsch session from
pool.", e);
        }
        
        if (session != null && session.getSession().isConnected()) {
            logger.info("Connected to jsch session at " +
session.getSession().getUserName() + "@" + session.getSession().getHost() +
":" + session.getSession().getPort());
        }
        return session;
    }

    /**
     * @param client
     */
    protected void returnClient(JschSessionWrapper client) {
        if (client != null) {
            try {
                getClientPool().returnObject(client, sshServerInfo);
            }
            catch (Exception e) {
                logger.error("Failed to return client to pool: " + e, e);
            }
        }
    }
}
-- 
View this message in context: 
http://www.nabble.com/SFTP-binding-component-with-jcraft-tf3953802s12049.html#a11218136
Sent from the ServiceMix - User mailing list archive at Nabble.com.

Reply via email to