HDFS-8223. Should calculate checksum for parity blocks in DFSStripedOutputStream. Contributed by Yi Liu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/254759df
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/254759df
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/254759df

Branch: refs/heads/HDFS-7285
Commit: 254759dfdb59e0aa235fb3dac4be92d6baaea466
Parents: cdd1a78
Author: Jing Zhao <ji...@apache.org>
Authored: Thu Apr 23 15:48:21 2015 -0700
Committer: Zhe Zhang <z...@apache.org>
Committed: Mon May 4 10:13:29 2015 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/hadoop/fs/FSOutputSummer.java | 4 ++++
 hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 3 +++
 .../org/apache/hadoop/hdfs/DFSStripedOutputStream.java | 10 ++++++++++
 3 files changed, 17 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/254759df/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
index bdc5585..a8a7494 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
@@ -196,6 +196,10 @@ abstract public class FSOutputSummer extends OutputStream {
     return sum.getChecksumSize();
   }
 
+  protected DataChecksum getDataChecksum() {
+    return sum;
+  }
+
   protected TraceScope createWriteTraceScope() {
     return NullScope.INSTANCE;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/254759df/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 48791b1..9357e23 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -125,3 +125,6 @@
 
     HDFS-8233. Fix DFSStripedOutputStream#getCurrentBlockGroupBytes when the last
     stripe is at the block group boundary. (jing9)
+
+    HDFS-8223. Should calculate checksum for parity blocks in DFSStripedOutputStream.
+    (Yi Liu via jing9)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/254759df/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index 245dfc1..6842267 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -62,6 +62,8 @@ public class DFSStripedOutputStream extends DFSOutputStream {
    */
   private final ECInfo ecInfo;
   private final int cellSize;
+  // checksum buffer, we only need to calculate checksum for parity blocks
+  private byte[] checksumBuf;
   private ByteBuffer[] cellBuffers;
 
   private final short numAllBlocks;
@@ -99,6 +101,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
 
     checkConfiguration();
 
+    checksumBuf = new byte[getChecksumSize() * (cellSize / bytesPerChecksum)];
     cellBuffers = new ByteBuffer[numAllBlocks];
     List<BlockingQueue<LocatedBlock>> stripeBlocks = new ArrayList<>();
 
@@ -179,6 +182,10 @@ public class DFSStripedOutputStream extends DFSOutputStream {
   private List<DFSPacket> generatePackets(ByteBuffer byteBuffer)
       throws IOException{
     List<DFSPacket> packets = new ArrayList<>();
+    assert byteBuffer.hasArray();
+    getDataChecksum().calculateChunkedSums(byteBuffer.array(), 0,
+        byteBuffer.remaining(), checksumBuf, 0);
+    int ckOff = 0;
     while (byteBuffer.remaining() > 0) {
       DFSPacket p = createPacket(packetSize, chunksPerPacket,
           streamer.getBytesCurBlock(),
@@ -186,6 +193,9 @@ public class DFSStripedOutputStream extends DFSOutputStream {
       int maxBytesToPacket = p.getMaxChunks() * bytesPerChecksum;
       int toWrite = byteBuffer.remaining() > maxBytesToPacket ?
           maxBytesToPacket: byteBuffer.remaining();
+      int ckLen = ((toWrite - 1) / bytesPerChecksum + 1) * getChecksumSize();
+      p.writeChecksum(checksumBuf, ckOff, ckLen);
+      ckOff += ckLen;
       p.writeData(byteBuffer, toWrite);
       streamer.incBytesCurBlock(toWrite);
       packets.add(p);