HDDS-771. ChunkGroupOutputStream stream entries need to be properly updated on 
closed container exception. Contributed by Lokesh Jain.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e0ac3081
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e0ac3081
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e0ac3081

Branch: refs/heads/HDDS-4
Commit: e0ac3081e95bc70b13c36a2cad1565ecc35dec52
Parents: 2e8ac14
Author: Shashikant Banerjee <shashik...@apache.org>
Authored: Thu Nov 1 15:43:48 2018 +0530
Committer: Shashikant Banerjee <shashik...@apache.org>
Committed: Thu Nov 1 15:43:48 2018 +0530

----------------------------------------------------------------------
 .../ozone/client/io/ChunkGroupOutputStream.java |  6 +-
 .../rpc/TestCloseContainerHandlingByClient.java | 60 ++++++++++++++++++++
 2 files changed, 65 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0ac3081/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
index 78d69c1..3fe5d93 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
@@ -413,6 +413,11 @@ public class ChunkGroupOutputStream extends OutputStream {
       return;
     }
 
+    // update currentStreamIndex in case of closed container exception. The
+    // current stream entry cannot be used for further writes because
+    // container is closed.
+    currentStreamIndex += 1;
+
     // In case where not a single chunk of data has been written to the 
Datanode
     // yet. This block does not yet exist on the datanode but cached on the
     // outputStream buffer. No need to call GetCommittedBlockLength here
@@ -429,7 +434,6 @@ public class ChunkGroupOutputStream extends OutputStream {
       // allocate new block and write this data in the datanode. The cached
       // data in the buffer does not exceed chunkSize.
       Preconditions.checkState(buffer.position() < chunkSize);
-      currentStreamIndex += 1;
       // readjust the byteOffset value to the length actually been written.
       byteOffset -= buffer.position();
       handleWrite(buffer.array(), 0, buffer.position());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0ac3081/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
index d06a0bc..c6ee872 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
@@ -34,6 +34,8 @@ import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
@@ -287,6 +289,64 @@ public class TestCloseContainerHandlingByClient {
     validateData(keyName, dataString.concat(dataString2).getBytes());
   }
 
+  @Test
+  public void testMultiBlockWrites3() throws Exception {
+
+    String keyName = "standalone5";
+    int keyLen = 4 * blockSize;
+    OzoneOutputStream key =
+        createKey(keyName, ReplicationType.RATIS, keyLen);
+    ChunkGroupOutputStream groupOutputStream =
+        (ChunkGroupOutputStream) key.getOutputStream();
+    // With the initial size provided, it should have preallocated 4 blocks
+    Assert.assertEquals(4, groupOutputStream.getStreamEntries().size());
+    // write data 3 blocks and one more chunk
+    byte[] writtenData = fixedLengthString(keyString, keyLen).getBytes();
+    byte[] data = Arrays.copyOfRange(writtenData, 0, 3 * blockSize + 
chunkSize);
+    Assert.assertEquals(data.length, 3 * blockSize + chunkSize);
+    key.write(data);
+
+    Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
+    //get the name of a valid container
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
+        .build();
+
+    waitForContainerClose(keyName, key,
+        HddsProtos.ReplicationType.RATIS);
+    // write 3 more chunks worth of data. It will fail and new block will be
+    // allocated. This write completes 4 blocks worth of data written to key
+    data = Arrays
+        .copyOfRange(writtenData, 3 * blockSize + chunkSize, keyLen);
+    key.write(data);
+
+    key.close();
+    // read the key from OM again and match the length and data.
+    OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+    List<OmKeyLocationInfo> keyLocationInfos =
+        keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
+    OzoneVolume volume = objectStore.getVolume(volumeName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+    OzoneInputStream inputStream = bucket.readKey(keyName);
+    byte[] readData = new byte[keyLen];
+    inputStream.read(readData);
+    Assert.assertArrayEquals(writtenData, readData);
+
+    // Though we have written only block initially, the close will hit
+    // closeContainerException and remaining data in the chunkOutputStream
+    // buffer will be copied into a different allocated block and will be
+    // committed.
+    Assert.assertEquals(5, keyLocationInfos.size());
+    Assert.assertEquals(4 * blockSize, keyInfo.getDataSize());
+    long length = 0;
+    for (OmKeyLocationInfo locationInfo : keyLocationInfos) {
+      length += locationInfo.getLength();
+    }
+    Assert.assertEquals(4 * blockSize, length);
+  }
+
   private void waitForContainerClose(String keyName,
       OzoneOutputStream outputStream, HddsProtos.ReplicationType type)
       throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to