Repository: mina-sshd Updated Branches: refs/heads/master fad77cd31 -> eac82acbe
[SSHD-697] Add read/write methods to SftpClient that enable read/write starting from some offset Exposing a FileChannel that can be used to convert into an input/output stream. Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/eac82acb Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/eac82acb Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/eac82acb Branch: refs/heads/master Commit: eac82acbe7bb07b4292c509ec14da3ef068f2ca5 Parents: fad77cd Author: Lyor Goldstein <lyor.goldst...@gmail.com> Authored: Sun Sep 18 18:30:41 2016 +0300 Committer: Lyor Goldstein <lyor.goldst...@gmail.com> Committed: Sun Sep 18 18:30:41 2016 +0300 ---------------------------------------------------------------------- .../subsystem/sftp/AbstractSftpClient.java | 45 -- .../sshd/client/subsystem/sftp/SftpClient.java | 137 +++++- .../client/subsystem/sftp/SftpFileChannel.java | 403 ------------------ .../subsystem/sftp/SftpFileSystemChannel.java | 37 ++ .../subsystem/sftp/SftpFileSystemProvider.java | 40 +- .../subsystem/sftp/SftpRemotePathChannel.java | 413 +++++++++++++++++++ .../subsystem/sftp/SftpFileSystemTest.java | 36 ++ .../sshd/client/subsystem/sftp/SftpTest.java | 40 ++ 8 files changed, 661 insertions(+), 490 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/eac82acb/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/AbstractSftpClient.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/AbstractSftpClient.java b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/AbstractSftpClient.java index 848790c..1159151 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/AbstractSftpClient.java +++ 
b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/AbstractSftpClient.java @@ -19,8 +19,6 @@ package org.apache.sshd.client.subsystem.sftp; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.nio.file.attribute.FileTime; import java.util.ArrayList; import java.util.Collection; @@ -1121,47 +1119,4 @@ public abstract class AbstractSftpClient extends AbstractSubsystemClient impleme buffer.putLong(length); checkCommandStatus(SftpConstants.SSH_FXP_UNBLOCK, buffer); } - - @Override - public Iterable<DirEntry> readDir(final String path) throws IOException { - if (!isOpen()) { - throw new IOException("readDir(" + path + ") client is closed"); - } - return new SftpIterableDirEntry(this, path); - } - - @Override - public Iterable<DirEntry> listDir(Handle handle) throws IOException { - if (!isOpen()) { - throw new IOException("listDir(" + handle + ") client is closed"); - } - - return new StfpIterableDirHandle(this, handle); - } - - @Override - public InputStream read(final String path, final int bufferSize, final Collection<OpenMode> mode) throws IOException { - if (bufferSize < MIN_READ_BUFFER_SIZE) { - throw new IllegalArgumentException("Insufficient read buffer size: " + bufferSize + ", min.=" + MIN_READ_BUFFER_SIZE); - } - - if (!isOpen()) { - throw new IOException("read(" + path + ")[" + mode + "] size=" + bufferSize + ": client is closed"); - } - - return new SftpInputStreamWithChannel(this, bufferSize, path, mode); - } - - @Override - public OutputStream write(final String path, final int bufferSize, final Collection<OpenMode> mode) throws IOException { - if (bufferSize < MIN_WRITE_BUFFER_SIZE) { - throw new IllegalArgumentException("Insufficient write buffer size: " + bufferSize + ", min.=" + MIN_WRITE_BUFFER_SIZE); - } - - if (!isOpen()) { - throw new IOException("write(" + path + ")[" + mode + "] size=" + bufferSize + ": client is closed"); - } - - return new SftpOutputStreamWithChannel(this, bufferSize, 
path, mode); - } } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/eac82acb/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpClient.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpClient.java b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpClient.java index 82527ab..3fb9b51 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpClient.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpClient.java @@ -23,8 +23,11 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.channels.Channel; +import java.nio.file.OpenOption; +import java.nio.file.StandardOpenOption; import java.nio.file.attribute.AclEntry; import java.nio.file.attribute.FileTime; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; @@ -42,7 +45,6 @@ import org.apache.sshd.common.subsystem.sftp.SftpUniversalOwnerAndGroup; import org.apache.sshd.common.util.GenericUtils; import org.apache.sshd.common.util.ValidateUtils; import org.apache.sshd.common.util.buffer.BufferUtils; -import org.bouncycastle.util.Arrays; /** * @author <a href="http://mina.apache.org">Apache MINA Project</a> @@ -54,7 +56,62 @@ public interface SftpClient extends SubsystemClient { Append, Create, Truncate, - Exclusive + Exclusive; + + /** + * The {@link Set} of {@link OpenOption}-s supported by {@link #fromOpenOptions(Collection)} + */ + public static final Set<OpenOption> SUPPORTED_OPTIONS = + Collections.unmodifiableSet( + EnumSet.of( + StandardOpenOption.READ, StandardOpenOption.APPEND, + StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, + StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW, + StandardOpenOption.SPARSE)); + + /** + * Converts {@link StandardOpenOption}-s into {@link OpenMode}-s + * + * @param 
options The original options - ignored if {@code null}/empty + * @return A {@link Set} of the equivalent modes + * @throws IllegalArgumentException If an unsupported option is requested + * @see #SUPPORTED_OPTIONS + */ + public static Set<OpenMode> fromOpenOptions(Collection<? extends OpenOption> options) { + if (GenericUtils.isEmpty(options)) { + return Collections.emptySet(); + } + + Set<OpenMode> modes = EnumSet.noneOf(OpenMode.class); + for (OpenOption option : options) { + if (option == StandardOpenOption.READ) { + modes.add(Read); + } else if (option == StandardOpenOption.APPEND) { + modes.add(Append); + } else if (option == StandardOpenOption.CREATE) { + modes.add(Create); + } else if (option == StandardOpenOption.TRUNCATE_EXISTING) { + modes.add(Truncate); + } else if (option == StandardOpenOption.WRITE) { + modes.add(Write); + } else if (option == StandardOpenOption.CREATE_NEW) { + modes.add(Create); + modes.add(Exclusive); + } else if (option == StandardOpenOption.SPARSE) { + /* + * As per the Javadoc: + * + * The option is ignored when the file system does not + * support the creation of sparse files + */ + continue; + } else { + throw new IllegalArgumentException("Unsupported open option: " + option); + } + } + + return modes; + } } enum CopyMode { @@ -123,7 +180,7 @@ public interface SftpClient extends SubsystemClient { return false; } - return Arrays.areEqual(id, ((Handle) obj).id); + return Arrays.equals(id, ((Handle) obj).id); } @Override @@ -432,6 +489,12 @@ public interface SftpClient extends SubsystemClient { long DEFAULT_CHANNEL_OPEN_TIMEOUT = DEFAULT_WAIT_TIMEOUT; /** + * Default modes for opening a channel if no specific modes specified + */ + Set<OpenMode> DEFAULT_CHANNEL_MODES = + Collections.unmodifiableSet(EnumSet.of(OpenMode.Read, OpenMode.Write)); + + /** * @return The negotiated SFTP protocol version */ int getVersion(); @@ -641,7 +704,13 @@ public interface SftpClient extends SubsystemClient { * only <U>once</U> * @throws IOException 
If failed to access the directory */ - Iterable<DirEntry> listDir(Handle handle) throws IOException; + default Iterable<DirEntry> listDir(Handle handle) throws IOException { + if (!isOpen()) { + throw new IOException("listDir(" + handle + ") client is closed"); + } + + return new StfpIterableDirHandle(this, handle); + } /** * The effective "normalized" remote path @@ -738,6 +807,34 @@ public interface SftpClient extends SubsystemClient { // High level API // + default SftpRemotePathChannel openRemotePathChannel(String path, OpenOption ... options) throws IOException { + return openRemotePathChannel(path, GenericUtils.isEmpty(options) ? Collections.emptyList() : Arrays.asList(options)); + } + + default SftpRemotePathChannel openRemotePathChannel(String path, Collection<? extends OpenOption> options) throws IOException { + return openRemoteFileChannel(path, OpenMode.fromOpenOptions(options)); + } + + default SftpRemotePathChannel openRemoteFileChannel(String path, OpenMode ... modes) throws IOException { + return openRemoteFileChannel(path, GenericUtils.isEmpty(modes) ? Collections.emptyList() : Arrays.asList(modes)); + } + + /** + * Opens an {@link SftpRemotePathChannel} on the specified remote path + * + * @param path The remote path + * @param modes The access mode(s) - if {@code null}/empty then the {@link #DEFAULT_CHANNEL_MODES} are used + * @return The open {@link SftpRemotePathChannel} - <B>Note:</B> do not close this + * owner client instance until the channel is no longer needed since it uses the client + * for providing the channel's functionality. 
+ * @throws IOException If failed to open the channel + * @see java.nio.channels.Channels#newInputStream(java.nio.channels.ReadableByteChannel) + * @see java.nio.channels.Channels#newOutputStream(java.nio.channels.WritableByteChannel) + */ + default SftpRemotePathChannel openRemoteFileChannel(String path, Collection<OpenMode> modes) throws IOException { + return new SftpRemotePathChannel(path, this, false, GenericUtils.isEmpty(modes) ? DEFAULT_CHANNEL_MODES : modes); + } + /** * @param path The remote directory path * @return An {@link Iterable} that can be used to iterate over all the @@ -745,7 +842,13 @@ public interface SftpClient extends SubsystemClient { * @throws IOException If failed to access the remote site * @see #readDir(Handle) */ - Iterable<DirEntry> readDir(String path) throws IOException; + default Iterable<DirEntry> readDir(String path) throws IOException { + if (!isOpen()) { + throw new IOException("readDir(" + path + ") client is closed"); + } + + return new SftpIterableDirEntry(this, path); + } default InputStream read(String path) throws IOException { return read(path, DEFAULT_READ_BUFFER_SIZE); @@ -776,7 +879,17 @@ public interface SftpClient extends SubsystemClient { * @return An {@link InputStream} for reading the remote file data * @throws IOException If failed to execute */ - InputStream read(String path, int bufferSize, Collection<OpenMode> mode) throws IOException; + default InputStream read(String path, int bufferSize, Collection<OpenMode> mode) throws IOException { + if (bufferSize < MIN_READ_BUFFER_SIZE) { + throw new IllegalArgumentException("Insufficient read buffer size: " + bufferSize + ", min.=" + MIN_READ_BUFFER_SIZE); + } + + if (!isOpen()) { + throw new IOException("read(" + path + ")[" + mode + "] size=" + bufferSize + ": client is closed"); + } + + return new SftpInputStreamWithChannel(this, bufferSize, path, mode); + } default OutputStream write(String path) throws IOException { return write(path, DEFAULT_WRITE_BUFFER_SIZE); 
@@ -807,7 +920,17 @@ public interface SftpClient extends SubsystemClient { * @return An {@link OutputStream} for writing the data * @throws IOException If failed to execute */ - OutputStream write(String path, int bufferSize, Collection<OpenMode> mode) throws IOException; + default OutputStream write(String path, int bufferSize, Collection<OpenMode> mode) throws IOException { + if (bufferSize < MIN_WRITE_BUFFER_SIZE) { + throw new IllegalArgumentException("Insufficient write buffer size: " + bufferSize + ", min.=" + MIN_WRITE_BUFFER_SIZE); + } + + if (!isOpen()) { + throw new IOException("write(" + path + ")[" + mode + "] size=" + bufferSize + ": client is closed"); + } + + return new SftpOutputStreamWithChannel(this, bufferSize, path, mode); + } /** * @param <E> The generic extension type http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/eac82acb/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpFileChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpFileChannel.java b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpFileChannel.java deleted file mode 100644 index 941ee23..0000000 --- a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpFileChannel.java +++ /dev/null @@ -1,403 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.sshd.client.subsystem.sftp; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.MappedByteBuffer; -import java.nio.channels.AsynchronousCloseException; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.FileChannel; -import java.nio.channels.FileLock; -import java.nio.channels.OverlappingFileLockException; -import java.nio.channels.ReadableByteChannel; -import java.nio.channels.WritableByteChannel; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.EnumSet; -import java.util.List; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.sshd.common.PropertyResolverUtils; -import org.apache.sshd.common.subsystem.sftp.SftpConstants; -import org.apache.sshd.common.subsystem.sftp.SftpException; -import org.apache.sshd.common.util.GenericUtils; -import org.apache.sshd.common.util.ValidateUtils; -import org.apache.sshd.common.util.io.IoUtils; - -public class SftpFileChannel extends FileChannel { - public static final String COPY_BUFSIZE_PROP = "sftp-channel-copy-buf-size"; - public static final int DEFAULT_TRANSFER_BUFFER_SIZE = IoUtils.DEFAULT_COPY_SIZE; - - public static final Set<SftpClient.OpenMode> READ_MODES = - Collections.unmodifiableSet(EnumSet.of(SftpClient.OpenMode.Read)); - - public static final Set<SftpClient.OpenMode> WRITE_MODES = - 
Collections.unmodifiableSet( - EnumSet.of(SftpClient.OpenMode.Write, SftpClient.OpenMode.Append, SftpClient.OpenMode.Create, SftpClient.OpenMode.Truncate)); - - private final SftpPath p; - private final Collection<SftpClient.OpenMode> modes; - private final SftpClient sftp; - private final SftpClient.CloseableHandle handle; - private final Object lock = new Object(); - private final AtomicLong posTracker = new AtomicLong(0L); - private final AtomicReference<Thread> blockingThreadHolder = new AtomicReference<>(null); - - public SftpFileChannel(SftpPath p, Collection<SftpClient.OpenMode> modes) throws IOException { - this.p = ValidateUtils.checkNotNull(p, "No target path"); - this.modes = ValidateUtils.checkNotNull(modes, "No channel modes specified"); - - SftpFileSystem fs = p.getFileSystem(); - sftp = fs.getClient(); - handle = sftp.open(p.toString(), modes); - } - - @Override - public int read(ByteBuffer dst) throws IOException { - return (int) doRead(Collections.singletonList(dst), -1); - } - - @Override - public int read(ByteBuffer dst, long position) throws IOException { - if (position < 0) { - throw new IllegalArgumentException("read(" + p + ") illegal position to read from: " + position); - } - return (int) doRead(Collections.singletonList(dst), position); - } - - @Override - public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { - List<ByteBuffer> buffers = Arrays.asList(dsts).subList(offset, offset + length); - return doRead(buffers, -1); - } - - protected long doRead(List<ByteBuffer> buffers, long position) throws IOException { - ensureOpen(READ_MODES); - synchronized (lock) { - boolean completed = false; - boolean eof = false; - long curPos = (position >= 0L) ? 
position : posTracker.get(); - try { - long totalRead = 0; - beginBlocking(); - loop: - for (ByteBuffer buffer : buffers) { - while (buffer.remaining() > 0) { - ByteBuffer wrap = buffer; - if (!buffer.hasArray()) { - wrap = ByteBuffer.allocate(Math.min(8192, buffer.remaining())); - } - int read = sftp.read(handle, curPos, wrap.array(), wrap.arrayOffset() + wrap.position(), wrap.remaining()); - if (read > 0) { - if (wrap == buffer) { - wrap.position(wrap.position() + read); - } else { - buffer.put(wrap.array(), wrap.arrayOffset(), read); - } - curPos += read; - totalRead += read; - } else { - eof = read == -1; - break loop; - } - } - } - completed = true; - if (totalRead > 0) { - return totalRead; - } - - if (eof) { - return -1; - } else { - return 0; - } - } finally { - if (position < 0L) { - posTracker.set(curPos); - } - endBlocking(completed); - } - } - } - - @Override - public int write(ByteBuffer src) throws IOException { - return (int) doWrite(Collections.singletonList(src), -1); - } - - @Override - public int write(ByteBuffer src, long position) throws IOException { - if (position < 0) { - throw new IllegalArgumentException("write(" + p + ") illegal position to write to: " + position); - } - return (int) doWrite(Collections.singletonList(src), position); - } - - @Override - public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { - List<ByteBuffer> buffers = Arrays.asList(srcs).subList(offset, offset + length); - return doWrite(buffers, -1); - } - - protected long doWrite(List<ByteBuffer> buffers, long position) throws IOException { - ensureOpen(WRITE_MODES); - synchronized (lock) { - boolean completed = false; - long curPos = (position >= 0L) ? 
position : posTracker.get(); - try { - long totalWritten = 0L; - beginBlocking(); - for (ByteBuffer buffer : buffers) { - while (buffer.remaining() > 0) { - ByteBuffer wrap = buffer; - if (!buffer.hasArray()) { - wrap = ByteBuffer.allocate(Math.min(8192, buffer.remaining())); - buffer.get(wrap.array(), wrap.arrayOffset(), wrap.remaining()); - } - int written = wrap.remaining(); - sftp.write(handle, curPos, wrap.array(), wrap.arrayOffset() + wrap.position(), written); - if (wrap == buffer) { - wrap.position(wrap.position() + written); - } - curPos += written; - totalWritten += written; - } - } - completed = true; - return totalWritten; - } finally { - if (position < 0L) { - posTracker.set(curPos); - } - endBlocking(completed); - } - } - } - - @Override - public long position() throws IOException { - ensureOpen(Collections.emptySet()); - return posTracker.get(); - } - - @Override - public FileChannel position(long newPosition) throws IOException { - if (newPosition < 0L) { - throw new IllegalArgumentException("position(" + p + ") illegal file channel position: " + newPosition); - } - - ensureOpen(Collections.emptySet()); - posTracker.set(newPosition); - return this; - } - - @Override - public long size() throws IOException { - ensureOpen(Collections.emptySet()); - return sftp.stat(handle).getSize(); - } - - @Override - public FileChannel truncate(long size) throws IOException { - ensureOpen(Collections.emptySet()); - sftp.setStat(handle, new SftpClient.Attributes().size(size)); - return this; - } - - @Override - public void force(boolean metaData) throws IOException { - ensureOpen(Collections.emptySet()); - } - - @Override - public long transferTo(long position, long count, WritableByteChannel target) throws IOException { - if ((position < 0) || (count < 0)) { - throw new IllegalArgumentException("transferTo(" + p + ") illegal position (" + position + ") or count (" + count + ")"); - } - ensureOpen(READ_MODES); - synchronized (lock) { - boolean completed = false; - 
boolean eof = false; - long curPos = position; - try { - beginBlocking(); - - int bufSize = (int) Math.min(count, 32768); - byte[] buffer = new byte[bufSize]; - long totalRead = 0L; - while (totalRead < count) { - int read = sftp.read(handle, curPos, buffer, 0, buffer.length); - if (read > 0) { - ByteBuffer wrap = ByteBuffer.wrap(buffer); - while (wrap.remaining() > 0) { - target.write(wrap); - } - curPos += read; - totalRead += read; - } else { - eof = read == -1; - } - } - completed = true; - return totalRead > 0 ? totalRead : eof ? -1 : 0; - } finally { - endBlocking(completed); - } - } - } - - @Override - public long transferFrom(ReadableByteChannel src, long position, long count) throws IOException { - if ((position < 0) || (count < 0)) { - throw new IllegalArgumentException("transferFrom(" + p + ") illegal position (" + position + ") or count (" + count + ")"); - } - ensureOpen(WRITE_MODES); - - int copySize = PropertyResolverUtils.getIntProperty(sftp.getClientSession(), COPY_BUFSIZE_PROP, DEFAULT_TRANSFER_BUFFER_SIZE); - boolean completed = false; - long curPos = (position >= 0L) ? 
position : posTracker.get(); - long totalRead = 0L; - byte[] buffer = new byte[(int) Math.min(copySize, count)]; - - synchronized (lock) { - try { - beginBlocking(); - - while (totalRead < count) { - ByteBuffer wrap = ByteBuffer.wrap(buffer, 0, (int) Math.min(buffer.length, count - totalRead)); - int read = src.read(wrap); - if (read > 0) { - sftp.write(handle, curPos, buffer, 0, read); - curPos += read; - totalRead += read; - } else { - break; - } - } - completed = true; - return totalRead; - } finally { - endBlocking(completed); - } - } - } - - @Override - public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException { - throw new UnsupportedOperationException("map(" + p + ")[" + mode + "," + position + "," + size + "] N/A"); - } - - @Override - public FileLock lock(long position, long size, boolean shared) throws IOException { - return tryLock(position, size, shared); - } - - @Override - public FileLock tryLock(final long position, final long size, boolean shared) throws IOException { - ensureOpen(Collections.emptySet()); - - try { - sftp.lock(handle, position, size, 0); - } catch (SftpException e) { - if (e.getStatus() == SftpConstants.SSH_FX_LOCK_CONFLICT) { - throw new OverlappingFileLockException(); - } - throw e; - } - - return new FileLock(this, position, size, shared) { - private final AtomicBoolean valid = new AtomicBoolean(true); - - @Override - public boolean isValid() { - return acquiredBy().isOpen() && valid.get(); - } - - @SuppressWarnings("synthetic-access") - @Override - public void release() throws IOException { - if (valid.compareAndSet(true, false)) { - sftp.unlock(handle, position, size); - } - } - }; - } - - @Override - protected void implCloseChannel() throws IOException { - try { - final Thread thread = blockingThreadHolder.get(); - if (thread != null) { - thread.interrupt(); - } - } finally { - try { - handle.close(); - } finally { - sftp.close(); - } - } - } - - private void beginBlocking() { - begin(); - 
blockingThreadHolder.set(Thread.currentThread()); - } - - private void endBlocking(boolean completed) throws AsynchronousCloseException { - blockingThreadHolder.set(null); - end(completed); - } - - /** - * Checks that the channel is open and that its current mode contains - * at least one of the required ones - * - * @param reqModes The required modes - ignored if {@code null}/empty - * @throws IOException If channel not open or the required modes are not - * satisfied - */ - private void ensureOpen(Collection<SftpClient.OpenMode> reqModes) throws IOException { - if (!isOpen()) { - throw new ClosedChannelException(); - } - - if (GenericUtils.size(reqModes) > 0) { - for (SftpClient.OpenMode m : reqModes) { - if (this.modes.contains(m)) { - return; - } - } - - throw new IOException("ensureOpen(" + p + ") current channel modes (" + this.modes + ") do contain any of the required: " + reqModes); - } - } - - @Override - public String toString() { - return Objects.toString(p); - } -} http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/eac82acb/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpFileSystemChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpFileSystemChannel.java b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpFileSystemChannel.java new file mode 100644 index 0000000..40948bf --- /dev/null +++ b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpFileSystemChannel.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sshd.client.subsystem.sftp; + +import java.io.IOException; +import java.util.Collection; +import java.util.Objects; + +/** + * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> + */ +public class SftpFileSystemChannel extends SftpRemotePathChannel { + public SftpFileSystemChannel(SftpPath p, Collection<SftpClient.OpenMode> modes) throws IOException { + this(Objects.requireNonNull(p, "No target path").toString(), p.getFileSystem(), modes); + } + + public SftpFileSystemChannel(String remotePath, SftpFileSystem fs, Collection<SftpClient.OpenMode> modes) throws IOException { + super(remotePath, Objects.requireNonNull(fs, "No SFTP file system").getClient(), true, modes); + } +} http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/eac82acb/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpFileSystemProvider.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpFileSystemProvider.java b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpFileSystemProvider.java index 3ecd93c..90019c2 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpFileSystemProvider.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpFileSystemProvider.java @@ -25,7 +25,6 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.URI; import java.nio.channels.FileChannel; -import java.nio.channels.SeekableByteChannel; import 
java.nio.file.AccessDeniedException; import java.nio.file.AccessMode; import java.nio.file.CopyOption; @@ -42,7 +41,6 @@ import java.nio.file.OpenOption; import java.nio.file.Path; import java.nio.file.ProviderMismatchException; import java.nio.file.StandardCopyOption; -import java.nio.file.StandardOpenOption; import java.nio.file.attribute.AclEntry; import java.nio.file.attribute.AclFileAttributeView; import java.nio.file.attribute.BasicFileAttributeView; @@ -400,46 +398,18 @@ public class SftpFileSystemProvider extends FileSystemProvider { } @Override - public SeekableByteChannel newByteChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException { + public FileChannel newByteChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException { return newFileChannel(path, options, attrs); } @Override public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException { - Collection<SftpClient.OpenMode> modes = EnumSet.noneOf(SftpClient.OpenMode.class); - for (OpenOption option : options) { - if (option == StandardOpenOption.READ) { - modes.add(SftpClient.OpenMode.Read); - } else if (option == StandardOpenOption.APPEND) { - modes.add(SftpClient.OpenMode.Append); - } else if (option == StandardOpenOption.CREATE) { - modes.add(SftpClient.OpenMode.Create); - } else if (option == StandardOpenOption.TRUNCATE_EXISTING) { - modes.add(SftpClient.OpenMode.Truncate); - } else if (option == StandardOpenOption.WRITE) { - modes.add(SftpClient.OpenMode.Write); - } else if (option == StandardOpenOption.CREATE_NEW) { - modes.add(SftpClient.OpenMode.Create); - modes.add(SftpClient.OpenMode.Exclusive); - } else if (option == StandardOpenOption.SPARSE) { - /* - * As per the Javadoc: - * - * The option is ignored when the file system does not - * support the creation of sparse files - */ - //noinspection UnnecessaryContinue - continue; - } else { - throw 
new IllegalArgumentException("newFileChannel(" + path + ") unsupported open option: " + option); - } - } + Collection<SftpClient.OpenMode> modes = SftpClient.OpenMode.fromOpenOptions(options); if (modes.isEmpty()) { - modes.add(SftpClient.OpenMode.Read); - modes.add(SftpClient.OpenMode.Write); + modes = EnumSet.of(SftpClient.OpenMode.Read, SftpClient.OpenMode.Write); } - // TODO: attrs - return new SftpFileChannel(toSftpPath(path), modes); + // TODO: process file attributes + return new SftpFileSystemChannel(toSftpPath(path), modes); } @Override http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/eac82acb/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpRemotePathChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpRemotePathChannel.java b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpRemotePathChannel.java new file mode 100644 index 0000000..a3fb210 --- /dev/null +++ b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpRemotePathChannel.java @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sshd.client.subsystem.sftp; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.AsynchronousCloseException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.channels.OverlappingFileLockException; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.sshd.common.PropertyResolverUtils; +import org.apache.sshd.common.subsystem.sftp.SftpConstants; +import org.apache.sshd.common.subsystem.sftp.SftpException; +import org.apache.sshd.common.util.GenericUtils; +import org.apache.sshd.common.util.ValidateUtils; +import org.apache.sshd.common.util.io.IoUtils; +
/**
 * A {@link FileChannel} whose data is read from / written to a remote file
 * through an {@link SftpClient}. The remote file is opened in the constructor
 * and every transfer goes through the resulting handle. The channel keeps its
 * own notion of the current position; the positional read/write variants and
 * the two {@code transfer} methods do not modify it.
 *
 * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a>
 */
public class SftpRemotePathChannel extends FileChannel {
    /** Name of the session property resolved by {@link #transferFrom(ReadableByteChannel, long, long)} for its copy buffer size */
    public static final String COPY_BUFSIZE_PROP = "sftp-channel-copy-buf-size";
    /** Default value supplied when resolving {@link #COPY_BUFSIZE_PROP} */
    public static final int DEFAULT_TRANSFER_BUFFER_SIZE = IoUtils.DEFAULT_COPY_SIZE;

    /** Open modes of which at least one is required by the reading operations */
    public static final Set<SftpClient.OpenMode> READ_MODES =
            Collections.unmodifiableSet(EnumSet.of(SftpClient.OpenMode.Read));

    /** Open modes of which at least one is required by the writing operations */
    public static final Set<SftpClient.OpenMode> WRITE_MODES =
            Collections.unmodifiableSet(
                EnumSet.of(SftpClient.OpenMode.Write, SftpClient.OpenMode.Append, SftpClient.OpenMode.Create, SftpClient.OpenMode.Truncate));

    private final String path;
    private final Collection<SftpClient.OpenMode> modes;
    private final boolean closeOnExit;
    private final SftpClient sftp;
    private final SftpClient.CloseableHandle handle;
    // guards the data transfer methods so only one of them runs at a time
    private final Object lock = new Object();
    // position used by the non-positional read/write calls
    private final AtomicLong posTracker = new AtomicLong(0L);
    // thread currently inside a transfer - interrupted when the channel is closed
    private final AtomicReference<Thread> blockingThreadHolder = new AtomicReference<>(null);

    /**
     * Opens the remote file and wraps the resulting handle.
     *
     * @param path        The remote file path - may not be {@code null}/empty
     * @param sftp        The {@link SftpClient} used for all remote operations
     * @param closeOnExit Whether to also close the client when this channel is closed
     * @param modes       The modes used to open the remote file
     * @throws IOException If failed to open the remote file
     */
    public SftpRemotePathChannel(String path, SftpClient sftp, boolean closeOnExit, Collection<SftpClient.OpenMode> modes) throws IOException {
        this.path = ValidateUtils.checkNotNullAndNotEmpty(path, "No remote file path specified");
        this.modes = Objects.requireNonNull(modes, "No channel modes specified");
        this.sftp = Objects.requireNonNull(sftp, "No SFTP client instance");
        this.closeOnExit = closeOnExit;
        this.handle = sftp.open(path, modes);
    }

    /**
     * @return The remote path provided at construction time
     */
    public String getRemotePath() {
        return path;
    }

    @Override
    public int read(ByteBuffer dst) throws IOException {
        return (int) doRead(Collections.singletonList(dst), -1);
    }

    @Override
    public int read(ByteBuffer dst, long position) throws IOException {
        if (position < 0) {
            throw new IllegalArgumentException("read(" + getRemotePath() + ") illegal position to read from: " + position);
        }
        return (int) doRead(Collections.singletonList(dst), position);
    }

    @Override
    public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
        List<ByteBuffer> buffers = Arrays.asList(dsts).subList(offset, offset + length);
        return doRead(buffers, -1);
    }

    /**
     * Fills the given buffers - in order - with data read from the remote file.
     *
     * @param buffers  The target buffers
     * @param position The remote offset to start reading from - if negative the
     *                 tracked channel position is used, and updated once done
     * @return The total number of bytes read; if nothing was read then {@code -1}
     *         if the client reported end-of-file, zero otherwise
     * @throws IOException If channel not open for reading or the read failed
     */
    protected long doRead(List<ByteBuffer> buffers, long position) throws IOException {
        ensureOpen(READ_MODES);
        synchronized (lock) {
            boolean completed = false;
            boolean eof = false;
            long curPos = (position >= 0L) ? position : posTracker.get();
            try {
                long totalRead = 0;
                beginBlocking();
                loop:
                for (ByteBuffer buffer : buffers) {
                    while (buffer.remaining() > 0) {
                        ByteBuffer wrap = buffer;
                        if (!buffer.hasArray()) {
                            // no accessible backing array - read into a temporary one and copy
                            wrap = ByteBuffer.allocate(Math.min(IoUtils.DEFAULT_COPY_SIZE, buffer.remaining()));
                        }
                        int read = sftp.read(handle, curPos, wrap.array(), wrap.arrayOffset() + wrap.position(), wrap.remaining());
                        if (read > 0) {
                            if (wrap == buffer) {
                                wrap.position(wrap.position() + read);
                            } else {
                                buffer.put(wrap.array(), wrap.arrayOffset(), read);
                            }
                            curPos += read;
                            totalRead += read;
                        } else {
                            eof = read == -1;
                            break loop;
                        }
                    }
                }
                completed = true;
                if (totalRead > 0) {
                    return totalRead;
                }

                if (eof) {
                    return -1;
                } else {
                    return 0;
                }
            } finally {
                // only non-positional reads move the channel position
                if (position < 0L) {
                    posTracker.set(curPos);
                }
                endBlocking(completed);
            }
        }
    }

    @Override
    public int write(ByteBuffer src) throws IOException {
        return (int) doWrite(Collections.singletonList(src), -1);
    }

    @Override
    public int write(ByteBuffer src, long position) throws IOException {
        if (position < 0L) {
            throw new IllegalArgumentException("write(" + getRemotePath() + ") illegal position to write to: " + position);
        }
        return (int) doWrite(Collections.singletonList(src), position);
    }

    @Override
    public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
        List<ByteBuffer> buffers = Arrays.asList(srcs).subList(offset, offset + length);
        return doWrite(buffers, -1);
    }

    /**
     * Writes the remaining contents of the given buffers - in order - to the
     * remote file.
     *
     * @param buffers  The source buffers
     * @param position The remote offset to start writing at - if negative the
     *                 tracked channel position is used, and updated once done
     * @return The total number of bytes written
     * @throws IOException If channel not open for writing or the write failed
     */
    protected long doWrite(List<ByteBuffer> buffers, long position) throws IOException {
        ensureOpen(WRITE_MODES);
        synchronized (lock) {
            boolean completed = false;
            long curPos = (position >= 0L) ? position : posTracker.get();
            try {
                long totalWritten = 0L;
                beginBlocking();
                for (ByteBuffer buffer : buffers) {
                    while (buffer.remaining() > 0) {
                        ByteBuffer wrap = buffer;
                        if (!buffer.hasArray()) {
                            // no accessible backing array - drain a chunk into a temporary one
                            wrap = ByteBuffer.allocate(Math.min(IoUtils.DEFAULT_COPY_SIZE, buffer.remaining()));
                            buffer.get(wrap.array(), wrap.arrayOffset(), wrap.remaining());
                        }
                        int written = wrap.remaining();
                        sftp.write(handle, curPos, wrap.array(), wrap.arrayOffset() + wrap.position(), written);
                        if (wrap == buffer) {
                            wrap.position(wrap.position() + written);
                        }
                        curPos += written;
                        totalWritten += written;
                    }
                }
                completed = true;
                return totalWritten;
            } finally {
                // only non-positional writes move the channel position
                if (position < 0L) {
                    posTracker.set(curPos);
                }
                endBlocking(completed);
            }
        }
    }

    @Override
    public long position() throws IOException {
        ensureOpen(Collections.emptySet());
        return posTracker.get();
    }

    @Override
    public FileChannel position(long newPosition) throws IOException {
        if (newPosition < 0L) {
            throw new IllegalArgumentException("position(" + getRemotePath() + ") illegal file channel position: " + newPosition);
        }

        ensureOpen(Collections.emptySet());
        posTracker.set(newPosition);
        return this;
    }

    @Override
    public long size() throws IOException {
        ensureOpen(Collections.emptySet());
        return sftp.stat(handle).getSize();
    }

    @Override
    public FileChannel truncate(long size) throws IOException {
        ensureOpen(Collections.emptySet());
        sftp.setStat(handle, new SftpClient.Attributes().size(size));
        return this;
    }

    /**
     * Only validates that the channel is open - nothing is sent to the server.
     */
    @Override
    public void force(boolean metaData) throws IOException {
        ensureOpen(Collections.emptySet());
    }

    /**
     * Copies up to {@code count} bytes from the remote file - starting at
     * {@code position} - into the target channel. The tracked channel position
     * is not modified.
     *
     * @return The number of bytes transferred; if nothing was transferred then
     *         {@code -1} if the client reported end-of-file, zero otherwise
     */
    @Override
    public long transferTo(long position, long count, WritableByteChannel target) throws IOException {
        if ((position < 0) || (count < 0)) {
            throw new IllegalArgumentException("transferTo(" + getRemotePath() + ") illegal position (" + position + ") or count (" + count + ")");
        }
        ensureOpen(READ_MODES);
        synchronized (lock) {
            boolean completed = false;
            boolean eof = false;
            long curPos = position;
            try {
                beginBlocking();

                int bufSize = (int) Math.min(count, Short.MAX_VALUE + 1);
                byte[] buffer = new byte[bufSize];
                long totalRead = 0L;
                while (totalRead < count) {
                    // never ask for more than what is left of the requested count
                    int reqLen = (int) Math.min(buffer.length, count - totalRead);
                    int read = sftp.read(handle, curPos, buffer, 0, reqLen);
                    if (read <= 0) {
                        // no more data - stop instead of re-issuing the same read forever
                        eof = read == -1;
                        break;
                    }

                    // expose only the bytes actually read - the rest of the array is stale
                    ByteBuffer wrap = ByteBuffer.wrap(buffer, 0, read);
                    while (wrap.remaining() > 0) {
                        target.write(wrap);
                    }
                    curPos += read;
                    totalRead += read;
                }
                completed = true;
                return totalRead > 0 ? totalRead : eof ? -1 : 0;
            } finally {
                endBlocking(completed);
            }
        }
    }

    /**
     * Copies up to {@code count} bytes from the source channel into the remote
     * file starting at {@code position}. Stops early once the source yields no
     * more data. The tracked channel position is not modified.
     *
     * @return The number of bytes transferred - possibly zero
     */
    @Override
    public long transferFrom(ReadableByteChannel src, long position, long count) throws IOException {
        if ((position < 0) || (count < 0)) {
            throw new IllegalArgumentException("transferFrom(" + getRemotePath() + ") illegal position (" + position + ") or count (" + count + ")");
        }
        ensureOpen(WRITE_MODES);

        int copySize = PropertyResolverUtils.getIntProperty(sftp.getClientSession(), COPY_BUFSIZE_PROP, DEFAULT_TRANSFER_BUFFER_SIZE);
        boolean completed = false;
        long curPos = position; // validated above as non-negative
        long totalRead = 0L;
        byte[] buffer = new byte[(int) Math.min(copySize, count)];

        synchronized (lock) {
            try {
                beginBlocking();

                while (totalRead < count) {
                    ByteBuffer wrap = ByteBuffer.wrap(buffer, 0, (int) Math.min(buffer.length, count - totalRead));
                    int read = src.read(wrap);
                    if (read > 0) {
                        sftp.write(handle, curPos, buffer, 0, read);
                        curPos += read;
                        totalRead += read;
                    } else {
                        break;
                    }
                }
                completed = true;
                return totalRead;
            } finally {
                endBlocking(completed);
            }
        }
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException {
        throw new UnsupportedOperationException("map(" + getRemotePath() + ")[" + mode + "," + position + "," + size + "] N/A");
    }

    /**
     * Delegates to {@link #tryLock(long, long, boolean)}.
     */
    @Override
    public FileLock lock(long position, long size, boolean shared) throws IOException {
        return tryLock(position, size, shared);
    }

    /**
     * Requests a lock on the given region via the SFTP client. The
     * {@code shared} flag is recorded on the returned {@link FileLock} but the
     * mask sent with the lock request is always zero.
     *
     * @throws OverlappingFileLockException If the request failed with an
     *         {@code SSH_FX_LOCK_CONFLICT} status
     */
    @Override
    public FileLock tryLock(final long position, final long size, boolean shared) throws IOException {
        ensureOpen(Collections.emptySet());

        try {
            sftp.lock(handle, position, size, 0);
        } catch (SftpException e) {
            if (e.getStatus() == SftpConstants.SSH_FX_LOCK_CONFLICT) {
                throw new OverlappingFileLockException();
            }
            throw e;
        }

        return new FileLock(this, position, size, shared) {
            private final AtomicBoolean valid = new AtomicBoolean(true);

            @Override
            public boolean isValid() {
                return acquiredBy().isOpen() && valid.get();
            }

            @SuppressWarnings("synthetic-access")
            @Override
            public void release() throws IOException {
                // unlock only once even if released repeatedly
                if (valid.compareAndSet(true, false)) {
                    sftp.unlock(handle, position, size);
                }
            }
        };
    }

    /**
     * Interrupts the thread (if any) currently inside a transfer, closes the
     * remote handle and - if so requested at construction - the SFTP client.
     */
    @Override
    protected void implCloseChannel() throws IOException {
        try {
            final Thread thread = blockingThreadHolder.get();
            if (thread != null) {
                thread.interrupt();
            }
        } finally {
            try {
                handle.close();
            } finally {
                if (closeOnExit) {
                    sftp.close();
                }
            }
        }
    }

    private void beginBlocking() {
        begin();
        blockingThreadHolder.set(Thread.currentThread());
    }

    private void endBlocking(boolean completed) throws AsynchronousCloseException {
        blockingThreadHolder.set(null);
        end(completed);
    }

    /**
     * Checks that the channel is open and that its current mode contains
     * at least one of the required ones
     *
     * @param reqModes The required modes - ignored if {@code null}/empty
     * @throws IOException If channel not open or the required modes are not
     * satisfied
     */
    private void ensureOpen(Collection<SftpClient.OpenMode> reqModes) throws IOException {
        if (!isOpen()) {
            throw new ClosedChannelException();
        }

        if (GenericUtils.size(reqModes) > 0) {
            for (SftpClient.OpenMode m : reqModes) {
                if (this.modes.contains(m)) {
                    return;
                }
            }

            throw new IOException("ensureOpen(" + getRemotePath() + ") current channel modes (" + this.modes + ") do not contain any of the required: " + reqModes);
        }
    }

    @Override
    public String toString() {
        return getRemotePath();
    }
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/eac82acb/sshd-core/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpFileSystemTest.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpFileSystemTest.java b/sshd-core/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpFileSystemTest.java index d97ec36..9fb6179 100644 --- a/sshd-core/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpFileSystemTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpFileSystemTest.java @@ -20,6 +20,7 @@ package org.apache.sshd.client.subsystem.sftp; import java.io.IOException; import java.net.URI; +import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.FileLock; import java.nio.channels.OverlappingFileLockException; @@ -34,6 +35,7 @@ import java.nio.file.LinkOption; import
java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; import java.nio.file.attribute.AclEntry; import java.nio.file.attribute.AclFileAttributeView; import java.nio.file.attribute.FileAttributeView; @@ -42,8 +44,11 @@ import java.nio.file.attribute.GroupPrincipal; import java.nio.file.attribute.PosixFilePermissions; import java.nio.file.attribute.UserPrincipalLookupService; import java.nio.file.attribute.UserPrincipalNotFoundException; +import java.nio.file.spi.FileSystemProvider; import java.util.Collection; import java.util.Collections; +import java.util.Date; +import java.util.EnumSet; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -211,6 +216,37 @@ public class SftpFileSystemTest extends BaseTestSupport { } } + @Test // see SSHD-697 + public void testFileChannel() throws IOException { + Path targetPath = detectTargetFolder(); + Path lclSftp = Utils.resolve(targetPath, SftpConstants.SFTP_SUBSYSTEM_NAME, getClass().getSimpleName()); + Path lclFile = lclSftp.resolve(getCurrentTestName() + ".txt"); + Files.deleteIfExists(lclFile); + byte[] expected = (getClass().getName() + "#" + getCurrentTestName() + "(" + new Date() + ")").getBytes(StandardCharsets.UTF_8); + try (FileSystem fs = FileSystems.newFileSystem(createDefaultFileSystemURI(), Collections.emptyMap())) { + Path parentPath = targetPath.getParent(); + String remFilePath = Utils.resolveRelativeRemotePath(parentPath, lclFile); + Path file = fs.getPath(remFilePath); + + FileSystemProvider provider = fs.provider(); + try (FileChannel fc = provider.newFileChannel(file, EnumSet.of(StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE))) { + int writeLen = fc.write(ByteBuffer.wrap(expected)); + assertEquals("Mismatched written length", expected.length, writeLen); + + FileChannel fcPos = fc.position(0L); + assertSame("Mismatched positioned file channel", fc, fcPos); + + 
byte[] actual = new byte[expected.length]; + int readLen = fc.read(ByteBuffer.wrap(actual)); + assertEquals("Mismatched read len", writeLen, readLen); + assertArrayEquals("Mismatched read data", expected, actual); + } + } + + byte[] actual = Files.readAllBytes(lclFile); + assertArrayEquals("Mismatched persisted data", expected, actual); + } + @Test public void testFileStore() throws IOException { try (FileSystem fs = FileSystems.newFileSystem(createDefaultFileSystemURI(), Collections.emptyMap())) { http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/eac82acb/sshd-core/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java b/sshd-core/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java index 79e25ea..58ae35d 100644 --- a/sshd-core/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java @@ -27,6 +27,8 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.SocketTimeoutException; import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; import java.nio.file.CopyOption; import java.nio.file.FileSystem; @@ -34,9 +36,11 @@ import java.nio.file.Files; import java.nio.file.LinkOption; import java.nio.file.Path; import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Date; import java.util.EnumSet; import java.util.LinkedList; import java.util.List; @@ -51,6 +55,7 @@ import java.util.concurrent.atomic.AtomicReference; import com.jcraft.jsch.ChannelSftp; import com.jcraft.jsch.JSch; + import org.apache.sshd.client.SshClient; import org.apache.sshd.client.session.ClientSession; import 
org.apache.sshd.client.subsystem.sftp.SftpClient.CloseableHandle; @@ -1251,6 +1256,41 @@ public class SftpTest extends AbstractSftpClientTestSupport { assertNotNull("No symlink signalled", linkDataHolder.getAndSet(null)); } + @Test // see SSHD-697 + public void testFileChannel() throws IOException { + Path targetPath = detectTargetFolder(); + Path lclSftp = Utils.resolve(targetPath, SftpConstants.SFTP_SUBSYSTEM_NAME, getClass().getSimpleName()); + Path lclFile = lclSftp.resolve(getCurrentTestName() + ".txt"); + Files.deleteIfExists(lclFile); + byte[] expected = (getClass().getName() + "#" + getCurrentTestName() + "(" + new Date() + ")").getBytes(StandardCharsets.UTF_8); + + try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port).verify(7L, TimeUnit.SECONDS).getSession()) { + session.addPasswordIdentity(getCurrentTestName()); + session.auth().verify(5L, TimeUnit.SECONDS); + + try (SftpClient sftp = session.createSftpClient()) { + Path parentPath = targetPath.getParent(); + String remFilePath = Utils.resolveRelativeRemotePath(parentPath, lclFile); + + try (FileChannel fc = sftp.openRemotePathChannel(remFilePath, EnumSet.of(StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE))) { + int writeLen = fc.write(ByteBuffer.wrap(expected)); + assertEquals("Mismatched written length", expected.length, writeLen); + + FileChannel fcPos = fc.position(0L); + assertSame("Mismatched positioned file channel", fc, fcPos); + + byte[] actual = new byte[expected.length]; + int readLen = fc.read(ByteBuffer.wrap(actual)); + assertEquals("Mismatched read len", writeLen, readLen); + assertArrayEquals("Mismatched read data", expected, actual); + } + } + } + + byte[] actual = Files.readAllBytes(lclFile); + assertArrayEquals("Mismatched persisted data", expected, actual); + } + protected String readFile(String path) throws Exception { ChannelSftp c = (ChannelSftp) session.openChannel(SftpConstants.SFTP_SUBSYSTEM_NAME); c.connect();