[jira] [Updated] (HDFS-15206) HDFS synchronous reads from local file system

2020-03-04 Thread Mania Abdi (Jira)


 [ 
https://issues.apache.org/jira/browse/HDFS-15206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mania Abdi updated HDFS-15206:
--
Labels: performance  (was: )

> HDFS synchronous reads from local file system
> ---------------------------------------------
>
> Key: HDFS-15206
> URL: https://issues.apache.org/jira/browse/HDFS-15206
> Project: Hadoop HDFS
>  Issue Type: Improvement
>  Components: datanode
> Environment: !Screenshot from 2020-03-03 22-07-26.png!
>Reporter: Mania Abdi
>Priority: Minor
>  Labels: performance
> Attachments: Screenshot from 2020-03-03 22-07-26.png
>
>
> Hello everyone,
> I ran a simple benchmark that runs {{hadoop fs -get /file1.txt}}, where
> file1.txt is 1 MB in size, and captured the resulting workflow of requests
> using XTrace. Evaluating the trace, I noticed that the datanode issues
> synchronous 64 KB read requests against the local file system, sends the
> data back, and waits for completion. I did a walkthrough of the HDFS code
> and confirmed this pattern. I have two suggestions: (1) since the HDFS
> block size is usually 128 MB, we could mmap the file via the FileChannel
> class to load it into memory and let the file system prefetch and read
> ahead in the background, instead of reading synchronously from disk;
> (2) use asynchronous read operations against the datanode's local disk.
> Is there a rationale behind the synchronous reads from the file system?
>  
> Code: $HADOOP_SRC/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java, line 586
> {code:java}
>   /**
>    * Sends a packet with up to maxChunks chunks of data.
>    *
>    * @param pkt buffer used for writing packet data
>    * @param maxChunks maximum number of chunks to send
>    * @param out stream to send data to
>    * @param transferTo use transferTo to send data
>    * @param throttler used for throttling data transfer bandwidth
>    */
>   private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
>       boolean transferTo, DataTransferThrottler throttler) throws IOException {
>     int dataLen = (int) Math.min(endOffset - offset,
>                                  (chunkSize * (long) maxChunks));
>     int numChunks = numberOfChunks(dataLen); // Number of chunks be sent in the packet
>     int checksumDataLen = numChunks * checksumSize;
>     int packetLen = dataLen + checksumDataLen + 4;
>     boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0;
>
>     // The packet buffer is organized as follows:
>     // _______HHHHCCCCD?D?D?D?
>     //        ^   ^
>     //        |   \ checksumOff
>     //        \ headerOff
>     // _ padding, since the header is variable-length
>     // H = header and length prefixes
>     // C = checksums
>     // D? = data, if transferTo is false.
>
>     int headerLen = writePacketHeader(pkt, dataLen, packetLen);
>
>     // Per above, the header doesn't start at the beginning of the buffer
>     int headerOff = pkt.position() - headerLen;
>
>     int checksumOff = pkt.position();
>     byte[] buf = pkt.array();
>
>     if (checksumSize > 0 && checksumIn != null) {
>       readChecksum(buf, checksumOff, checksumDataLen);
>
>       // write in progress that we need to use to get last checksum
>       if (lastDataPacket && lastChunkChecksum != null) {
>         int start = checksumOff + checksumDataLen - checksumSize;
>         byte[] updatedChecksum = lastChunkChecksum.getChecksum();
>         if (updatedChecksum != null) {
>           System.arraycopy(updatedChecksum, 0, buf, start, checksumSize);
>         }
>       }
>     }
>
>     int dataOff = checksumOff + checksumDataLen;
>     if (!transferTo) { // normal transfer
>       IOUtils.readFully(blockIn, buf, dataOff, dataLen);
>       if (verifyChecksum) {
>         verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
>       }
>     }
>
>     try {
>       if (transferTo) {
>         SocketOutputStream sockOut = (SocketOutputStream) out;
>         // First write header and checksums
>         sockOut.write(buf, headerOff, dataOff - headerOff);
>
>         // no need to flush since we know out is not a buffered stream
>         FileChannel fileCh = ((FileInputStream) blockIn).getChannel();
>         LongWritable waitTime = new LongWritable();
>         LongWritable transferTime = new LongWritable();
>         sockOut.transferToFully(fileCh, blockInPosition, dataLen,
>             waitTime, transferTime);
>         datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());
>         datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());
>         blockInPosition += dataLen;
>       } else {
>         // normal transfer
>         out.write(buf, headerOff, dataOff + dataLen - headerOff);
>       }
>     } catch (IOException e) {
>       if (e instanceof SocketTimeoutException) {
>         /*
>          * writing to client timed out.  This happens if the client reads
>          * part of a block and then decides not to read the rest (but leaves
>          * the socket open).
>          *
>          * Reporting of this case is done in DataXceiver#run
>          */
>       } else {
>         /* Exception while writing to the client. Connection closure from
>          * the other end is mostly the case and we do not care much about
>          * it. But other things can go wrong, especially in transferTo(),
>          * which we do not ...
> {code}
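Suggestion (1) above could look roughly like the following. This is an illustrative sketch only, not proposed HDFS code; the class and method names are hypothetical. The idea is that a read-only mapping lets the kernel page cache fault pages in and read ahead, instead of the sender issuing a blocking 64 KB read per packet.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch (not HDFS code): map a block file into memory so the
// OS can prefetch and read ahead, rather than blocking on each small read.
public class MmapSketch {
    static MappedByteBuffer mapReadOnly(Path blockFile) throws IOException {
        try (FileChannel ch = FileChannel.open(blockFile, StandardOpenOption.READ)) {
            // The mapping stays valid after the channel is closed. Pages are
            // faulted in lazily; MappedByteBuffer.load() can hint eager loading.
            return ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size());
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("block", ".dat");
        Files.write(tmp, new byte[]{1, 2, 3, 4});
        MappedByteBuffer buf = mapReadOnly(tmp);
        System.out.println(buf.remaining());  // bytes visible through the mapping
        Files.delete(tmp);
    }
}
```

One caveat with this approach: mapped regions are unmapped only when the buffer is garbage-collected, so mapping every 128 MB block a busy datanode serves would need explicit lifecycle management.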

[jira] [Updated] (HDFS-15206) HDFS synchronous reads from local file system

2020-03-04 Thread Mania Abdi (Jira)


 [ 
https://issues.apache.org/jira/browse/HDFS-15206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mania Abdi updated HDFS-15206:
--
Component/s: datanode
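Suggestion (2) in the issue description, asynchronous reads from the datanode's local disk, could be sketched with java.nio's AsynchronousFileChannel. Again, this is a hypothetical illustration with invented names, not HDFS code; the point is that the disk read can be overlapped with other work (e.g. flushing the previous packet to the socket) before the sender waits on the result.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical sketch (not HDFS code): issue a positioned read without
// blocking the calling thread until the result is actually needed.
public class AsyncReadSketch {
    static int readAt(Path file, long position, ByteBuffer dst)
            throws IOException, ExecutionException, InterruptedException {
        try (AsynchronousFileChannel ch =
                 AsynchronousFileChannel.open(file, StandardOpenOption.READ)) {
            Future<Integer> pending = ch.read(dst, position);
            // ... the caller could do useful work here, e.g. write the
            // previous packet to the network, before waiting ...
            return pending.get();  // number of bytes read
        }
    }
}
```

A completion-handler overload of read() also exists, which would avoid the Future.get() rendezvous entirely at the cost of callback-style control flow.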


[jira] [Updated] (HDFS-15206) HDFS synchronous reads from local file system

2020-03-04 Thread Mania Abdi (Jira)


 [ 
https://issues.apache.org/jira/browse/HDFS-15206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mania Abdi updated HDFS-15206:
--
Priority: Minor  (was: Major)
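For context on the transferTo branch in the quoted sendPacket code: SocketOutputStream.transferToFully drives FileChannel.transferTo, a sendfile-style zero-copy path that avoids a user-space buffer but still blocks the sending thread until the kernel consumes the requested range. The primitive can be illustrated stand-alone as follows (helper name is hypothetical, not HDFS code):

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

// Hypothetical helper (not HDFS code): push a byte range from a file to a
// channel with FileChannel.transferTo, the zero-copy transfer primitive.
public class TransferToSketch {
    static long sendFully(FileChannel src, long pos, long len,
                          WritableByteChannel out) throws IOException {
        long sent = 0;
        while (sent < len) {
            // transferTo may transfer fewer bytes than requested; loop.
            long n = src.transferTo(pos + sent, len - sent, out);
            if (n <= 0) break;  // defensive exit to avoid spinning
            sent += n;
        }
        return sent;
    }
}
```

Because this call blocks until the range is sent, the issue's question stands: the disk read and the socket write are serialized per 64 KB packet rather than pipelined.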


[jira] [Updated] (HDFS-15206) HDFS synchronous reads from local file system

2020-03-03 Thread Mania Abdi (Jira)


 [ 
https://issues.apache.org/jira/browse/HDFS-15206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mania Abdi updated HDFS-15206:
--
Description: 
