This is an automated email from the ASF dual-hosted git repository.
elek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 9a702e5 HDDS-3979. Make bufferSize configurable for stream copy (#1212)
9a702e5 is described below
commit 9a702e5c35cfdffe7f0f1a4e2cc7ec97ec8e6d4e
Author: maobaolong <[email protected]>
AuthorDate: Tue Aug 11 20:20:17 2020 +0800
HDDS-3979. Make bufferSize configurable for stream copy (#1212)
---
.../org/apache/hadoop/ozone/OzoneConfigKeys.java | 1 +
.../common/src/main/resources/ozone-default.xml | 8 ++
.../hadoop/ozone/client/io/KeyInputStream.java | 62 ++---------
.../ozone/client/rpc/TestKeyInputStream.java | 119 ++++++++++-----------
.../hadoop/ozone/s3/S3GatewayConfigKeys.java | 6 ++
.../hadoop/ozone/s3/endpoint/ObjectEndpoint.java | 36 ++++---
.../hadoop/ozone/s3/io/S3WrapperInputStream.java | 36 +------
7 files changed, 107 insertions(+), 161 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index d89fef9..482ac88 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -468,6 +468,7 @@ public final class OzoneConfigKeys {
public static final String OZONE_CLIENT_HTTPS_NEED_AUTH_KEY =
"ozone.https.client.need-auth";
public static final boolean OZONE_CLIENT_HTTPS_NEED_AUTH_DEFAULT = false;
+
/**
* There is no need to instantiate this class.
*/
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index b9774aa..5770448 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -2432,6 +2432,14 @@
</description>
</property>
<property>
+ <name>ozone.s3g.client.buffer.size</name>
+ <tag>OZONE, S3GATEWAY</tag>
+ <value>4KB</value>
+ <description>
+ The size of the buffer used when reading blocks (4KB by default).
+ </description>
+ </property>
+ <property>
<name>ssl.server.keystore.keypassword</name>
<tag>OZONE, SECURITY, MANAGEMENT</tag>
<value></value>
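The new property can be overridden in ozone-site.xml like any other key; an illustrative entry (the 1MB value is an example, not a recommendation):

    <property>
      <name>ozone.s3g.client.buffer.size</name>
      <value>1MB</value>
    </property>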
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
index 4af6838..769035a 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.client.io;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdds.client.BlockID;
@@ -35,7 +34,6 @@ import org.slf4j.LoggerFactory;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -325,62 +323,14 @@ public class KeyInputStream extends InputStream implements Seekable {
return blockStreams.get(index).getRemaining();
}
- /**
- * Copies some or all bytes from a large (over 2GB) <code>InputStream</code>
- * to an <code>OutputStream</code>, optionally skipping input bytes.
- * <p>
- * Copy the method from IOUtils of commons-io to reimplement skip by seek
- * rather than read. The reason why IOUtils of commons-io implement skip
- * by read can be found at
- * <a href="https://issues.apache.org/jira/browse/IO-203">IO-203</a>.
- * </p>
- * <p>
- * This method uses the provided buffer, so there is no need to use a
- * <code>BufferedInputStream</code>.
- * </p>
- *
- * @param output the <code>OutputStream</code> to write to
- * @param inputOffset : number of bytes to skip from input before copying
- * -ve values are ignored
- * @param length : number of bytes to copy. -ve means all
- * @param buffer the buffer to use for the copy
- * @return the number of bytes copied
- * @throws NullPointerException if the input or output is null
- * @throws IOException if an I/O error occurs
- */
- public long copyLarge(final OutputStream output,
- final long inputOffset, final long len, final byte[] buffer)
- throws IOException {
- if (inputOffset > 0) {
- seek(inputOffset);
- }
-
- if (len == 0) {
+ @Override
+ public long skip(long n) throws IOException {
+ if (n <= 0) {
return 0;
}
- final int bufferLength = buffer.length;
- int bytesToRead = bufferLength;
- if (len > 0 && len < bufferLength) {
- bytesToRead = (int) len;
- }
-
- int read;
- long totalRead = 0;
- while (bytesToRead > 0) {
- read = read(buffer, 0, bytesToRead);
- if (read == IOUtils.EOF) {
- break;
- }
-
- output.write(buffer, 0, read);
- totalRead += read;
- if (len > 0) { // only adjust len if not reading to the end
- // Note the cast must work because buffer.length is an integer
- bytesToRead = (int) Math.min(len - totalRead, bufferLength);
- }
- }
-
- return totalRead;
+ long toSkip = Math.min(n, length - getPos());
+ seek(getPos() + toSkip);
+ return toSkip;
}
}
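For context, the removed copyLarge() is replaced by a standard skip() that clamps to the remaining bytes and seeks instead of reading and discarding. A minimal sketch of the resulting contract (assuming keyInputStream was opened via bucket.readKey(keyName).getInputStream(); names are illustrative):

    // Sketch: skip-by-seek semantics of the new KeyInputStream.skip().
    long before = keyInputStream.getPos();
    long skipped = keyInputStream.skip(150); // clamped to length - getPos()
    assert keyInputStream.getPos() == before + skipped;
    // No ReadChunk request is issued; the position moves via seek() alone.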
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
index 8ab176d..7775bb7 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
@@ -43,7 +43,6 @@ import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
@@ -295,66 +294,6 @@ public class TestKeyInputStream {
}
@Test
- public void testCopyLarge() throws Exception {
- String keyName = getKeyName();
- OzoneOutputStream key = TestHelper.createKey(keyName,
- ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);
-
- // write data spanning 3 blocks
- int dataLength = (2 * blockSize) + (blockSize / 2);
-
- byte[] inputData = new byte[dataLength];
- Random rand = new Random();
- for (int i = 0; i < dataLength; i++) {
- inputData[i] = (byte) rand.nextInt(127);
- }
- key.write(inputData);
- key.close();
-
- // test with random start and random length
- for (int i = 0; i < 100; i++) {
- int inputOffset = rand.nextInt(dataLength - 1);
- int length = rand.nextInt(dataLength - inputOffset);
-
- KeyInputStream keyInputStream = (KeyInputStream) objectStore
- .getVolume(volumeName).getBucket(bucketName).readKey(keyName)
- .getInputStream();
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-
- keyInputStream.copyLarge(outputStream, inputOffset, length,
- new byte[4096]);
- byte[] readData = outputStream.toByteArray();
- keyInputStream.close();
- outputStream.close();
-
- for (int j = inputOffset; j < inputOffset + length; j++) {
- Assert.assertEquals(readData[j - inputOffset], inputData[j]);
- }
- }
-
- // test with random start and -ve length
- for (int i = 0; i < 10; i++) {
- int inputOffset = rand.nextInt(dataLength - 1);
- int length = -1;
-
- KeyInputStream keyInputStream = (KeyInputStream) objectStore
- .getVolume(volumeName).getBucket(bucketName).readKey(keyName)
- .getInputStream();
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-
- keyInputStream.copyLarge(outputStream, inputOffset, length,
- new byte[4096]);
- byte[] readData = outputStream.toByteArray();
- keyInputStream.close();
- outputStream.close();
-
- for (int j = inputOffset; j < dataLength; j++) {
- Assert.assertEquals(readData[j - inputOffset], inputData[j]);
- }
- }
- }
-
- @Test
public void testReadChunk() throws Exception {
String keyName = getKeyName();
OzoneOutputStream key = TestHelper.createKey(keyName,
@@ -395,4 +334,62 @@ public class TestKeyInputStream {
}
keyInputStream.close();
}
+
+ @Test
+ public void testSkip() throws Exception {
+ XceiverClientManager.resetXceiverClientMetrics();
+ XceiverClientMetrics metrics = XceiverClientManager
+ .getXceiverClientMetrics();
+ long writeChunkCount = metrics.getContainerOpCountMetrics(
+ ContainerProtos.Type.WriteChunk);
+ long readChunkCount = metrics.getContainerOpCountMetrics(
+ ContainerProtos.Type.ReadChunk);
+
+ String keyName = getKeyName();
+ OzoneOutputStream key = TestHelper.createKey(keyName,
+ ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);
+
+ // write data spanning 3 chunks
+ int dataLength = (2 * chunkSize) + (chunkSize / 2);
+ byte[] inputData = ContainerTestHelper.getFixedLengthString(
+ keyString, dataLength).getBytes(UTF_8);
+ key.write(inputData);
+ key.close();
+
+ Assert.assertEquals(writeChunkCount + 3,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+
+ KeyInputStream keyInputStream = (KeyInputStream) objectStore
+ .getVolume(volumeName).getBucket(bucketName).readKey(keyName)
+ .getInputStream();
+
+ // skip a total of 150 bytes (70 + 0 + 80)
+ keyInputStream.skip(70);
+ Assert.assertEquals(70, keyInputStream.getPos());
+ keyInputStream.skip(0);
+ Assert.assertEquals(70, keyInputStream.getPos());
+ keyInputStream.skip(80);
+
+ Assert.assertEquals(150, keyInputStream.getPos());
+
+ // Skip operation should not result in any readChunk operation.
+ Assert.assertEquals(readChunkCount, metrics
+ .getContainerOpCountMetrics(ContainerProtos.Type.ReadChunk));
+
+ byte[] readData = new byte[chunkSize];
+ keyInputStream.read(readData, 0, chunkSize);
+
+ // Since we are reading data from index 150 to 250 and the chunk size is
+ // 100 bytes, we need to read 2 chunks.
+ Assert.assertEquals(readChunkCount + 2,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.ReadChunk));
+
+ keyInputStream.close();
+
+ // Verify that the data read matches the input data at the corresponding
+ // indices.
+ for (int i = 0; i < chunkSize; i++) {
+ Assert.assertEquals(inputData[chunkSize + 50 + i], readData[i]);
+ }
+ }
}
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/S3GatewayConfigKeys.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/S3GatewayConfigKeys.java
index fae1c82..5acf368 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/S3GatewayConfigKeys.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/S3GatewayConfigKeys.java
@@ -52,6 +52,12 @@ public final class S3GatewayConfigKeys {
OZONE_S3G_HTTP_AUTH_CONFIG_PREFIX + "kerberos.keytab";
public static final String OZONE_S3G_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL =
OZONE_S3G_HTTP_AUTH_CONFIG_PREFIX + "kerberos.principal";
+
+ public static final String OZONE_S3G_CLIENT_BUFFER_SIZE_KEY =
+ "ozone.s3g.client.buffer.size";
+ public static final String OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT =
+ "4KB";
+
/**
* Never constructed.
*/
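The key is resolved as a storage size, so suffixed values such as "4KB" or "1MB" parse to a byte count. A sketch of the lookup, mirroring the ObjectEndpoint wiring below:

    // Sketch: resolving ozone.s3g.client.buffer.size to bytes.
    OzoneConfiguration conf = new OzoneConfiguration();
    int bufferSize = (int) conf.getStorageSize(
        OZONE_S3G_CLIENT_BUFFER_SIZE_KEY,
        OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT, StorageUnit.BYTES);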
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index 6f0ea57..5502173 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.ozone.s3.endpoint;
+import javax.annotation.PostConstruct;
+import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
@@ -49,6 +51,8 @@ import java.util.Map;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneKeyDetails;
import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
@@ -77,6 +81,9 @@ import static javax.ws.rs.core.HttpHeaders.LAST_MODIFIED;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.tuple.Pair;
+
+import static org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_CLIENT_BUFFER_SIZE_KEY;
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.ENTITY_TOO_SMALL;
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_REQUEST;
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.NO_SUCH_UPLOAD;
@@ -104,6 +111,7 @@ public class ObjectEndpoint extends EndpointBase {
private HttpHeaders headers;
private List<String> customizableGetHeaders = new ArrayList<>();
+ private int bufferSize;
public ObjectEndpoint() {
customizableGetHeaders.add("Content-Type");
@@ -114,6 +122,16 @@ public class ObjectEndpoint extends EndpointBase {
customizableGetHeaders.add("Content-Encoding");
}
+ @Inject
+ private OzoneConfiguration ozoneConfiguration;
+
+ @PostConstruct
+ public void init() {
+ bufferSize = (int) ozoneConfiguration.getStorageSize(
+ OZONE_S3G_CLIENT_BUFFER_SIZE_KEY,
+ OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT, StorageUnit.BYTES);
+ }
+
/**
* Rest endpoint to upload object to a bucket.
* <p>
@@ -259,7 +277,8 @@ public class ObjectEndpoint extends EndpointBase {
try (S3WrapperInputStream s3WrapperInputStream =
new S3WrapperInputStream(
key.getInputStream())) {
- s3WrapperInputStream.copyLarge(dest, startOffset, copyLength);
+ IOUtils.copyLarge(s3WrapperInputStream, dest, startOffset,
+ copyLength, new byte[bufferSize]);
}
};
responseBuilder = Response
@@ -400,7 +419,6 @@ public class ObjectEndpoint extends EndpointBase {
return Response
.status(Status.NO_CONTENT)
.build();
-
}
/**
@@ -539,16 +557,9 @@ public class ObjectEndpoint extends EndpointBase {
if (range != null) {
RangeHeader rangeHeader =
RangeHeaderParserUtil.parseRangeHeader(range, 0);
-
- long copyLength = rangeHeader.getEndOffset() -
- rangeHeader.getStartOffset();
-
- try (S3WrapperInputStream s3WrapperInputStream =
- new S3WrapperInputStream(
- sourceObject.getInputStream())) {
- s3WrapperInputStream.copyLarge(ozoneOutputStream,
- rangeHeader.getStartOffset(), copyLength);
- }
+ IOUtils.copyLarge(sourceObject, ozoneOutputStream,
+ rangeHeader.getStartOffset(),
+ rangeHeader.getEndOffset() - rangeHeader.getStartOffset());
} else {
IOUtils.copy(sourceObject, ozoneOutputStream);
}
@@ -578,7 +589,6 @@ public class ObjectEndpoint extends EndpointBase {
}
throw ex;
}
-
}
/**
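Both copy paths now go through commons-io's IOUtils.copyLarge with an explicit offset, length, and caller-supplied buffer. A minimal sketch of the call pattern (stream and variable names are illustrative):

    // Sketch: copy copyLength bytes starting at startOffset, buffered by
    // the configured bufferSize. copyLarge first skips startOffset bytes
    // on the input, which the wrapper below serves via seek.
    long copied = IOUtils.copyLarge(in, out, startOffset, copyLength,
        new byte[bufferSize]);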
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java
index edf90ed..d88287c 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java
@@ -23,14 +23,12 @@ import org.apache.hadoop.ozone.client.io.KeyInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
/**
* S3Wrapper Input Stream which encapsulates KeyInputStream from ozone.
*/
public class S3WrapperInputStream extends FSInputStream {
private final KeyInputStream inputStream;
- private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
/**
* Constructs S3WrapperInputStream with KeyInputStream.
@@ -75,36 +73,12 @@ public class S3WrapperInputStream extends FSInputStream {
}
@Override
- public boolean seekToNewSource(long targetPos) throws IOException {
- return false;
+ public long skip(long n) throws IOException {
+ return inputStream.skip(n);
}
- /**
- * Copies some or all bytes from a large (over 2GB) <code>InputStream</code>
- * to an <code>OutputStream</code>, optionally skipping input bytes.
- * <p>
- * Copy the method from IOUtils of commons-io to reimplement skip by seek
- * rather than read. The reason why IOUtils of commons-io implement skip
- * by read can be found at
- * <a href="https://issues.apache.org/jira/browse/IO-203">IO-203</a>.
- * </p>
- * <p>
- * This method buffers the input internally, so there is no need to use a
- * <code>BufferedInputStream</code>.
- * </p>
- * The buffer size is given by {@link #DEFAULT_BUFFER_SIZE}.
- *
- * @param output the <code>OutputStream</code> to write to
- * @param inputOffset : number of bytes to skip from input before copying
- * -ve values are ignored
- * @param length : number of bytes to copy. -ve means all
- * @return the number of bytes copied
- * @throws NullPointerException if the input or output is null
- * @throws IOException if an I/O error occurs
- */
- public long copyLarge(final OutputStream output, final long inputOffset,
- final long length) throws IOException {
- return inputStream.copyLarge(output, inputOffset, length,
- new byte[DEFAULT_BUFFER_SIZE]);
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ return false;
}
}
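Put together, a ranged copy through the wrapper no longer drains bytes up to the range start, because skip() delegates to the seek-based KeyInputStream.skip(). An end-to-end sketch under the same assumptions as above:

    // Sketch: serving a ranged read through S3WrapperInputStream.
    try (S3WrapperInputStream in =
             new S3WrapperInputStream(key.getInputStream())) {
      // Skipping to startOffset seeks instead of reading and discarding.
      IOUtils.copyLarge(in, dest, startOffset, copyLength,
          new byte[bufferSize]);
    }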
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]