[GitHub] [hadoop] mukund-thakur commented on a diff in pull request #4445: HADOOP-18106: Handle memory fragmentation in S3A Vectored IO.

2022-06-20 Thread GitBox


mukund-thakur commented on code in PR #4445:
URL: https://github.com/apache/hadoop/pull/4445#discussion_r901891617


##
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java:
##
@@ -940,90 +949,135 @@ public void readVectored(List<? extends FileRange> ranges,
 
 LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr, 
ranges);
 checkNotClosed();
+if (stopVectoredIOOperations.getAndSet(false)) {
+  LOG.debug("Reinstating vectored read operation for path {} ", pathStr);
+}
+List<? extends FileRange> sortedRanges = validateNonOverlappingAndReturnSortedRanges(ranges);
 for (FileRange range : ranges) {
   validateRangeRequest(range);
  CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
   range.setData(result);
 }
 
-if (isOrderedDisjoint(ranges, 1, minSeekForVectorReads())) {
+if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) {
   LOG.debug("Not merging the ranges as they are disjoint");
-  for(FileRange range: ranges) {
+  for (FileRange range: sortedRanges) {
 ByteBuffer buffer = allocate.apply(range.getLength());
 unboundedThreadPool.submit(() -> readSingleRange(range, buffer));
   }
 } else {
   LOG.debug("Trying to merge the ranges as they are not disjoint");
-  List<CombinedFileRange> combinedFileRanges = sortAndMergeRanges(ranges,
+  List<CombinedFileRange> combinedFileRanges = mergeSortedRanges(sortedRanges,
   1, minSeekForVectorReads(),
   maxReadSizeForVectorReads());
   LOG.debug("Number of original ranges size {} , Number of combined ranges 
{} ",
   ranges.size(), combinedFileRanges.size());
-  for(CombinedFileRange combinedFileRange: combinedFileRanges) {
-CompletableFuture result = new CompletableFuture<>();
-ByteBuffer buffer = allocate.apply(combinedFileRange.getLength());
-combinedFileRange.setData(result);
+  for (CombinedFileRange combinedFileRange: combinedFileRanges) {
 unboundedThreadPool.submit(
-() -> readCombinedRangeAndUpdateChildren(combinedFileRange, 
buffer));
+() -> readCombinedRangeAndUpdateChildren(combinedFileRange, 
allocate));
   }
 }
 LOG.debug("Finished submitting vectored read to threadpool" +
 " on path {} for ranges {} ", pathStr, ranges);
   }
 
   /**
-   * Read data in the combinedFileRange and update data in buffers
-   * of all underlying ranges.
-   * @param combinedFileRange combined range.
-   * @param buffer combined buffer.
+   * Read the data from S3 for the bigger combined file range and update all 
the
+   * underlying ranges.
+   * @param combinedFileRange big combined file range.
+   * @param allocate method to create byte buffers to hold result data.
*/
   private void readCombinedRangeAndUpdateChildren(CombinedFileRange 
combinedFileRange,
-  ByteBuffer buffer) {
-// Not putting read single range call inside try block as
-// exception if any occurred during this call will be raised
-// during awaitFuture call while getting the combined buffer.
-readSingleRange(combinedFileRange, buffer);
+  IntFunction<ByteBuffer> allocate) {
+LOG.debug("Start reading combined range {} from path {} ", 
combinedFileRange, pathStr);
+// This reference must be kept until all buffers are populated, as this is a
+// finalizable object which closes the internal stream when gc triggers.
+S3Object objectRange = null;
+S3ObjectInputStream objectContent = null;
 try {
-  // In case of single range we return the original byte buffer else
-  // we return slice byte buffers for each child ranges.
-  ByteBuffer combinedBuffer = 
FutureIOSupport.awaitFuture(combinedFileRange.getData());
-  if (combinedFileRange.getUnderlying().size() == 1) {
-
combinedFileRange.getUnderlying().get(0).getData().complete(combinedBuffer);
-  } else {
-for (FileRange child : combinedFileRange.getUnderlying()) {
-  updateOriginalRange(child, combinedBuffer, combinedFileRange);
-}
+  checkIfVectoredIOStopped();
+  final String operationName = "readCombinedFileRange";
+  objectRange = getS3Object(operationName,
+  combinedFileRange.getOffset(),
+  combinedFileRange.getLength());
+  objectContent = objectRange.getObjectContent();
+  if (objectContent == null) {
+throw new PathIOException(uri,
+"Null IO stream received during " + operationName);
   }
+  populateChildBuffers(combinedFileRange, objectContent, allocate);
 } catch (Exception ex) {
-  LOG.warn("Exception occurred while reading combined range from file {}", 
pathStr, ex);
+  LOG.warn("Exception while reading a range {} from path {} ", 
combinedFileRange, pathStr, ex);

Review Comment:
   Okay changing to debug. 
   Or do you think it is better to 

[GitHub] [hadoop] mukund-thakur commented on a diff in pull request #4445: HADOOP-18106: Handle memory fragmentation in S3A Vectored IO.

2022-06-20 Thread GitBox


mukund-thakur commented on code in PR #4445:
URL: https://github.com/apache/hadoop/pull/4445#discussion_r901884952


##
hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java:
##
@@ -47,7 +47,7 @@
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileRange;
-import org.apache.hadoop.fs.FileRangeImpl;
+import org.apache.hadoop.fs.impl.FileRangeImpl;

Review Comment:
   it is required actually. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org