JIRA opened: https://issues.apache.org/activemq/browse/SM-974
Thank you. -Max netflexity wrote: > > Thanks, will do. > > > gnodet wrote: >> >> This is a message for servicemix-dev, btw. >> The code looks good. Would you mind creating a JIRA and attaching a patch >> or >> zip with the complete project? >> AFAIK, the license for jcraft is BSD, so there should be no problem on >> this >> side. >> >> On 6/20/07, netflexity <[EMAIL PROTECTED]> wrote: >>> >>> >>> Hello, all! >>> I have a working SFTP binding component. Please let me know what I need >>> to >>> change to commit it. >>> >>> Here is the sample poller configuration. Sender is available as well: >>> <sftp:poller service="sample:processSftpFile" >>> targetService="sample:logPollableSshFile" endpoint="soap" >>> uri="sftp://user:[EMAIL PROTECTED]/sftp_dir"> >>> <sftp:clientPool> >>> <ref bean="sshConnectionPool"/> >>> </sftp:clientPool> >>> </sftp:poller> >>> >>> /** >>> * A polling endpoint which looks for a file or files in a directory >>> * and sends the files into the JBI bus as messages, deleting the files >>> * by default when they are processed. 
>>> * >>> * @org.apache.xbean.XBean element="poller" >>> * >>> * @version $Revision: 468487 $ >>> */ >>> public class SftpPollerEndpoint extends PollingEndpoint implements >>> SftpEndpointType { >>> >>> private KeyedObjectPool sshClientPool; >>> private JschPoolableKey sshServerInfo; >>> private FileFilter filter; >>> private boolean deleteFile = true; >>> private boolean recursive = true; >>> private FileMarshaler marshaler = new DefaultFileMarshaler(); >>> private LockManager lockManager; >>> private URI uri; >>> >>> public SftpPollerEndpoint() { >>> } >>> >>> /** >>> * @param serviceUnit >>> * @param service >>> * @param endpoint >>> */ >>> public SftpPollerEndpoint(ServiceUnit serviceUnit, QName service, >>> String >>> endpoint) { >>> super(serviceUnit, service, endpoint); >>> } >>> >>> /** >>> * @param component >>> * @param endpoint >>> */ >>> public SftpPollerEndpoint(DefaultComponent component, >>> ServiceEndpoint >>> endpoint) { >>> super(component, endpoint); >>> } >>> >>> /* (non-Javadoc) >>> * @see >>> org.apache.servicemix.common.endpoints.PollingEndpoint#poll() >>> */ >>> public void poll() throws Exception { >>> pollFileOrDirectory(getWorkingPath()); >>> } >>> >>> /* (non-Javadoc) >>> * @see >>> org.apache.servicemix.common.endpoints.ConsumerEndpoint#validate() >>> */ >>> public void validate() throws DeploymentException { >>> super.validate(); >>> if (uri == null && (getSshServerInfo() == null || >>> getSshServerInfo().getHost() == null)) { >>> throw new DeploymentException("Property uri or SSH Server >>> Information must be configured"); >>> } >>> if (uri != null && getSshServerInfo() != null && >>> getSshServerInfo().getHost() != null) { >>> throw new DeploymentException("Properties uri and SSH Server >>> Information can not be configured at the same time"); >>> } >>> } >>> >>> /* (non-Javadoc) >>> * @see >>> org.apache.servicemix.common.endpoints.PollingEndpoint#start() >>> */ >>> public void start() throws Exception { >>> if (lockManager == 
null) { >>> lockManager = createLockManager(); >>> } >>> >>> if (uri != null) { >>> String host = uri.getHost(); >>> int port = uri.getPort(); >>> String password = ""; >>> String username = ""; >>> if (uri.getUserInfo() != null) { >>> String[] infos = uri.getUserInfo().split(":"); >>> username = infos[0]; >>> if (infos.length > 1) { >>> password = infos[1]; >>> } >>> } >>> sshServerInfo = new JschPoolableKey(host, port, username, >>> password); >>> } >>> else { >>> String str = "sftp://" + sshServerInfo.getHost(); >>> if (sshServerInfo.getPort() >= 0) { >>> str += ":" + sshServerInfo.getPort(); >>> } >>> str += "/"; >>> uri = new URI(str); >>> } >>> super.start(); >>> } >>> >>> /** >>> * @return >>> */ >>> protected LockManager createLockManager() { >>> return new SimpleLockManager(); >>> } >>> >>> /** >>> * @return >>> */ >>> private String getWorkingPath() { >>> return (uri != null && uri.getPath() != null) ? uri.getPath() : >>> "."; >>> } >>> >>> // Properties >>> >>> >>> //------------------------------------------------------------------------- >>> /** >>> * @return the clientPool >>> */ >>> public KeyedObjectPool getClientPool() { >>> return sshClientPool; >>> } >>> >>> /** >>> * @param clientPool the clientPool to set >>> */ >>> public void setClientPool(KeyedObjectPool clientPool) { >>> this.sshClientPool = clientPool; >>> } >>> >>> /** >>> * @return the sshServerInfo >>> */ >>> public JschPoolableKey getSshServerInfo() { >>> return sshServerInfo; >>> } >>> >>> /** >>> * @param sshServerInfo the sshServerInfo to set >>> */ >>> public void setSshServerInfo(JschPoolableKey sshServerInfo) { >>> this.sshServerInfo = sshServerInfo; >>> } >>> >>> /** >>> * @return the uri >>> */ >>> public URI getUri() { >>> return uri; >>> } >>> >>> /** >>> * @param uri the uri to set >>> */ >>> public void setUri(URI uri) { >>> this.uri = uri; >>> } >>> >>> public FileFilter getFilter() { >>> return filter; >>> } >>> >>> /** >>> * Sets the optional filter to choose which files 
to process >>> */ >>> public void setFilter(FileFilter filter) { >>> this.filter = filter; >>> } >>> >>> /** >>> * Returns whether or not we should delete the file when its >>> processed >>> */ >>> public boolean isDeleteFile() { >>> return deleteFile; >>> } >>> >>> /** >>> * @param deleteFile >>> */ >>> public void setDeleteFile(boolean deleteFile) { >>> this.deleteFile = deleteFile; >>> } >>> >>> /** >>> * @return >>> */ >>> public boolean isRecursive() { >>> return recursive; >>> } >>> >>> /** >>> * @param recursive >>> */ >>> public void setRecursive(boolean recursive) { >>> this.recursive = recursive; >>> } >>> >>> /** >>> * @return >>> */ >>> public FileMarshaler getMarshaler() { >>> return marshaler; >>> } >>> >>> /** >>> * @param marshaler >>> */ >>> public void setMarshaler(FileMarshaler marshaler) { >>> this.marshaler = marshaler; >>> } >>> >>> // Implementation methods >>> >>> >>> //------------------------------------------------------------------------- >>> >>> >>> /** >>> * @param fileOrDirectory >>> * @throws Exception >>> */ >>> protected void pollFileOrDirectory(String fileOrDirectory) throws >>> Exception { >>> JschSessionWrapper sftp = borrowClient(); >>> try { >>> logger.debug("Polling directory " + fileOrDirectory); >>> Channel channel = sftp.getSession().openChannel("sftp"); >>> channel.connect(); >>> ChannelSftp sftpChannel = (ChannelSftp)channel; >>> pollFileOrDirectory(sftpChannel, fileOrDirectory, true); >>> } >>> finally { >>> returnClient(sftp); >>> } >>> } >>> >>> /** >>> * @param sftpChannel >>> * @param fileOrDirectory >>> * @param processDir >>> * @throws Exception >>> */ >>> protected void pollFileOrDirectory(ChannelSftp sftpChannel, String >>> fileOrDirectory, boolean processDir) throws Exception { >>> Vector files = sftpChannel.ls(fileOrDirectory); >>> if(files != null && !files.isEmpty()){ >>> for(int i=0; i < files.size(); i++){ >>> LsEntry entry = (LsEntry)files.elementAt(i); >>> SftpATTRS attrs = entry.getAttrs(); >>> String 
name = entry.getLongname(); >>> >>> // Ignore "." and ".." >>> if (name.equals(".") || name.equals("..")) { >>> continue; >>> } >>> >>> String file = fileOrDirectory + "/" + name; >>> if (!attrs.isDir()) { // This is a file, >>> process >>> it. >>> if (getFilter() == null || >>> getFilter().accept(new >>> File(file))) { >>> pollFile(file); // Process the file. >>> } >>> } >>> else if (processDir) { // Only process >>> directories if >>> processDir is true >>> logger.debug("Polling directory " + file); >>> pollFileOrDirectory(sftpChannel, file, >>> isRecursive()); >>> } >>> else { >>> logger.debug("Skipping directory " + file); >>> } >>> } >>> } >>> } >>> >>> /** >>> * @param file >>> */ >>> protected void pollFile(final String file) { >>> logger.debug("Scheduling file " + file + " for processing"); >>> getExecutor().execute(new Runnable() { >>> public void run() { >>> final Lock lock = lockManager.getLock(file); >>> if (lock.tryLock()) { >>> boolean unlock = true; >>> try { >>> unlock = processFileAndDelete(file); >>> } >>> finally { >>> if (unlock) { >>> lock.unlock(); >>> } >>> } >>> } >>> } >>> }); >>> } >>> >>> /** >>> * @param file >>> * @return >>> */ >>> protected boolean processFileAndDelete(String file) { >>> JschSessionWrapper sftp = null; >>> boolean unlock = true; >>> try { >>> sftp = borrowClient(); >>> Channel channel = sftp.getSession().openChannel("sftp"); >>> channel.connect(); >>> ChannelSftp sftpChannel = (ChannelSftp)channel; >>> >>> // Process the file. If processing fails, an exception >>> should >>> be >>> thrown. >>> logger.debug("Processing file " + file); >>> processFile(sftpChannel, file); >>> >>> // Processing is succesfull, we should not unlock until the >>> file >>> has been deleted. >>> unlock = false; >>> if (isDeleteFile()) { >>> sftpChannel.rm(file); >>> unlock = true; >>> } >>> } >>> catch (Exception e) { >>> logger.error("Failed to process file: " + file + ". 
Reason: >>> " >>> + >>> e, e); >>> } >>> finally { >>> returnClient(sftp); >>> } >>> return unlock; >>> } >>> >>> /** >>> * @param sftpChannel >>> * @param file >>> * @throws Exception >>> */ >>> protected void processFile(ChannelSftp sftpChannel, String file) >>> throws >>> Exception { >>> InputStream in = sftpChannel.get(file); >>> InOnly exchange = getExchangeFactory().createInOnlyExchange(); >>> configureExchangeTarget(exchange); >>> NormalizedMessage message = exchange.createMessage(); >>> exchange.setInMessage(message); >>> marshaler.readMessage(exchange, message, in, file); >>> sendSync(exchange); >>> in.close(); >>> if (exchange.getStatus() == ExchangeStatus.ERROR) { >>> Exception e = exchange.getError(); >>> if (e == null) { >>> e = new JBIException("Unkown error"); >>> } >>> throw e; >>> } >>> } >>> >>> /* (non-Javadoc) >>> * @see >>> org.apache.servicemix.common.endpoints.ConsumerEndpoint#getLocationURI() >>> */ >>> public String getLocationURI() { >>> return uri.toString(); >>> } >>> >>> /* (non-Javadoc) >>> * @see >>> org.apache.servicemix.common.ExchangeProcessor#process( >>> javax.jbi.messaging.MessageExchange) >>> */ >>> public void process(MessageExchange exchange) throws Exception { >>> // Do nothing. In our case, this method should never be called >>> // as we only send synchronous InOnly exchange >>> } >>> >>> /** >>> * Retrieve ssh session from a pool. 
>>> * >>> * @return >>> * @throws JBIException >>> */ >>> protected JschSessionWrapper borrowClient() throws JBIException { >>> JschSessionWrapper session = null; >>> try { >>> session = >>> (JschSessionWrapper)getClientPool().borrowObject(sshServerInfo); >>> } >>> catch (Exception e) { >>> throw new JBIException("Failed to retrieve Jsch session from >>> pool.", e); >>> } >>> >>> if (session != null && session.getSession().isConnected()) { >>> logger.info("Connected to jsch session at " + >>> session.getSession().getUserName() + "@" + >>> session.getSession().getHost() >>> + >>> ":" + session.getSession().getPort()); >>> } >>> return session; >>> } >>> >>> /** >>> * @param client >>> */ >>> protected void returnClient(JschSessionWrapper client) { >>> if (client != null) { >>> try { >>> getClientPool().returnObject(client, sshServerInfo); >>> } >>> catch (Exception e) { >>> logger.error("Failed to return client to pool: " + e, >>> e); >>> } >>> } >>> } >>> } >>> -- >>> View this message in context: >>> http://www.nabble.com/SFTP-binding-component-with-jcraft-tf3953802s12049.html#a11218136 >>> Sent from the ServiceMix - User mailing list archive at Nabble.com. >>> >>> >> >> >> -- >> Cheers, >> Guillaume Nodet >> ------------------------ >> Principal Engineer, IONA >> Blog: http://gnodet.blogspot.com/ >> >> > > -- View this message in context: http://www.nabble.com/SFTP-binding-component-with-jcraft-tf3953802s12049.html#a11541824 Sent from the ServiceMix - User mailing list archive at Nabble.com.
