HDDS-771. ChunkGroupOutputStream stream entries need to be properly updated on closed container exception. Contributed by Lokesh Jain.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e0ac3081 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e0ac3081 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e0ac3081 Branch: refs/heads/HDDS-4 Commit: e0ac3081e95bc70b13c36a2cad1565ecc35dec52 Parents: 2e8ac14 Author: Shashikant Banerjee <shashik...@apache.org> Authored: Thu Nov 1 15:43:48 2018 +0530 Committer: Shashikant Banerjee <shashik...@apache.org> Committed: Thu Nov 1 15:43:48 2018 +0530 ---------------------------------------------------------------------- .../ozone/client/io/ChunkGroupOutputStream.java | 6 +- .../rpc/TestCloseContainerHandlingByClient.java | 60 ++++++++++++++++++++ 2 files changed, 65 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0ac3081/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java index 78d69c1..3fe5d93 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java @@ -413,6 +413,11 @@ public class ChunkGroupOutputStream extends OutputStream { return; } + // update currentStreamIndex in case of closed container exception. The + // current stream entry cannot be used for further writes because + // container is closed. + currentStreamIndex += 1; + // In case where not a single chunk of data has been written to the Datanode // yet. This block does not yet exist on the datanode but cached on the // outputStream buffer. 
No need to call GetCommittedBlockLength here @@ -429,7 +434,6 @@ public class ChunkGroupOutputStream extends OutputStream { // allocate new block and write this data in the datanode. The cached // data in the buffer does not exceed chunkSize. Preconditions.checkState(buffer.position() < chunkSize); - currentStreamIndex += 1; // readjust the byteOffset value to the length actually been written. byteOffset -= buffer.position(); handleWrite(buffer.array(), 0, buffer.position()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0ac3081/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java index d06a0bc..c6ee872 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java @@ -34,6 +34,8 @@ import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.ObjectStore; import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.client.OzoneVolume; import org.apache.hadoop.ozone.client.OzoneClientFactory; import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream; import org.apache.hadoop.ozone.client.io.OzoneInputStream; @@ -287,6 +289,64 @@ public class TestCloseContainerHandlingByClient { validateData(keyName, dataString.concat(dataString2).getBytes()); } + @Test + public void testMultiBlockWrites3() throws Exception { + + String keyName = 
"standalone5"; + int keyLen = 4 * blockSize; + OzoneOutputStream key = + createKey(keyName, ReplicationType.RATIS, keyLen); + ChunkGroupOutputStream groupOutputStream = + (ChunkGroupOutputStream) key.getOutputStream(); + // With the initial size provided, it should have preallocated 4 blocks + Assert.assertEquals(4, groupOutputStream.getStreamEntries().size()); + // write data 3 blocks and one more chunk + byte[] writtenData = fixedLengthString(keyString, keyLen).getBytes(); + byte[] data = Arrays.copyOfRange(writtenData, 0, 3 * blockSize + chunkSize); + Assert.assertEquals(data.length, 3 * blockSize + chunkSize); + key.write(data); + + Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); + //get the name of a valid container + OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName) + .setBucketName(bucketName) + .setType(HddsProtos.ReplicationType.RATIS) + .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName) + .build(); + + waitForContainerClose(keyName, key, + HddsProtos.ReplicationType.RATIS); + // write 3 more chunks worth of data. It will fail and new block will be + // allocated. This write completes 4 blocks worth of data written to key + data = Arrays + .copyOfRange(writtenData, 3 * blockSize + chunkSize, keyLen); + key.write(data); + + key.close(); + // read the key from OM again and match the length and data. 
+ OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs); + List<OmKeyLocationInfo> keyLocationInfos = + keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly(); + OzoneVolume volume = objectStore.getVolume(volumeName); + OzoneBucket bucket = volume.getBucket(bucketName); + OzoneInputStream inputStream = bucket.readKey(keyName); + byte[] readData = new byte[keyLen]; + inputStream.read(readData); + Assert.assertArrayEquals(writtenData, readData); + + // Though we have written only one block initially, the close will hit + // closeContainerException and remaining data in the chunkOutputStream + // buffer will be copied into a different allocated block and will be + // committed. + Assert.assertEquals(5, keyLocationInfos.size()); + Assert.assertEquals(4 * blockSize, keyInfo.getDataSize()); + long length = 0; + for (OmKeyLocationInfo locationInfo : keyLocationInfos) { + length += locationInfo.getLength(); + } + Assert.assertEquals(4 * blockSize, length); + } + private void waitForContainerClose(String keyName, OzoneOutputStream outputStream, HddsProtos.ReplicationType type) throws Exception { --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org