http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index cf785d5..c23e782 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -1,3 +1,4 @@
+
 <!---
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
@@ -852,40 +853,361 @@ Seoul

If the wrong endpoint is used, the request may fail. This may be reported as a 301/redirect error,
or as a 400 Bad Request.

-### S3AFastOutputStream
-
- **Warning: NEW in hadoop 2.7. UNSTABLE, EXPERIMENTAL: use at own risk**
-
-    <property>
-      <name>fs.s3a.fast.upload</name>
-      <value>false</value>
-      <description>Upload directly from memory instead of buffering to
-      disk first. Memory usage and parallelism can be controlled as up to
-      fs.s3a.multipart.size memory is consumed for each (part)upload actively
-      uploading (fs.s3a.threads.max) or queueing (fs.s3a.max.total.tasks)</description>
-    </property>
-
-    <property>
-      <name>fs.s3a.fast.buffer.size</name>
-      <value>1048576</value>
-      <description>Size (in bytes) of initial memory buffer allocated for an
-      upload. No effect if fs.s3a.fast.upload is false.</description>
-    </property>
+### <a name="s3a_fast_upload"></a>Stabilizing: S3A Fast Upload
+
+**New in Hadoop 2.7; significantly enhanced in Hadoop 2.9**
+
+Because of the nature of the S3 object store, data written to an S3A `OutputStream`
+is not written incrementally; instead, by default, it is buffered to disk
+until the stream is closed in its `close()` method.
+
+This can make output slow:
+
+* The execution time for `OutputStream.close()` is proportional to the amount of data
+buffered and inversely proportional to the bandwidth. That is `O(data/bandwidth)`.
+* The bandwidth is that available from the host to S3: other work in the same
+process, server or network at the time of upload may increase the upload time,
+hence the duration of the `close()` call.
+* If a process uploading data fails before `OutputStream.close()` is called,
+all data is lost.
+* The disks hosting temporary directories defined in `fs.s3a.buffer.dir` must
+have the capacity to store the entire buffered file.
+
+Put succinctly: the further the process is from the S3 endpoint, or the smaller
+the EC2-hosted VM is, the longer it will take the work to complete.
+
+This can create problems in application code:
+
+* Code often assumes that the `close()` call is fast;
+the delays can create bottlenecks in operations.
+* Very slow uploads sometimes cause applications to time out (generally,
+threads blocked during the upload stop reporting progress, and so trigger timeouts).
+* Streaming very large amounts of data may consume all disk space before the upload begins.
+
+
+Work to address this began in Hadoop 2.7 with the `S3AFastOutputStream`
+[HADOOP-11183](https://issues.apache.org/jira/browse/HADOOP-11183), and
+has continued with `S3ABlockOutputStream`
+[HADOOP-13560](https://issues.apache.org/jira/browse/HADOOP-13560).
+
+
+This adds an alternative output stream, "S3A Fast Upload", which:
+
+1. Always uploads large files as blocks with the size set by
+   `fs.s3a.multipart.size`.
+   That is: the threshold at which multipart uploads
+   begin and the size of each upload are identical.
+1. Buffers blocks to disk (default) or in on-heap or off-heap memory.
+1. Uploads blocks in parallel in background threads.
+1. Begins uploading blocks as soon as the buffered data exceeds this partition
+   size.
+1. When buffering data to disk, uses the directory/directories listed in
+   `fs.s3a.buffer.dir`. The size of data which can be buffered is limited
+   to the available disk space.
+1. Generates output statistics as metrics on the filesystem, including
+   statistics of active and pending block uploads.
+1. Has the time to `close()` set by the amount of remaining data to upload, rather
+   than the total size of the file.
+
+With incremental writes of blocks, "S3A fast upload" offers an upload
+time at least as fast as the "classic" mechanism, with significant benefits
+on long-lived output streams, and when very large amounts of data are generated.
+The in-memory buffering mechanisms may also offer speedup when running adjacent to
+S3 endpoints, as disks are not used for intermediate data storage.
+
+```xml
+<property>
+  <name>fs.s3a.fast.upload</name>
+  <value>true</value>
+  <description>
+    Use the incremental block upload mechanism with
+    the buffering mechanism set in fs.s3a.fast.upload.buffer.
+    The number of threads performing uploads in the filesystem is defined
+    by fs.s3a.threads.max; the queue of waiting uploads is limited by
+    fs.s3a.max.total.tasks.
+    The size of each buffer is set by fs.s3a.multipart.size.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.fast.upload.buffer</name>
+  <value>disk</value>
+  <description>
+    The buffering mechanism to use when using S3A fast upload
+    (fs.s3a.fast.upload=true). Values: disk, array, bytebuffer.
+    This configuration option has no effect if fs.s3a.fast.upload is false.
+
+    "disk" will use the directories listed in fs.s3a.buffer.dir as
+    the location(s) to save data prior to being uploaded.
+
+    "array" uses arrays in the JVM heap.
+
+    "bytebuffer" uses off-heap memory within the JVM.
+
+    Both "array" and "bytebuffer" will consume memory in a single stream up to the number
+    of blocks set by:
+
+    fs.s3a.multipart.size * fs.s3a.fast.upload.active.blocks.
+
+    If using either of these mechanisms, keep this value low.
+
+    The total number of threads performing work across all streams is set by
+    fs.s3a.threads.max, with fs.s3a.max.total.tasks setting the number of queued
+    work items.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.multipart.size</name>
+  <value>104857600</value>
+  <description>
+    How big (in bytes) to split upload or copy operations up into.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.fast.upload.active.blocks</name>
+  <value>8</value>
+  <description>
+    Maximum number of blocks a single output stream can have
+    active (uploading, or queued to the central FileSystem
+    instance's pool of queued operations).
+
+    This stops a single stream overloading the shared thread pool.
+  </description>
+</property>
+```
+
+**Notes**
+
+* If the amount of data written to a stream is below that set in `fs.s3a.multipart.size`,
+the upload is performed in the `OutputStream.close()` operation, as with
+the original output stream.
+
+* The published Hadoop metrics include live queue length and
+upload operation counts, making it possible to identify when there is a backlog
+of work, or a mismatch between data generation rates and network bandwidth.
+Per-stream
+statistics can also be logged by calling `toString()` on the current stream.
+
+* Incremental writes are not visible; the object can only be listed
+or read when the multipart operation completes in the `close()` call, which
+will block until the upload is completed.
+
+
+#### <a name="s3a_fast_upload_disk"></a>Fast Upload with Disk Buffers: `fs.s3a.fast.upload.buffer=disk`
+
+When `fs.s3a.fast.upload.buffer` is set to `disk`, all data is buffered
+to local hard disks prior to upload. This minimizes the amount of memory
+consumed, and so eliminates heap size as the limiting factor in queued uploads,
+exactly as the original "direct to disk" buffering used when
+`fs.s3a.fast.upload=false`.
+
+```xml
+<property>
+  <name>fs.s3a.fast.upload</name>
+  <value>true</value>
+</property>
+
+<property>
+  <name>fs.s3a.fast.upload.buffer</name>
+  <value>disk</value>
+</property>
+```
+
+
+#### <a name="s3a_fast_upload_bytebuffer"></a>Fast Upload with ByteBuffers: `fs.s3a.fast.upload.buffer=bytebuffer`
+
+When `fs.s3a.fast.upload.buffer` is set to `bytebuffer`, all data is buffered
+in "Direct" ByteBuffers prior to upload. This *may* be faster than buffering to disk,
+and, if disk space is small (for example, on tiny EC2 VMs), there may not
+be much disk space to buffer with.
+
+The ByteBuffers are created in the memory of the JVM, but not in the Java Heap itself.
+The amount of data which can be buffered is
+limited by the Java runtime, the operating system, and, for YARN applications,
+the amount of memory requested for each container.
+
+The slower the write bandwidth to S3, the greater the risk of running out
+of memory, and so the more care is needed in
+[tuning the upload settings](#s3a_fast_upload_thread_tuning).
+
+```xml
+<property>
+  <name>fs.s3a.fast.upload</name>
+  <value>true</value>
+</property>
+
+<property>
+  <name>fs.s3a.fast.upload.buffer</name>
+  <value>bytebuffer</value>
+</property>
+```
+
+#### <a name="s3a_fast_upload_array"></a>Fast Upload with Arrays: `fs.s3a.fast.upload.buffer=array`
+
+When `fs.s3a.fast.upload.buffer` is set to `array`, all data is buffered
+in byte arrays in the JVM's heap prior to upload.
+This *may* be faster than buffering to disk.
+
+This `array` option is similar to the in-memory-only stream offered in
+Hadoop 2.7 with `fs.s3a.fast.upload=true`.
+
+The amount of data which can be buffered is limited by the available
+size of the JVM heap. The slower the write bandwidth to S3, the greater
+the risk of heap overflows. This risk can be mitigated by
+[tuning the upload settings](#s3a_fast_upload_thread_tuning).
+
+```xml
+<property>
+  <name>fs.s3a.fast.upload</name>
+  <value>true</value>
+</property>
+
+<property>
+  <name>fs.s3a.fast.upload.buffer</name>
+  <value>array</value>
+</property>
+```
+
+#### <a name="s3a_fast_upload_thread_tuning"></a>S3A Fast Upload Thread Tuning
+
+Both the [Array](#s3a_fast_upload_array) and [Byte buffer](#s3a_fast_upload_bytebuffer)
+buffer mechanisms can consume very large amounts of memory, on-heap or
+off-heap respectively. The [disk buffer](#s3a_fast_upload_disk) mechanism
+does not use much memory, but will consume hard disk capacity.
+
+If there are many output streams being written to in a single process, the
+amount of memory or disk used is the multiple of all streams' active memory/disk use.
+
+Careful tuning may be needed to reduce the risk of running out of memory, especially
+if the data is buffered in memory.
+
+There are a number of parameters which can be tuned:
+
+1. The total number of threads available in the filesystem for data
+uploads *or any other queued filesystem operation*. This is set in
+`fs.s3a.threads.max`.
+
+1. The number of operations which can be queued for execution, *awaiting
+a thread*: `fs.s3a.max.total.tasks`.
+
+1. The number of blocks which a single output stream can have active,
+that is: being uploaded by a thread, or queued in the filesystem thread queue:
+`fs.s3a.fast.upload.active.blocks`.
+
+1. How long an idle thread can stay in the thread pool before it is retired:
+`fs.s3a.threads.keepalivetime`.
+
+
+When the maximum allowed number of active blocks of a single stream is reached,
+no more blocks can be uploaded from that stream until one or more of those active
+blocks' uploads completes. That is: a `write()` call which would trigger an upload
+of a now full datablock will instead block until there is capacity in the queue.
+
+How does that come together?
+
+* As the pool of threads set in `fs.s3a.threads.max` is shared (and intended
+to be used across all threads), a larger number here can allow for more
+parallel operations. However, as uploads require network bandwidth, adding more
+threads does not guarantee speedup.
+
+* The extra queue of tasks for the thread pool (`fs.s3a.max.total.tasks`)
+covers all ongoing background S3A operations (future plans include: parallelized
+rename operations, asynchronous directory operations).
+
+* When using memory buffering, a small value of `fs.s3a.fast.upload.active.blocks`
+limits the amount of memory which can be consumed per stream.
+
+* When using disk buffering, a larger value of `fs.s3a.fast.upload.active.blocks`
+does not consume much memory. But it may result in a large number of blocks
+competing with other filesystem operations.
+
+
+We recommend a low value of `fs.s3a.fast.upload.active.blocks`: enough
+to start background upload without overloading other parts of the system,
+then experiment to see if higher values deliver more throughput, especially
+from VMs running on EC2.
+
+```xml
+<property>
+  <name>fs.s3a.fast.upload.active.blocks</name>
+  <value>4</value>
+  <description>
+    Maximum number of blocks a single output stream can have
+    active (uploading, or queued to the central FileSystem
+    instance's pool of queued operations).
+
+    This stops a single stream overloading the shared thread pool.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.threads.max</name>
+  <value>10</value>
+  <description>The total number of threads available in the filesystem for data
+    uploads *or any other queued filesystem operation*.</description>
+</property>
+
+<property>
+  <name>fs.s3a.max.total.tasks</name>
+  <value>5</value>
+  <description>The number of operations which can be queued for execution.</description>
+</property>
+
+<property>
+  <name>fs.s3a.threads.keepalivetime</name>
+  <value>60</value>
+  <description>Number of seconds a thread can be idle before being
+    terminated.</description>
+</property>
+```
+
+
+#### <a name="s3a_multipart_purge"></a>Cleaning up After Incremental Upload Failures: `fs.s3a.multipart.purge`
+
+
+If an incremental streaming operation is interrupted, there may be
+intermediate partitions uploaded to S3, data which will be billed for.
+
+These charges can be reduced by enabling `fs.s3a.multipart.purge`,
+and setting a purge time in seconds, such as 86400 seconds (24 hours).
+When an S3A FileSystem instance is instantiated with the purge time greater
+than zero, it will, on startup, delete all outstanding multipart uploads
+older than this time.
+
+```xml
+<property>
+  <name>fs.s3a.multipart.purge</name>
+  <value>true</value>
+  <description>True if you want to purge existing multipart uploads that may not have been
+    completed/aborted correctly</description>
+</property>
+
+<property>
+  <name>fs.s3a.multipart.purge.age</name>
+  <value>86400</value>
+  <description>Minimum age in seconds of multipart uploads to purge</description>
+</property>
+```
+
+If an S3A client is instantiated with `fs.s3a.multipart.purge=true`,
+it will delete all out-of-date uploads *in the entire bucket*. That is: it will affect all
+multipart uploads to that bucket, from all applications.
-Writes are buffered in memory instead of to a file on local disk. This
-removes the throughput bottleneck of the local disk write and read cycle
-before starting the actual upload. Furthermore, it allows handling files that
-are larger than the remaining local disk space.
-
-However, non-trivial memory tuning is needed for optimal results and careless
-settings could cause memory overflow. Up to `fs.s3a.threads.max` parallel
-(part)uploads are active. Furthermore, up to `fs.s3a.max.total.tasks`
-additional part(uploads) can be waiting (and thus memory buffers are created).
-The memory buffer is uploaded as a single upload if it is not larger than
-`fs.s3a.multipart.threshold`. Else, a multi-part upload is initiated and
-parts of size `fs.s3a.multipart.size` are used to protect against overflowing
-the available memory. These settings should be tuned to the envisioned
-workflow (some large files, many small ones, ...) and the physical
-limitations of the machine and cluster (memory, network bandwidth).
+
+Leaving `fs.s3a.multipart.purge` at its default, `false`,
+means that the client will not make any attempt to delete or abort
+outstanding multipart uploads.
+
+The best practice for using this option is to disable multipart purges in
+normal use of S3A, enabling it only in manual/scheduled housekeeping operations.

### S3A Experimental "fadvise" input policy support
@@ -1221,7 +1543,143 @@ can be used:

Using the explicit endpoint for the region is recommended for speed and
the ability to use the V4 signing API.

-## Visible S3 Inconsistency
+
+### "Timeout waiting for connection from pool" when writing to S3A
+
+This happens when using the block output stream, `fs.s3a.fast.upload=true`, and
+the thread pool runs out of capacity.
+
+```
[s3a-transfer-shared-pool1-t20] INFO http.AmazonHttpClient (AmazonHttpClient.java:executeHelper(496)) - Unable to execute HTTP request: Timeout waiting for connection from pool
org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
  at org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(PoolingClientConnectionManager.java:230)
  at org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(PoolingClientConnectionManager.java:199)
  at sun.reflect.GeneratedMethodAccessor13.invoke(Unknown Source)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
  at com.amazonaws.http.conn.$Proxy10.getConnection(Unknown Source)
  at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:424)
  at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:884)
  at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
  at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55)
  at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:728)
  at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
  at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
  at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
  at com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:2921)
  at com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:2906)
  at org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1025)
  at org.apache.hadoop.fs.s3a.S3ABlockOutputStream$MultiPartUpload$1.call(S3ABlockOutputStream.java:360)
  at org.apache.hadoop.fs.s3a.S3ABlockOutputStream$MultiPartUpload$1.call(S3ABlockOutputStream.java:355)
  at org.apache.hadoop.fs.s3a.BlockingThreadPoolExecutorService$CallableWithPermitRelease.call(BlockingThreadPoolExecutorService.java:239)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
+```
+
+Make sure that `fs.s3a.connection.maximum` is larger
+than `fs.s3a.threads.max`.
+
+```xml
+<property>
+  <name>fs.s3a.threads.max</name>
+  <value>20</value>
+</property>
+
+<property>
+  <name>fs.s3a.connection.maximum</name>
+  <value>30</value>
+</property>
+```
+
+### "Timeout waiting for connection from pool" when reading from S3A
+
+This happens when more threads are trying to read from an S3A system than
+the maximum number of allocated HTTP connections.
+
+Set `fs.s3a.connection.maximum` to a larger value (and at least as large as
+`fs.s3a.threads.max`).
+
+### Out of heap memory when writing to S3A via Fast Upload
+
+This can happen when using the fast upload mechanism (`fs.s3a.fast.upload=true`)
+and in-memory buffering (either `fs.s3a.fast.upload.buffer=array` or
+`fs.s3a.fast.upload.buffer=bytebuffer`).
+
+More data is being generated in the JVM than it can upload to S3, and
+so much data has been buffered that the JVM has run out of memory.
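+
+The amount of memory a single stream can consume is bounded by
+`fs.s3a.multipart.size * fs.s3a.fast.upload.active.blocks`, so reducing
+either value caps the per-stream footprint. As a minimal sketch (the values
+below are illustrative, not recommendations), the following caps each stream
+at 4 * 64 MB = 256 MB of buffered data:
+
+```xml
+<!-- Illustrative values only: bounds each stream's buffered data at
+     4 blocks * 64 MB = 256 MB. Tune to the heap or container memory
+     actually available. -->
+<property>
+  <name>fs.s3a.multipart.size</name>
+  <value>67108864</value>
+</property>
+
+<property>
+  <name>fs.s3a.fast.upload.active.blocks</name>
+  <value>4</value>
+</property>
+```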
+ +Consult [S3A Fast Upload Thread Tuning](#s3a_fast_upload_thread_tuning) for +detail on this issue and options to address it. Consider also buffering to +disk, rather than memory. + + +### When writing to S3A: "java.io.FileNotFoundException: Completing multi-part upload" + + +``` +java.io.FileNotFoundException: Completing multi-part upload on fork-5/test/multipart/1c397ca6-9dfb-4ac1-9cf7-db666673246b: com.amazonaws.services.s3.model.AmazonS3Exception: The specified upload does not exist. The upload ID may be invalid, or the upload may have been aborted or completed. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchUpload; Request ID: 84FF8057174D9369), S3 Extended Request ID: Ij5Yn6Eq/qIERH4Z6Io3YL2t9/qNZ7z9gjPb1FrTtTovZ8k1MXqh+zCYYjqmfJ/fCY6E1+JR9jA= + at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1182) + at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:770) + at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489) + at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310) + at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785) + at com.amazonaws.services.s3.AmazonS3Client.completeMultipartUpload(AmazonS3Client.java:2705) + at org.apache.hadoop.fs.s3a.S3ABlockOutputStream$MultiPartUpload.complete(S3ABlockOutputStream.java:473) + at org.apache.hadoop.fs.s3a.S3ABlockOutputStream$MultiPartUpload.access$200(S3ABlockOutputStream.java:382) + at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.close(S3ABlockOutputStream.java:272) + at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72) + at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106) +``` + +This surfaces if, while a multipart upload was taking place, all outstanding multipart +uploads were garbage collected. The upload operation cannot complete because +the data uploaded has been deleted. + +Consult [Cleaning up After Incremental Upload Failures](#s3a_multipart_purge) for +details on how the multipart purge timeout can be set. If multipart uploads +are failing with the message above, it may be a sign that this value is too low. 
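+
+If long-lived uploads are being purged while still in flight, a hedged
+mitigation is to raise `fs.s3a.multipart.purge.age` well beyond the longest
+expected upload duration; the 72-hour value below is illustrative only:
+
+```xml
+<!-- Illustrative: 259200 seconds = 72 hours. Choose a value comfortably
+     longer than the slowest expected upload. -->
+<property>
+  <name>fs.s3a.multipart.purge.age</name>
+  <value>259200</value>
+</property>
+```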
+ +### When writing to S3A, HTTP Exceptions logged at info from `AmazonHttpClient` + +``` +[s3a-transfer-shared-pool4-t6] INFO http.AmazonHttpClient (AmazonHttpClient.java:executeHelper(496)) - Unable to execute HTTP request: hwdev-steve-ireland-new.s3.amazonaws.com:443 failed to respond +org.apache.http.NoHttpResponseException: bucket.s3.amazonaws.com:443 failed to respond + at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:143) + at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57) + at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:261) + at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:283) + at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:259) + at org.apache.http.impl.conn.ManagedClientConnectionImpl.receiveResponseHeader(ManagedClientConnectionImpl.java:209) + at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:272) + at com.amazonaws.http.protocol.SdkHttpRequestExecutor.doReceiveResponse(SdkHttpRequestExecutor.java:66) + at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:124) + at org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:686) + at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:488) + at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:884) + at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82) + at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55) + at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:728) + at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489) + at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310) + at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785) + at com.amazonaws.services.s3.AmazonS3Client.copyPart(AmazonS3Client.java:1731) + at com.amazonaws.services.s3.transfer.internal.CopyPartCallable.call(CopyPartCallable.java:41) + at com.amazonaws.services.s3.transfer.internal.CopyPartCallable.call(CopyPartCallable.java:28) + at org.apache.hadoop.fs.s3a.BlockingThreadPoolExecutorService$CallableWithPermitRelease.call(BlockingThreadPoolExecutorService.java:239) + at java.util.concurrent.FutureTask.run(FutureTask.java:266) + at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) + at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) + at java.lang.Thread.run(Thread.java:745) +``` + +These are HTTP I/O exceptions caught and logged inside the AWS SDK. The client +will attempt to retry the operation; it may just be a transient event. If there +are many such exceptions in logs, it may be a symptom of connectivity or network +problems. + +### Visible S3 Inconsistency Amazon S3 is *an eventually consistent object store*. That is: not a filesystem. @@ -1564,7 +2022,7 @@ tests or the `it.test` property for integration tests. 
    mvn clean test -Dtest=TestS3AInputPolicies

-    mvn clean verify -Dit.test=ITestS3AFileContextStatistics
+    mvn clean verify -Dit.test=ITestS3AFileContextStatistics -Dtest=none

    mvn clean verify -Dtest=TestS3A* -Dit.test=ITestS3A*

@@ -1614,7 +2072,7 @@
An alternate endpoint may be defined in `test.fs.s3a.sts.endpoint`.
The default is ""; meaning "use the amazon default value".

-#### CSV Data source Tests
+### CSV Data source Tests

The `TestS3AInputStreamPerformance` tests require read access to a multi-MB
text file. The default file for these tests is one published by amazon,
@@ -1661,18 +2119,89 @@ endpoint:
    <value>s3.amazonaws.com</value>
  </property>
```
+### Viewing Integration Test Reports
+
+Integration test results and logs are stored in `target/failsafe-reports/`.
+An HTML report can be generated during site generation, or with the `surefire-report`
+plugin:
+
+```
+mvn surefire-report:failsafe-report-only
+```
+
+### Scale Tests
+
+There are a set of tests designed to measure the scalability and performance
+at scale of the S3A client: the *Scale Tests*. Tests include: creating
+and traversing directory trees, uploading large files, renaming them,
+deleting them, seeking through the files, performing random IO, and others.
+This makes them a foundational part of the benchmarking.
+
+By their very nature they are slow. And, as their execution time is often
+limited by bandwidth between the computer running the tests and the S3 endpoint,
+parallel execution does not speed these tests up.
+
+#### Enabling the Scale Tests
+
+The tests are enabled if the `scale` property is set in the maven build;
+this can be done regardless of whether or not the parallel test profile
+is used.
+
+```bash
+mvn verify -Dscale
+
+mvn verify -Dparallel-tests -Dscale -DtestsThreadCount=8
+```
+
+The most bandwidth-intensive tests (those which upload data) always run
+sequentially; those which are slow due to HTTPS setup costs or server-side
+actions are included in the set of parallelized tests.
+
+
+#### Maven build tuning options
+
+
+Some of the tests can be tuned from the maven build or from the
+configuration file used to run the tests.
+
+```bash
+mvn verify -Dscale -Dfs.s3a.scale.test.huge.filesize=128M
+```
+
+The algorithm is:
+
+1. The value is queried from the configuration file, using a default value if
+it is not set.
+1. The value is queried from the JVM System Properties, where it is passed
+down by maven.
+1. If the system property is null, empty, or it has the value `unset`, then
+the configuration value is used. The `unset` option is used to
+[work round a quirk in maven property propagation](http://stackoverflow.com/questions/7773134/null-versus-empty-arguments-in-maven).
+
+Only a few properties can be set this way; more will be added.
+
+| Property | Meaning |
+|-----------|-------------|
+| `fs.s3a.scale.test.timeout`| Timeout in seconds for scale tests |
+| `fs.s3a.scale.test.huge.filesize`| Size for huge file uploads |
+| `fs.s3a.scale.test.huge.partitionsize`| Size for partitions in huge file uploads |
+
+The file and partition sizes are numeric values with a k/m/g/t/p suffix depending
+on the desired size. For example: 128M, 128m, 2G, 4T or even 1P.
+
+#### Scale test configuration options
+
+Some scale tests perform multiple operations
(such as creating many directories).
The exact number of operations to perform is configurable in the option
`scale.test.operation.count`.

-    <property>
-      <name>scale.test.operation.count</name>
-      <value>10</value>
-    </property>
+```xml
+<property>
+  <name>scale.test.operation.count</name>
+  <value>10</value>
+</property>
+```

Larger values generate more load, and are recommended when testing locally,
or in batch runs.

@@ -1685,19 +2214,64 @@ the width and depth of tests creating recursive directories. Larger
values create exponentially more directories, with consequent performance
impact.

-    <property>
-      <name>scale.test.directory.count</name>
-      <value>2</value>
-    </property>
+```xml
+<property>
+  <name>scale.test.directory.count</name>
+  <value>2</value>
+</property>
+```

DistCp tests targeting S3A support a configurable file size. The default is
10 MB, but the configuration value is expressed in KB so that it can be
tuned smaller to achieve faster test runs.

-    <property>
-      <name>scale.test.distcp.file.size.kb</name>
-      <value>10240</value>
-    </property>
+```xml
+<property>
+  <name>scale.test.distcp.file.size.kb</name>
+  <value>10240</value>
+</property>
+```
+
+S3A-specific scale test properties are:
+
+##### `fs.s3a.scale.test.huge.filesize`: size in MB for "Huge file tests".
+
+The Huge File tests validate S3A's ability to handle large files; the property
+`fs.s3a.scale.test.huge.filesize` declares the file size to use.
+
+```xml
+<property>
+  <name>fs.s3a.scale.test.huge.filesize</name>
+  <value>200M</value>
+</property>
+```
+
+Amazon S3 handles files larger than 5GB differently than smaller ones.
+Setting the huge filesize to a number greater than that validates support
+for huge files.
+
+```xml
+<property>
+  <name>fs.s3a.scale.test.huge.filesize</name>
+  <value>6G</value>
+</property>
+```
+
+Tests at this scale are slow: they are best executed from hosts running in
+the cloud infrastructure where the S3 endpoint is based.
+Otherwise, set a large timeout in `fs.s3a.scale.test.timeout`.
+
+```xml
+<property>
+  <name>fs.s3a.scale.test.timeout</name>
+  <value>432000</value>
+</property>
+```
+
+
+The tests are executed in an order that only cleans up created files after
+all the tests have completed. If the tests are interrupted, the test data will remain.

### Testing against non AWS S3 endpoints.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java index 28278fe..9e14ed2 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java @@ -18,24 +18,26 @@ package org.apache.hadoop.fs.contract.s3a; -import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD; -import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE; +import static org.apache.hadoop.fs.s3a.Constants.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.tools.contract.AbstractContractDistCpTest; /** * Contract test suite covering S3A integration with DistCp. + * Uses the block output stream, buffered to disk. This is the + * recommended output mechanism for DistCP due to its scalability. */ public class ITestS3AContractDistCp extends AbstractContractDistCpTest { - private static final long MULTIPART_SETTING = 8 * 1024 * 1024; // 8 MB + private static final long MULTIPART_SETTING = MULTIPART_MIN_SIZE; @Override protected Configuration createConfiguration() { Configuration newConf = super.createConfiguration(); - newConf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_SETTING); newConf.setLong(MULTIPART_SIZE, MULTIPART_SETTING); + newConf.setBoolean(FAST_UPLOAD, true); + newConf.set(FAST_UPLOAD_BUFFER, FAST_UPLOAD_BUFFER_DISK); return newConf; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java index b7973b3..e049fd1 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java @@ -48,6 +48,7 @@ public abstract class AbstractS3ATestBase extends AbstractFSContractTestBase @Override public void teardown() throws Exception { super.teardown(); + describe("closing file system"); IOUtils.closeStream(getFileSystem()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java index b0b8a65..b1b8240 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -20,18 +20,23 @@ package org.apache.hadoop.fs.s3a; import com.google.common.util.concurrent.ListenableFuture; import org.apache.hadoop.util.StopWatch; -import org.junit.*; + +import org.junit.AfterClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; /** - * Basic unit test for S3A's blocking executor service. + * Basic test for S3A's blocking executor service. */ public class ITestBlockingThreadPoolExecutorService { @@ -47,7 +52,10 @@ public class ITestBlockingThreadPoolExecutorService { private static final Integer SOME_VALUE = 1337; - private static BlockingThreadPoolExecutorService tpe = null; + private static BlockingThreadPoolExecutorService tpe; + + @Rule + public Timeout testTimeout = new Timeout(60 * 1000); @AfterClass public static void afterClass() throws Exception { @@ -71,13 +79,23 @@ public class ITestBlockingThreadPoolExecutorService { @Test public void testSubmitRunnable() throws Exception { ensureCreated(); - int totalTasks = NUM_ACTIVE_TASKS + NUM_WAITING_TASKS; + verifyQueueSize(tpe, NUM_ACTIVE_TASKS + NUM_WAITING_TASKS); + } + + /** + * Verify the size of the executor's queue, by verifying that the first + * submission to block is {@code expectedQueueSize + 1}. + * @param executorService executor service to test + * @param expectedQueueSize size of queue + */ + protected void verifyQueueSize(ExecutorService executorService, + int expectedQueueSize) { StopWatch stopWatch = new StopWatch().start(); - for (int i = 0; i < totalTasks; i++) { - tpe.submit(sleeper); + for (int i = 0; i < expectedQueueSize; i++) { + executorService.submit(sleeper); assertDidntBlock(stopWatch); } - tpe.submit(sleeper); + executorService.submit(sleeper); assertDidBlock(stopWatch); } @@ -93,6 +111,15 @@ public class ITestBlockingThreadPoolExecutorService { ensureDestroyed(); } + @Test + public void testChainedQueue() throws Throwable { + ensureCreated(); + int size = 2; + ExecutorService wrapper = new SemaphoredDelegatingExecutor(tpe, + size, true); + verifyQueueSize(wrapper, size); + } + // Helper functions, etc. 
private void assertDidntBlock(StopWatch sw) { @@ -141,8 +168,9 @@ public class ITestBlockingThreadPoolExecutorService { private static void ensureCreated() throws Exception { if (tpe == null) { LOG.debug("Creating thread pool"); - tpe = new BlockingThreadPoolExecutorService(NUM_ACTIVE_TASKS, - NUM_WAITING_TASKS, 1, TimeUnit.SECONDS, "btpetest"); + tpe = BlockingThreadPoolExecutorService.newInstance( + NUM_ACTIVE_TASKS, NUM_WAITING_TASKS, + 1, TimeUnit.SECONDS, "btpetest"); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java new file mode 100644 index 0000000..74cad00 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.io.IOUtils; + +import org.junit.Test; + +import java.io.IOException; + +import static org.apache.hadoop.fs.s3a.Constants.*; + +/** + * Tests small file upload functionality for + * {@link S3ABlockOutputStream} with the blocks buffered in byte arrays. + * + * File sizes are kept small to reduce test duration on slow connections; + * multipart tests are kept in scale tests. 
+ */ +public class ITestS3ABlockOutputArray extends AbstractS3ATestBase { + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + S3ATestUtils.disableFilesystemCaching(conf); + conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE); + conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE); + conf.setBoolean(Constants.FAST_UPLOAD, true); + conf.set(FAST_UPLOAD_BUFFER, getBlockOutputBufferName()); + return conf; + } + + protected String getBlockOutputBufferName() { + return FAST_UPLOAD_BUFFER_ARRAY; + } + + @Test + public void testZeroByteUpload() throws IOException { + verifyUpload("0", 0); + } + + @Test + public void testRegularUpload() throws IOException { + verifyUpload("regular", 1024); + } + + @Test(expected = IOException.class) + public void testDoubleStreamClose() throws Throwable { + Path dest = path("testDoubleStreamClose"); + describe(" testDoubleStreamClose"); + FSDataOutputStream stream = getFileSystem().create(dest, true); + byte[] data = ContractTestUtils.dataset(16, 'a', 26); + try { + stream.write(data); + stream.close(); + stream.write(data); + } finally { + IOUtils.closeStream(stream); + } + } + + public void verifyUpload(String name, int fileSize) throws IOException { + Path dest = path(name); + describe(name + " upload to " + dest); + ContractTestUtils.createAndVerifyFile( + getFileSystem(), + dest, + fileSize); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputByteBuffer.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputByteBuffer.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputByteBuffer.java new file mode 100644 index 0000000..504426b --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputByteBuffer.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +/** + * Use {@link Constants#FAST_UPLOAD_BYTEBUFFER} for buffering. 
+ */ +public class ITestS3ABlockOutputByteBuffer extends ITestS3ABlockOutputArray { + + protected String getBlockOutputBufferName() { + return Constants.FAST_UPLOAD_BYTEBUFFER; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputDisk.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputDisk.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputDisk.java new file mode 100644 index 0000000..550706d --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputDisk.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +/** + * Use {@link Constants#FAST_UPLOAD_BUFFER_DISK} for buffering. + */ +public class ITestS3ABlockOutputDisk extends ITestS3ABlockOutputArray { + + protected String getBlockOutputBufferName() { + return Constants.FAST_UPLOAD_BUFFER_DISK; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockingThreadPool.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockingThreadPool.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockingThreadPool.java index 4444d0c..991135e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockingThreadPool.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockingThreadPool.java @@ -72,6 +72,8 @@ public class ITestS3ABlockingThreadPool { @Test public void testFastMultiPartUpload() throws Exception { conf.setBoolean(Constants.FAST_UPLOAD, true); + conf.set(Constants.FAST_UPLOAD_BUFFER, + Constants.FAST_UPLOAD_BYTEBUFFER); fs = S3ATestUtils.createTestFileSystem(conf); ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 16 * 1024 * 1024); http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java index b08bfe9..30d4bf6 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java +++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java @@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.s3native.S3xLoginHelper; +import org.apache.hadoop.test.GenericTestUtils; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; @@ -35,6 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertEquals; @@ -417,6 +419,33 @@ public class ITestS3AConfiguration { fs.close(); } + @Test + public void testDirectoryAllocatorDefval() throws Throwable { + conf = new Configuration(); + conf.unset(Constants.BUFFER_DIR); + fs = S3ATestUtils.createTestFileSystem(conf); + File tmp = fs.createTmpFileForWrite("out-", 1024, conf); + assertTrue("not found: " + tmp, tmp.exists()); + tmp.delete(); + } + + @Test + public void testDirectoryAllocatorRR() throws Throwable { + File dir1 = GenericTestUtils.getRandomizedTestDir(); + File dir2 = GenericTestUtils.getRandomizedTestDir(); + dir1.mkdirs(); + dir2.mkdirs(); + conf = new Configuration(); + conf.set(Constants.BUFFER_DIR, dir1 +", " + dir2); + fs = S3ATestUtils.createTestFileSystem(conf); + File tmp1 = fs.createTmpFileForWrite("out-", 1024, conf); + tmp1.delete(); + File tmp2 = fs.createTmpFileForWrite("out-", 1024, conf); + tmp2.delete(); + assertNotEquals("round robin not working", + tmp1.getParent(), tmp2.getParent()); + } + /** * Reads and returns a field from an object using reflection. If the field * cannot be found, is null, or is not the expected type, then this method http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionBlockOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionBlockOutputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionBlockOutputStream.java new file mode 100644 index 0000000..5239f30 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionBlockOutputStream.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import org.apache.hadoop.conf.Configuration; + +/** + * Run the encryption tests against the block output stream. 
+ */ +public class ITestS3AEncryptionBlockOutputStream extends ITestS3AEncryption { + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + conf.setBoolean(Constants.FAST_UPLOAD, true); + conf.set(Constants.FAST_UPLOAD_BUFFER, + Constants.FAST_UPLOAD_BYTEBUFFER); + return conf; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionFastOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionFastOutputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionFastOutputStream.java deleted file mode 100644 index c06fed1..0000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionFastOutputStream.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a; - -import org.apache.hadoop.conf.Configuration; - -/** - * Run the encryption tests against the Fast output stream. - * This verifies that both file writing paths can encrypt their data. - */ -public class ITestS3AEncryptionFastOutputStream extends ITestS3AEncryption { - - @Override - protected Configuration createConfiguration() { - Configuration conf = super.createConfiguration(); - conf.setBoolean(Constants.FAST_UPLOAD, true); - return conf; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFastOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFastOutputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFastOutputStream.java deleted file mode 100644 index b5fa1c3..0000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFastOutputStream.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.contract.ContractTestUtils; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.Timeout; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; - -import java.io.IOException; - -/** - * Tests regular and multi-part upload functionality for S3AFastOutputStream. - * File sizes are kept small to reduce test duration on slow connections - */ -public class ITestS3AFastOutputStream { - private FileSystem fs; - - - @Rule - public Timeout testTimeout = new Timeout(30 * 60 * 1000); - - @Before - public void setUp() throws Exception { - Configuration conf = new Configuration(); - conf.setLong(Constants.MIN_MULTIPART_THRESHOLD, 5 * 1024 * 1024); - conf.setInt(Constants.MULTIPART_SIZE, 5 * 1024 * 1024); - conf.setBoolean(Constants.FAST_UPLOAD, true); - fs = S3ATestUtils.createTestFileSystem(conf); - } - - @After - public void tearDown() throws Exception { - if (fs != null) { - fs.delete(getTestPath(), true); - } - } - - protected Path getTestPath() { - return new Path("/tests3a"); - } - - @Test - public void testRegularUpload() throws IOException { - ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024); - } - - @Test - public void testMultiPartUpload() throws IOException { - ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 6 * 1024 * - 1024); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ATestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ATestUtils.java new file mode 100644 index 0000000..88204b2 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ATestUtils.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a; + +import org.apache.hadoop.conf.Configuration; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; + +/** + * Test the test utils. Why an integration test? it's needed to + * verify property pushdown. + */ +public class ITestS3ATestUtils extends Assert { + private static final Logger LOG = + LoggerFactory.getLogger(ITestS3ATestUtils.class); + public static final String KEY = "undefined.property"; + + @Before + public void clear() { + System.clearProperty(KEY); + } + + @Test + public void testGetTestProperty() throws Throwable { + Configuration conf = new Configuration(false); + assertEquals("a", getTestProperty(conf, KEY, "a")); + conf.set(KEY, "\t b \n"); + assertEquals("b", getTestProperty(conf, KEY, "a")); + System.setProperty(KEY, "c"); + assertEquals("c", getTestProperty(conf, KEY, "a")); + unsetSysprop(); + assertEquals("b", getTestProperty(conf, KEY, "a")); + } + + @Test + public void testGetTestPropertyLong() throws Throwable { + Configuration conf = new Configuration(false); + assertEquals(1, getTestPropertyLong(conf, KEY, 1)); + conf.setInt(KEY, 2); + assertEquals(2, getTestPropertyLong(conf, KEY, 1)); + System.setProperty(KEY, "3"); + assertEquals(3, getTestPropertyLong(conf, KEY, 1)); + } + + @Test + public void testGetTestPropertyInt() throws Throwable { + Configuration conf = new Configuration(false); + assertEquals(1, getTestPropertyInt(conf, KEY, 1)); + conf.setInt(KEY, 2); + assertEquals(2, getTestPropertyInt(conf, KEY, 1)); + System.setProperty(KEY, "3"); + assertEquals(3, getTestPropertyInt(conf, KEY, 1)); + conf.unset(KEY); + assertEquals(3, getTestPropertyInt(conf, KEY, 1)); + unsetSysprop(); + assertEquals(5, getTestPropertyInt(conf, KEY, 5)); + } + + @Test + public void testGetTestPropertyBool() throws Throwable { + Configuration conf = new Configuration(false); + assertTrue(getTestPropertyBool(conf, KEY, true)); + conf.set(KEY, "\tfalse \n"); + assertFalse(getTestPropertyBool(conf, KEY, true)); + System.setProperty(KEY, "true"); + assertTrue(getTestPropertyBool(conf, KEY, true)); + unsetSysprop(); + assertEquals("false", getTestProperty(conf, KEY, "true")); + conf.unset(KEY); + assertTrue(getTestPropertyBool(conf, KEY, true)); + } + + protected void unsetSysprop() { + System.setProperty(KEY, UNSET_PROPERTY); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java index 6a4e68c..6894bb0 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java @@ -44,14 +44,35 @@ public interface S3ATestConstants { String TEST_FS_S3A_NAME = TEST_FS_S3A + "name"; /** + * Run the encryption tests? + */ + String KEY_ENCRYPTION_TESTS = TEST_FS_S3A + "encryption.enabled"; + + /** + * Tell tests that they are being executed in parallel: {@value}. + */ + String KEY_PARALLEL_TEST_EXECUTION = "test.parallel.execution"; + + /** + * A property set to true in maven if scale tests are enabled: {@value}. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
index 6a4e68c..6894bb0 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
@@ -44,14 +44,35 @@ public interface S3ATestConstants {
   String TEST_FS_S3A_NAME = TEST_FS_S3A + "name";
 
   /**
+   * Run the encryption tests?
+   */
+  String KEY_ENCRYPTION_TESTS = TEST_FS_S3A + "encryption.enabled";
+
+  /**
+   * Tell tests that they are being executed in parallel: {@value}.
+   */
+  String KEY_PARALLEL_TEST_EXECUTION = "test.parallel.execution";
+
+  /**
+   * A property set to true in maven if scale tests are enabled: {@value}.
+   */
+  String KEY_SCALE_TESTS_ENABLED = S3A_SCALE_TEST + "enabled";
+
+  /**
    * The number of operations to perform: {@value}.
    */
   String KEY_OPERATION_COUNT = SCALE_TEST + "operation.count";
 
   /**
+   * The number of directory operations to perform: {@value}.
+   */
+  String KEY_DIRECTORY_COUNT = SCALE_TEST + "directory.count";
+
+  /**
    * The readahead buffer: {@value}.
    */
   String KEY_READ_BUFFER_SIZE = S3A_SCALE_TEST + "read.buffer.size";
+
   int DEFAULT_READ_BUFFER_SIZE = 16384;
 
   /**
@@ -65,12 +86,62 @@ public interface S3ATestConstants {
   String DEFAULT_CSVTEST_FILE = "s3a://landsat-pds/scene_list.gz";
 
   /**
+   * Endpoint for the S3 CSV/scale tests. This defaults to us-east.
+   */
+  String KEY_CSVTEST_ENDPOINT = S3A_SCALE_TEST + "csvfile.endpoint";
+
+  /**
+   * Default endpoint for the S3 CSV/scale tests: us-east.
+   */
+  String DEFAULT_CSVTEST_ENDPOINT = "s3.amazonaws.com";
+
+  /**
+   * Name of the property to define the timeout for scale tests: {@value}.
+   * Measured in seconds.
+   */
+  String KEY_TEST_TIMEOUT = S3A_SCALE_TEST + "timeout";
+
+  /**
+   * Name of the property to define the file size for the huge file
+   * tests: {@value}.
+   * Measured in KB; a suffix like "M" or "G" will change the unit.
+   */
+  String KEY_HUGE_FILESIZE = S3A_SCALE_TEST + "huge.filesize";
+
+  /**
+   * Name of the property to define the partition size for the huge file
+   * tests: {@value}.
+   * Measured in KB; a suffix like "M" or "G" will change the unit.
+   */
+  String KEY_HUGE_PARTITION_SIZE = S3A_SCALE_TEST + "huge.partitionsize";
+
+  /**
+   * The default huge size is small; full 5GB+ scale tests are something
+   * to run in long test runs on EC2 VMs. {@value}.
+   */
+  String DEFAULT_HUGE_FILESIZE = "10M";
+
+  /**
    * The default number of operations to perform: {@value}.
    */
   long DEFAULT_OPERATION_COUNT = 2005;
 
   /**
-   * Run the encryption tests?
+   * Default number of directories to create when performing
+   * directory performance/scale tests.
    */
-  String KEY_ENCRYPTION_TESTS = TEST_FS_S3A + "encryption.enabled";
+  int DEFAULT_DIRECTORY_COUNT = 2;
+
+  /**
+   * Default scale test timeout in seconds: {@value}.
+   */
+  int DEFAULT_TEST_TIMEOUT = 30 * 60;
+
+  /**
+   * Default policy on scale tests: {@value}.
+   */
+  boolean DEFAULT_SCALE_TESTS_ENABLED = false;
+
 }
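Taken together with the S3ATestUtils helpers, these constants let the maven build resize, retime, or skip the scale tests via -D overrides. A sketch of the intended consumption pattern, assuming the constants and helpers in this patch; the class name is illustrative:

```java
// Sketch of how a scale test can pick up its tuning from these constants,
// assuming the S3ATestUtils helpers and S3ATestConstants fields in this patch.
import org.apache.hadoop.conf.Configuration;

import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;

public class ScaleTestSetupSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Scale tests stay off unless the build explicitly enabled them.
    boolean enabled = getTestPropertyBool(conf,
        KEY_SCALE_TESTS_ENABLED, DEFAULT_SCALE_TESTS_ENABLED);

    // "10M" expands via the k/m/g/t/p/e suffix parser to 10 * 2^20 bytes.
    long hugeFileSize = getTestPropertyBytes(conf,
        KEY_HUGE_FILESIZE, DEFAULT_HUGE_FILESIZE);

    // Timeout is measured in seconds; 30 * 60 by default.
    int timeoutSeconds = getTestPropertyInt(conf,
        KEY_TEST_TIMEOUT, DEFAULT_TEST_TIMEOUT);

    System.out.printf("enabled=%b size=%d timeout=%ds%n",
        enabled, hugeFileSize, timeoutSeconds);
  }
}
```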
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index 95f6d4b..c67e118 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -40,6 +40,12 @@ import static org.apache.hadoop.fs.s3a.Constants.*;
 public class S3ATestUtils {
 
   /**
+   * Value to set a system property to (in maven) to declare that
+   * a property has been unset.
+   */
+  public static final String UNSET_PROPERTY = "unset";
+
+  /**
    * Create the test filesystem.
    *
    * If the test.fs.s3a.name property is not set, this will
@@ -53,8 +59,25 @@ public class S3ATestUtils {
    */
   public static S3AFileSystem createTestFileSystem(Configuration conf)
       throws IOException {
-    String fsname = conf.getTrimmed(TEST_FS_S3A_NAME, "");
+    return createTestFileSystem(conf, true);
+  }
+
+  /**
+   * Create the test filesystem with or without multipart purging.
+   *
+   * If the test.fs.s3a.name property is not set, this will
+   * trigger a JUnit failure.
+   * @param conf configuration
+   * @param purge flag to enable multipart purging
+   * @return the FS
+   * @throws IOException IO Problems
+   * @throws AssumptionViolatedException if the FS is not named
+   */
+  public static S3AFileSystem createTestFileSystem(Configuration conf,
+      boolean purge)
+      throws IOException {
+
+    String fsname = conf.getTrimmed(TEST_FS_S3A_NAME, "");
 
     boolean liveTest = !StringUtils.isEmpty(fsname);
     URI testURI = null;
@@ -70,8 +93,12 @@
     }
     S3AFileSystem fs1 = new S3AFileSystem();
     //enable purging in tests
-    conf.setBoolean(PURGE_EXISTING_MULTIPART, true);
-    conf.setInt(PURGE_EXISTING_MULTIPART_AGE, 0);
+    if (purge) {
+      conf.setBoolean(PURGE_EXISTING_MULTIPART, true);
+      // but a long delay so that parallel multipart tests don't
+      // suddenly start timing out
+      conf.setInt(PURGE_EXISTING_MULTIPART_AGE, 30 * 60);
+    }
     fs1.initialize(testURI, conf);
     return fs1;
   }
@@ -149,6 +176,121 @@
   }
 
   /**
+   * Get a long test property.
+   * <ol>
+   *   <li>Look up configuration value (which can pick up core-default.xml),
+   *   using {@code defVal} as the default value (if conf != null).
+   *   </li>
+   *   <li>Fetch the system property.</li>
+   *   <li>If the system property is not empty or "(unset)":
+   *   it overrides the conf value.
+   *   </li>
+   * </ol>
+   * This puts the build properties in charge of everything. It's not a
+   * perfect design; having maven set properties based on a file, as ant
+   * let you do, is better for customization.
+   *
+   * As to why there's a special (unset) value, see
+   * <a href="http://stackoverflow.com/questions/7773134/null-versus-empty-arguments-in-maven">
+   * Stack Overflow</a>.
+   * @param conf config: may be null
+   * @param key key to look up
+   * @param defVal default value
+   * @return the evaluated test property.
+   */
+  public static long getTestPropertyLong(Configuration conf,
+      String key, long defVal) {
+    return Long.valueOf(
+        getTestProperty(conf, key, Long.toString(defVal)));
+  }
+
+  /**
+   * Get a test property value in bytes, using k, m, g, t, p, e suffixes.
+   * {@link org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix#string2long(String)}
+   * <ol>
+   *   <li>Look up configuration value (which can pick up core-default.xml),
+   *   using {@code defVal} as the default value (if conf != null).
+   *   </li>
+   *   <li>Fetch the system property.</li>
+   *   <li>If the system property is not empty or "(unset)":
+   *   it overrides the conf value.
+   *   </li>
+   * </ol>
+   * This puts the build properties in charge of everything. It's not a
+   * perfect design; having maven set properties based on a file, as ant
+   * let you do, is better for customization.
+   *
+   * As to why there's a special (unset) value, see
+   * <a href="http://stackoverflow.com/questions/7773134/null-versus-empty-arguments-in-maven">
+   * Stack Overflow</a>.
+   * @param conf config: may be null
+   * @param key key to look up
+   * @param defVal default value
+   * @return the evaluated test property.
+   */
+  public static long getTestPropertyBytes(Configuration conf,
+      String key, String defVal) {
+    return org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix
+        .string2long(getTestProperty(conf, key, defVal));
+  }
+
+  /**
+   * Get an integer test property; algorithm described in
+   * {@link #getTestPropertyLong(Configuration, String, long)}.
+   * @param conf config: may be null
+   * @param key key to look up
+   * @param defVal default value
+   * @return the evaluated test property.
+   */
+  public static int getTestPropertyInt(Configuration conf,
+      String key, int defVal) {
+    return (int) getTestPropertyLong(conf, key, defVal);
+  }
+
+  /**
+   * Get a boolean test property; algorithm described in
+   * {@link #getTestPropertyLong(Configuration, String, long)}.
+   * @param conf config: may be null
+   * @param key key to look up
+   * @param defVal default value
+   * @return the evaluated test property.
+   */
+  public static boolean getTestPropertyBool(Configuration conf,
+      String key,
+      boolean defVal) {
+    return Boolean.valueOf(
+        getTestProperty(conf, key, Boolean.toString(defVal)));
+  }
+
+  /**
+   * Get a string test property.
+   * <ol>
+   *   <li>Look up configuration value (which can pick up core-default.xml),
+   *   using {@code defVal} as the default value (if conf != null).
+   *   </li>
+   *   <li>Fetch the system property.</li>
+   *   <li>If the system property is not empty or "(unset)":
+   *   it overrides the conf value.
+   *   </li>
+   * </ol>
+   * This puts the build properties in charge of everything. It's not a
+   * perfect design; having maven set properties based on a file, as ant
+   * let you do, is better for customization.
+   *
+   * As to why there's a special (unset) value, see
+   * <a href="http://stackoverflow.com/questions/7773134/null-versus-empty-arguments-in-maven">
+   * Stack Overflow</a>.
+   * @param conf config: may be null
+   * @param key key to look up
+   * @param defVal default value
+   * @return the evaluated test property.
+   */
+  public static String getTestProperty(Configuration conf,
+      String key,
+      String defVal) {
+    String confVal = conf != null ? conf.getTrimmed(key, defVal) : defVal;
+    String propval = System.getProperty(key);
+    return StringUtils.isNotEmpty(propval) && !UNSET_PROPERTY.equals(propval)
+        ? propval : confVal;
+  }
+
+  /**
    * The exception to raise so as to exit fast from
    * {@link #eventually(int, Callable)}.
    */
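The new purge flag matters because purging on filesystem creation aborts pending multipart uploads in the bucket, including those of tests running in parallel. A sketch of how a multipart-sensitive test might opt out, assuming the two-argument overload above; the class name is illustrative:

```java
// Sketch: create the test filesystem without multipart purging, so that a
// parallel run's in-flight multipart uploads are left alone. Assumes the
// two-argument createTestFileSystem() overload added in this patch.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3ATestUtils;

import java.io.IOException;

public class MultipartFriendlySetup {
  public static S3AFileSystem createFs() throws IOException {
    Configuration conf = new Configuration();
    // purge=false: PURGE_EXISTING_MULTIPART is never enabled, so other
    // tests' uploads survive; the one-argument overload purges, but only
    // uploads older than 30 minutes.
    return S3ATestUtils.createTestFileSystem(conf, false);
  }
}
```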
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestDataBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestDataBlocks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestDataBlocks.java
new file mode 100644
index 0000000..9fa95fd
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestDataBlocks.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+/**
+ * Unit tests for {@link S3ADataBlocks}.
+ */
+public class TestDataBlocks extends Assert {
+
+  @Rule
+  public Timeout testTimeout = new Timeout(30 * 1000);
+
+  @Before
+  public void nameThread() {
+    Thread.currentThread().setName("JUnit");
+  }
+
+  /**
+   * Test the {@link S3ADataBlocks.ByteBufferBlockFactory}.
+   * That code implements an input stream over a ByteBuffer, and has to
+   * return the buffer to the pool after the read completes.
+   *
+   * This test verifies the basic contract of the process.
+   */
+  @Test
+  public void testByteBufferIO() throws Throwable {
+    try (S3ADataBlocks.ByteBufferBlockFactory factory =
+        new S3ADataBlocks.ByteBufferBlockFactory(null)) {
+      int limit = 128;
+      S3ADataBlocks.ByteBufferBlockFactory.ByteBufferBlock block
+          = factory.create(limit);
+      assertEquals("outstanding buffers in " + factory,
+          1, factory.getOutstandingBufferCount());
+
+      byte[] buffer = ContractTestUtils.toAsciiByteArray("test data");
+      int bufferLen = buffer.length;
+      block.write(buffer, 0, bufferLen);
+      assertEquals(bufferLen, block.dataSize());
+      assertEquals("capacity in " + block,
+          limit - bufferLen, block.remainingCapacity());
+      assertTrue("hasCapacity(64) in " + block, block.hasCapacity(64));
+      assertTrue("No capacity in " + block,
+          block.hasCapacity(limit - bufferLen));
+
+      // now start the upload
+      S3ADataBlocks.ByteBufferBlockFactory.ByteBufferInputStream
+          stream = block.startUpload();
+      assertTrue("!hasRemaining() in " + stream, stream.hasRemaining());
+      int expected = bufferLen;
+      assertEquals("wrong available() in " + stream,
+          expected, stream.available());
+
+      assertEquals('t', stream.read());
+      expected--;
+      assertEquals("wrong available() in " + stream,
+          expected, stream.available());
+
+      // close the block. The buffer must remain outstanding here;
+      // the stream now manages its lifecycle
+      block.close();
+      assertEquals("outstanding buffers in " + factory,
+          1, factory.getOutstandingBufferCount());
+      block.close();
+
+      // read into a byte array with an offset
+      int offset = 5;
+      byte[] in = new byte[limit];
+      assertEquals(2, stream.read(in, offset, 2));
+      assertEquals('e', in[offset]);
+      assertEquals('s', in[offset + 1]);
+      expected -= 2;
+      assertEquals("wrong available() in " + stream,
+          expected, stream.available());
+
+      // read to end
+      byte[] remainder = new byte[limit];
+      int c;
+      int index = 0;
+      while ((c = stream.read()) >= 0) {
+        remainder[index++] = (byte) c;
+      }
+      assertEquals(expected, index);
+      assertEquals('a', remainder[--index]);
+
+      assertEquals("wrong available() in " + stream,
+          0, stream.available());
+      assertTrue("hasRemaining() in " + stream, !stream.hasRemaining());
+
+      // when the stream is closed, the buffer should be returned
+      stream.close();
+      assertEquals("outstanding buffers in " + factory,
+          0, factory.getOutstandingBufferCount());
+      stream.close();
+      assertEquals("outstanding buffers in " + factory,
+          0, factory.getOutstandingBufferCount());
+
+    }
+
+  }
+
+}
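Condensed, the lifecycle under test is: the block owns the pooled buffer until `startUpload()` hands it to the input stream, which releases it back to the factory on `close()`. A minimal sketch of that contract, using only the calls exercised above; the class name is illustrative, and it must live in the s3a package since the block classes are not public:

```java
// Sketch of the buffer lifecycle verified by testByteBufferIO(): closing
// the block does not release the pooled buffer; closing the upload stream
// does.
package org.apache.hadoop.fs.s3a;

import org.apache.hadoop.fs.contract.ContractTestUtils;

public class ByteBufferLifecycleSketch {
  public static void main(String[] args) throws Exception {
    try (S3ADataBlocks.ByteBufferBlockFactory factory =
        new S3ADataBlocks.ByteBufferBlockFactory(null)) {
      S3ADataBlocks.ByteBufferBlockFactory.ByteBufferBlock block =
          factory.create(128);
      byte[] data = ContractTestUtils.toAsciiByteArray("hello");
      block.write(data, 0, data.length);

      S3ADataBlocks.ByteBufferBlockFactory.ByteBufferInputStream stream =
          block.startUpload();
      block.close();   // buffer still outstanding: the stream owns it now
      while (stream.read() >= 0) {
        // drain the stream, as the actual uploader would
      }
      stream.close();  // buffer handed back to the pool here
      assert factory.getOutstandingBufferCount() == 0;
    }
  }
}
```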
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
index 5e88aba..e1aef75 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
@@ -34,6 +34,7 @@ public class ITestS3AFileContextStatistics extends FCStatisticsBaseTest {
     fc = S3ATestUtils.createTestFileContext(conf);
     fc.mkdir(fileContextTestHelper.getTestRootPath(fc, "test"),
         FileContext.DEFAULT_PERM, true);
+    FileContext.clearStatistics();
   }
 
   @After
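This one-line fix works because `FileContext` statistics are collected statically across instances, so the `mkdir` in `setUp()` would otherwise already have bumped the counters the test asserts on. A sketch of the clean-baseline pattern; the class name is illustrative:

```java
// Sketch of the baseline pattern the patch establishes: clear the shared
// FileContext statistics after setup so a test can assert exact counts.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;

public class StatisticsBaseline {
  public static void main(String[] args) throws Exception {
    FileContext fc = FileContext.getFileContext(new Configuration());
    // ... setup operations such as mkdir bump the shared counters here ...
    FileContext.clearStatistics();
    // a statistics test can now assert on exact operation counts
  }
}
```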