Thanks, will do.
gnodet wrote: > > This is message for servicemix-dev btw. > The code looks good. Would you mind creating a JIRA and attach a patch or > zip with the complete project ? > AFAIK, the license for jcraft is BSD, so there should be no problem on > this > side. > > On 6/20/07, netflexity <[EMAIL PROTECTED]> wrote: >> >> >> Hello, all! >> I have a working SFTP binding component. Please let me know what I need >> to >> change to commit it. >> >> Here is the sample poller configuration. Sender is available as well: >> <sftp:poller service="sample:processSftpFile" >> targetService="sample:logPollableSshFile" endpoint="soap" >> uri="sftp://user:[EMAIL PROTECTED]/sftp_dir"> >> <sftp:clientPool> >> <ref bean="sshConnectionPool"/> >> </sftp:clientPool> >> </sftp:poller> >> >> /** >> * A polling endpoint which looks for a file or files in a directory >> * and sends the files into the JBI bus as messages, deleting the files >> * by default when they are processed. >> * >> * @org.apache.xbean.XBean element="poller" >> * >> * @version $Revision: 468487 $ >> */ >> public class SftpPollerEndpoint extends PollingEndpoint implements >> SftpEndpointType { >> >> private KeyedObjectPool sshClientPool; >> private JschPoolableKey sshServerInfo; >> private FileFilter filter; >> private boolean deleteFile = true; >> private boolean recursive = true; >> private FileMarshaler marshaler = new DefaultFileMarshaler(); >> private LockManager lockManager; >> private URI uri; >> >> public SftpPollerEndpoint() { >> } >> >> /** >> * @param serviceUnit >> * @param service >> * @param endpoint >> */ >> public SftpPollerEndpoint(ServiceUnit serviceUnit, QName service, >> String >> endpoint) { >> super(serviceUnit, service, endpoint); >> } >> >> /** >> * @param component >> * @param endpoint >> */ >> public SftpPollerEndpoint(DefaultComponent component, ServiceEndpoint >> endpoint) { >> super(component, endpoint); >> } >> >> /* (non-Javadoc) >> * @see 
org.apache.servicemix.common.endpoints.PollingEndpoint#poll() >> */ >> public void poll() throws Exception { >> pollFileOrDirectory(getWorkingPath()); >> } >> >> /* (non-Javadoc) >> * @see >> org.apache.servicemix.common.endpoints.ConsumerEndpoint#validate() >> */ >> public void validate() throws DeploymentException { >> super.validate(); >> if (uri == null && (getSshServerInfo() == null || >> getSshServerInfo().getHost() == null)) { >> throw new DeploymentException("Property uri or SSH Server >> Information must be configured"); >> } >> if (uri != null && getSshServerInfo() != null && >> getSshServerInfo().getHost() != null) { >> throw new DeploymentException("Properties uri and SSH Server >> Information can not be configured at the same time"); >> } >> } >> >> /* (non-Javadoc) >> * @see >> org.apache.servicemix.common.endpoints.PollingEndpoint#start() >> */ >> public void start() throws Exception { >> if (lockManager == null) { >> lockManager = createLockManager(); >> } >> >> if (uri != null) { >> String host = uri.getHost(); >> int port = uri.getPort(); >> String password = ""; >> String username = ""; >> if (uri.getUserInfo() != null) { >> String[] infos = uri.getUserInfo().split(":"); >> username = infos[0]; >> if (infos.length > 1) { >> password = infos[1]; >> } >> } >> sshServerInfo = new JschPoolableKey(host, port, username, >> password); >> } >> else { >> String str = "sftp://" + sshServerInfo.getHost(); >> if (sshServerInfo.getPort() >= 0) { >> str += ":" + sshServerInfo.getPort(); >> } >> str += "/"; >> uri = new URI(str); >> } >> super.start(); >> } >> >> /** >> * @return >> */ >> protected LockManager createLockManager() { >> return new SimpleLockManager(); >> } >> >> /** >> * @return >> */ >> private String getWorkingPath() { >> return (uri != null && uri.getPath() != null) ? 
uri.getPath() : >> "."; >> } >> >> // Properties >> >> >> //------------------------------------------------------------------------- >> /** >> * @return the clientPool >> */ >> public KeyedObjectPool getClientPool() { >> return sshClientPool; >> } >> >> /** >> * @param clientPool the clientPool to set >> */ >> public void setClientPool(KeyedObjectPool clientPool) { >> this.sshClientPool = clientPool; >> } >> >> /** >> * @return the sshServerInfo >> */ >> public JschPoolableKey getSshServerInfo() { >> return sshServerInfo; >> } >> >> /** >> * @param sshServerInfo the sshServerInfo to set >> */ >> public void setSshServerInfo(JschPoolableKey sshServerInfo) { >> this.sshServerInfo = sshServerInfo; >> } >> >> /** >> * @return the uri >> */ >> public URI getUri() { >> return uri; >> } >> >> /** >> * @param uri the uri to set >> */ >> public void setUri(URI uri) { >> this.uri = uri; >> } >> >> public FileFilter getFilter() { >> return filter; >> } >> >> /** >> * Sets the optional filter to choose which files to process >> */ >> public void setFilter(FileFilter filter) { >> this.filter = filter; >> } >> >> /** >> * Returns whether or not we should delete the file when it's >> processed >> */ >> public boolean isDeleteFile() { >> return deleteFile; >> } >> >> /** >> * @param deleteFile >> */ >> public void setDeleteFile(boolean deleteFile) { >> this.deleteFile = deleteFile; >> } >> >> /** >> * @return >> */ >> public boolean isRecursive() { >> return recursive; >> } >> >> /** >> * @param recursive >> */ >> public void setRecursive(boolean recursive) { >> this.recursive = recursive; >> } >> >> /** >> * @return >> */ >> public FileMarshaler getMarshaler() { >> return marshaler; >> } >> >> /** >> * @param marshaler >> */ >> public void setMarshaler(FileMarshaler marshaler) { >> this.marshaler = marshaler; >> } >> >> // Implementation methods >> >> >> //------------------------------------------------------------------------- >> >> >> /** >> * @param fileOrDirectory >> * 
@throws Exception >> */ >> protected void pollFileOrDirectory(String fileOrDirectory) throws >> Exception { >> JschSessionWrapper sftp = borrowClient(); >> try { >> logger.debug("Polling directory " + fileOrDirectory); >> Channel channel = sftp.getSession().openChannel("sftp"); >> channel.connect(); >> ChannelSftp sftpChannel = (ChannelSftp)channel; >> pollFileOrDirectory(sftpChannel, fileOrDirectory, true); >> } >> finally { >> returnClient(sftp); >> } >> } >> >> /** >> * @param sftpChannel >> * @param fileOrDirectory >> * @param processDir >> * @throws Exception >> */ >> protected void pollFileOrDirectory(ChannelSftp sftpChannel, String >> fileOrDirectory, boolean processDir) throws Exception { >> Vector files = sftpChannel.ls(fileOrDirectory); >> if(files != null && !files.isEmpty()){ >> for(int i=0; i < files.size(); i++){ >> LsEntry entry = (LsEntry)files.elementAt(i); >> SftpATTRS attrs = entry.getAttrs(); >> String name = entry.getLongname(); >> >> // Ignore "." and ".." >> if (name.equals(".") || name.equals("..")) { >> continue; >> } >> >> String file = fileOrDirectory + "/" + name; >> if (!attrs.isDir()) { // This is a file, >> process >> it. >> if (getFilter() == null || getFilter().accept(new >> File(file))) { >> pollFile(file); // Process the file. 
>> } >> } >> else if (processDir) { // Only process >> directories if >> processDir is true >> logger.debug("Polling directory " + file); >> pollFileOrDirectory(sftpChannel, file, >> isRecursive()); >> } >> else { >> logger.debug("Skipping directory " + file); >> } >> } >> } >> } >> >> /** >> * @param file >> */ >> protected void pollFile(final String file) { >> logger.debug("Scheduling file " + file + " for processing"); >> getExecutor().execute(new Runnable() { >> public void run() { >> final Lock lock = lockManager.getLock(file); >> if (lock.tryLock()) { >> boolean unlock = true; >> try { >> unlock = processFileAndDelete(file); >> } >> finally { >> if (unlock) { >> lock.unlock(); >> } >> } >> } >> } >> }); >> } >> >> /** >> * @param file >> * @return >> */ >> protected boolean processFileAndDelete(String file) { >> JschSessionWrapper sftp = null; >> boolean unlock = true; >> try { >> sftp = borrowClient(); >> Channel channel = sftp.getSession().openChannel("sftp"); >> channel.connect(); >> ChannelSftp sftpChannel = (ChannelSftp)channel; >> >> // Process the file. If processing fails, an exception should >> be >> thrown. >> logger.debug("Processing file " + file); >> processFile(sftpChannel, file); >> >> // Processing is successful, we should not unlock until the >> file >> has been deleted. >> unlock = false; >> if (isDeleteFile()) { >> sftpChannel.rm(file); >> unlock = true; >> } >> } >> catch (Exception e) { >> logger.error("Failed to process file: " + file + ". 
Reason: " >> + >> e, e); >> } >> finally { >> returnClient(sftp); >> } >> return unlock; >> } >> >> /** >> * @param sftpChannel >> * @param file >> * @throws Exception >> */ >> protected void processFile(ChannelSftp sftpChannel, String file) >> throws >> Exception { >> InputStream in = sftpChannel.get(file); >> InOnly exchange = getExchangeFactory().createInOnlyExchange(); >> configureExchangeTarget(exchange); >> NormalizedMessage message = exchange.createMessage(); >> exchange.setInMessage(message); >> marshaler.readMessage(exchange, message, in, file); >> sendSync(exchange); >> in.close(); >> if (exchange.getStatus() == ExchangeStatus.ERROR) { >> Exception e = exchange.getError(); >> if (e == null) { >> e = new JBIException("Unkown error"); >> } >> throw e; >> } >> } >> >> /* (non-Javadoc) >> * @see >> org.apache.servicemix.common.endpoints.ConsumerEndpoint#getLocationURI() >> */ >> public String getLocationURI() { >> return uri.toString(); >> } >> >> /* (non-Javadoc) >> * @see >> org.apache.servicemix.common.ExchangeProcessor#process( >> javax.jbi.messaging.MessageExchange) >> */ >> public void process(MessageExchange exchange) throws Exception { >> // Do nothing. In our case, this method should never be called >> // as we only send synchronous InOnly exchange >> } >> >> /** >> * Retrieve ssh session from a pool. 
>> * >> * @return >> * @throws JBIException >> */ >> protected JschSessionWrapper borrowClient() throws JBIException { >> JschSessionWrapper session = null; >> try { >> session = >> (JschSessionWrapper)getClientPool().borrowObject(sshServerInfo); >> } >> catch (Exception e) { >> throw new JBIException("Failed to retrieve Jsch session from >> pool.", e); >> } >> >> if (session != null && session.getSession().isConnected()) { >> logger.info("Connected to jsch session at " + >> session.getSession().getUserName() + "@" + session.getSession().getHost() >> + >> ":" + session.getSession().getPort()); >> } >> return session; >> } >> >> /** >> * @param client >> */ >> protected void returnClient(JschSessionWrapper client) { >> if (client != null) { >> try { >> getClientPool().returnObject(client, sshServerInfo); >> } >> catch (Exception e) { >> logger.error("Failed to return client to pool: " + e, e); >> } >> } >> } >> } >> -- >> View this message in context: >> http://www.nabble.com/SFTP-binding-component-with-jcraft-tf3953802s12049.html#a11218136 >> Sent from the ServiceMix - User mailing list archive at Nabble.com. >> >> > > > -- > Cheers, > Guillaume Nodet > ------------------------ > Principal Engineer, IONA > Blog: http://gnodet.blogspot.com/ > > -- View this message in context: http://www.nabble.com/SFTP-binding-component-with-jcraft-tf3953802s12049.html#a11220100 Sent from the ServiceMix - User mailing list archive at Nabble.com.
