mukund-thakur commented on code in PR #4445:
URL: https://github.com/apache/hadoop/pull/4445#discussion_r901891617
##
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java:
##
@@ -940,90 +949,135 @@ public void readVectored(List
ranges,
LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr,
ranges);
checkNotClosed();
+if (stopVectoredIOOperations.getAndSet(false)) {
+ LOG.debug("Reinstating vectored read operation for path {} ", pathStr);
+}
+List sortedRanges =
validateNonOverlappingAndReturnSortedRanges(ranges);
for (FileRange range : ranges) {
validateRangeRequest(range);
CompletableFuture result = new CompletableFuture<>();
range.setData(result);
}
-if (isOrderedDisjoint(ranges, 1, minSeekForVectorReads())) {
+if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) {
LOG.debug("Not merging the ranges as they are disjoint");
- for(FileRange range: ranges) {
+ for (FileRange range: sortedRanges) {
ByteBuffer buffer = allocate.apply(range.getLength());
unboundedThreadPool.submit(() -> readSingleRange(range, buffer));
}
} else {
LOG.debug("Trying to merge the ranges as they are not disjoint");
- List combinedFileRanges = sortAndMergeRanges(ranges,
+ List combinedFileRanges =
mergeSortedRanges(sortedRanges,
1, minSeekForVectorReads(),
maxReadSizeForVectorReads());
LOG.debug("Number of original ranges size {} , Number of combined ranges
{} ",
ranges.size(), combinedFileRanges.size());
- for(CombinedFileRange combinedFileRange: combinedFileRanges) {
-CompletableFuture result = new CompletableFuture<>();
-ByteBuffer buffer = allocate.apply(combinedFileRange.getLength());
-combinedFileRange.setData(result);
+ for (CombinedFileRange combinedFileRange: combinedFileRanges) {
unboundedThreadPool.submit(
-() -> readCombinedRangeAndUpdateChildren(combinedFileRange,
buffer));
+() -> readCombinedRangeAndUpdateChildren(combinedFileRange,
allocate));
}
}
LOG.debug("Finished submitting vectored read to threadpool" +
" on path {} for ranges {} ", pathStr, ranges);
}
/**
- * Read data in the combinedFileRange and update data in buffers
- * of all underlying ranges.
- * @param combinedFileRange combined range.
- * @param buffer combined buffer.
+ * Read the data from S3 for the bigger combined file range and update all
the
+ * underlying ranges.
+ * @param combinedFileRange big combined file range.
+ * @param allocate method to create byte buffers to hold result data.
*/
private void readCombinedRangeAndUpdateChildren(CombinedFileRange
combinedFileRange,
- ByteBuffer buffer) {
-// Not putting read single range call inside try block as
-// exception if any occurred during this call will be raised
-// during awaitFuture call while getting the combined buffer.
-readSingleRange(combinedFileRange, buffer);
+ IntFunction
allocate) {
+LOG.debug("Start reading combined range {} from path {} ",
combinedFileRange, pathStr);
+// This reference is must be kept till all buffers are populated as this
is a
+// finalizable object which closes the internal stream when gc triggers.
+S3Object objectRange = null;
+S3ObjectInputStream objectContent = null;
try {
- // In case of single range we return the original byte buffer else
- // we return slice byte buffers for each child ranges.
- ByteBuffer combinedBuffer =
FutureIOSupport.awaitFuture(combinedFileRange.getData());
- if (combinedFileRange.getUnderlying().size() == 1) {
-
combinedFileRange.getUnderlying().get(0).getData().complete(combinedBuffer);
- } else {
-for (FileRange child : combinedFileRange.getUnderlying()) {
- updateOriginalRange(child, combinedBuffer, combinedFileRange);
-}
+ checkIfVectoredIOStopped();
+ final String operationName = "readCombinedFileRange";
+ objectRange = getS3Object(operationName,
+ combinedFileRange.getOffset(),
+ combinedFileRange.getLength());
+ objectContent = objectRange.getObjectContent();
+ if (objectContent == null) {
+throw new PathIOException(uri,
+"Null IO stream received during " + operationName);
}
+ populateChildBuffers(combinedFileRange, objectContent, allocate);
} catch (Exception ex) {
- LOG.warn("Exception occurred while reading combined range from file {}",
pathStr, ex);
+ LOG.warn("Exception while reading a range {} from path {} ",
combinedFileRange, pathStr, ex);
Review Comment:
Okay, changing to debug.
Or do you think it is better to