steveloughran commented on a change in pull request #1999:
URL: https://github.com/apache/hadoop/pull/1999#discussion_r427437660
##
File path:
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/SFTPContract.java
##
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.sftp;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.sftp.SFTPFileSystem;
+import org.apache.sshd.common.NamedFactory;
+import org.apache.sshd.server.SshServer;
+import org.apache.sshd.server.auth.UserAuth;
+import org.apache.sshd.server.auth.password.UserAuthPasswordFactory;
+import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
+import org.apache.sshd.server.subsystem.sftp.SftpSubsystemFactory;
+
+public class SFTPContract extends AbstractFSContract {
+
+ private String testDataDir = new FileSystemTestHelper().getTestRootDir();
+ private Configuration conf;
+ public static final String CONTRACT_XML = "contract/sftp.xml";
+ private SshServer sshd;
+
+ public SFTPContract(Configuration conf) {
+// Chain to the base contract, register the sftp contract options from
+// contract/sftp.xml, and keep the configuration so init() can wire the
+// test SSH server's port and credentials into it.
+super(conf);
+addConfResource(CONTRACT_XML);
+this.conf = conf;
+ }
+
+ @Override
+ public void init() throws IOException {
+sshd = SshServer.setUpDefaultServer();
+// ask OS to assign a port
+sshd.setPort(0);
+sshd.setKeyPairProvider(new SimpleGeneratorHostKeyProvider());
+
+List<NamedFactory<UserAuth>> userAuthFactories = new ArrayList<>();
+userAuthFactories.add(new UserAuthPasswordFactory());
+
+sshd.setUserAuthFactories(userAuthFactories);
+sshd.setPasswordAuthenticator((username, password, session) ->
+username.equals("user") && password.equals("password")
+);
+
+sshd.setSubsystemFactories(
+Collections.singletonList(new SftpSubsystemFactory()));
+
+sshd.start();
+int port = sshd.getPort();
+
+conf.setClass("fs.sftp.impl", SFTPFileSystem.class, FileSystem.class);
Review comment:
Shouldn't this be set in core-default.xml already?
If not, sftp:// URLs would break. (Yes, I know every Stack Overflow Spark
example does this, but that is just superstition.)
##
File path:
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java
##
@@ -15,86 +15,113 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.hadoop.fs.sftp;
+import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
+import java.io.UncheckedIOException;
+import com.jcraft.jsch.ChannelSftp;
+import com.jcraft.jsch.SftpATTRS;
+import com.jcraft.jsch.SftpException;
+import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
/** SFTP FileSystem input stream. */
class SFTPInputStream extends FSInputStream {
- public static final String E_SEEK_NOTSUPPORTED = "Seek not supported";
- public static final String E_NULL_INPUTSTREAM = "Null InputStream";
- public static final String E_STREAM_CLOSED = "Stream closed";
-
+ private final ChannelSftp channel;
+ private final Path path;
private InputStream wrappedStream;
private FileSystem.Statistics stats;
private boolean closed;
private long pos;
-
- SFTPInputStream(InputStream stream, FileSystem.Statistics stats) {
-
-if (stream == null) {
- throw new IllegalArgumentException(E_NULL_INPUTSTREAM);
+ private long nextPos;
+ private long contentLength;
+
+ SFTPInputStream(ChannelSftp channel, Path path, FileSystem.Statistics stats)
{
+try {
+ this.channel = channel;
+ this.path = path;
+ this.stats = stats;
+ this.wrappedStream =