HDDS-226. Client should update block length in OM while committing the key. Contributed by Shashikant Banerjee.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f4db753b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f4db753b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f4db753b Branch: refs/heads/YARN-7402 Commit: f4db753bb6b4648c583722dbe8108973c23ba06f Parents: 6310c0d Author: Mukul Kumar Singh <msi...@apache.org> Authored: Wed Aug 1 09:02:43 2018 +0530 Committer: Mukul Kumar Singh <msi...@apache.org> Committed: Wed Aug 1 09:03:00 2018 +0530 ---------------------------------------------------------------------- .../ozone/client/io/ChunkGroupOutputStream.java | 22 +++++++++++- .../hadoop/ozone/om/helpers/OmKeyArgs.java | 26 ++++++++++++--- .../hadoop/ozone/om/helpers/OmKeyInfo.java | 29 ++++++++++++++-- .../ozone/om/helpers/OmKeyLocationInfo.java | 6 +++- ...neManagerProtocolClientSideTranslatorPB.java | 8 ++++- .../src/main/proto/OzoneManagerProtocol.proto | 1 + .../ozone/client/rpc/TestOzoneRpcClient.java | 35 ++++++++++++++++++++ .../hadoop/ozone/om/TestOmBlockVersioning.java | 13 +++++++- .../apache/hadoop/ozone/om/KeyManagerImpl.java | 4 +++ ...neManagerProtocolServerSideTranslatorPB.java | 5 ++- 10 files changed, 138 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4db753b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java index 9443317..83b4dfd 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java @@ -76,7 +76,7 @@ public class ChunkGroupOutputStream extends OutputStream { private final int chunkSize; private final String requestID; private boolean closed; - + private List<OmKeyLocationInfo> locationInfoList; /** * A constructor for testing purpose only. */ @@ -91,6 +91,7 @@ public class ChunkGroupOutputStream extends OutputStream { chunkSize = 0; requestID = null; closed = false; + locationInfoList = null; } /** @@ -133,6 +134,7 @@ public class ChunkGroupOutputStream extends OutputStream { this.xceiverClientManager = xceiverClientManager; this.chunkSize = chunkSize; this.requestID = requestId; + this.locationInfoList = new ArrayList<>(); LOG.debug("Expecting open key with one block, but got" + info.getKeyLocationVersions().size()); } @@ -196,8 +198,19 @@ public class ChunkGroupOutputStream extends OutputStream { streamEntries.add(new ChunkOutputStreamEntry(subKeyInfo.getBlockID(), keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID, chunkSize, subKeyInfo.getLength())); + // reset the original length to zero here. It will be updated as and when + // the data gets written. + subKeyInfo.setLength(0); + locationInfoList.add(subKeyInfo); } + private void incrementBlockLength(int index, long length) { + if (locationInfoList != null) { + OmKeyLocationInfo locationInfo = locationInfoList.get(index); + long originalLength = locationInfo.getLength(); + locationInfo.setLength(originalLength + length); + } + } @VisibleForTesting public long getByteOffset() { @@ -222,6 +235,7 @@ public class ChunkGroupOutputStream extends OutputStream { } ChunkOutputStreamEntry entry = streamEntries.get(currentStreamIndex); entry.write(b); + incrementBlockLength(currentStreamIndex, 1); if (entry.getRemaining() <= 0) { currentStreamIndex += 1; } @@ -276,6 +290,7 @@ public class ChunkGroupOutputStream extends OutputStream { ChunkOutputStreamEntry current = streamEntries.get(currentStreamIndex); int writeLen = Math.min(len, (int)current.getRemaining()); current.write(b, off, writeLen); + incrementBlockLength(currentStreamIndex, writeLen); if (current.getRemaining() <= 0) { currentStreamIndex += 1; } @@ -328,8 +343,13 @@ public class ChunkGroupOutputStream extends OutputStream { } if (keyArgs != null) { // in test, this could be null + long length = + locationInfoList.parallelStream().mapToLong(e -> e.getLength()).sum(); + Preconditions.checkState(byteOffset == length); keyArgs.setDataSize(byteOffset); + keyArgs.setLocationInfoList(locationInfoList); omClient.commitKey(keyArgs, openID); + locationInfoList = null; } else { LOG.warn("Closing ChunkGroupOutputStream, but key args is null"); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4db753b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java index 1f8ed5f..aab35c5 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java @@ -19,6 +19,8 @@ package org.apache.hadoop.ozone.om.helpers; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import java.util.List; + /** * Args for key. Client use this to specify key's attributes on key creation * (putKey()). @@ -30,15 +32,18 @@ public final class OmKeyArgs { private long dataSize; private final ReplicationType type; private final ReplicationFactor factor; + private List<OmKeyLocationInfo> locationInfoList; private OmKeyArgs(String volumeName, String bucketName, String keyName, - long dataSize, ReplicationType type, ReplicationFactor factor) { + long dataSize, ReplicationType type, ReplicationFactor factor, + List<OmKeyLocationInfo> locationInfoList) { this.volumeName = volumeName; this.bucketName = bucketName; this.keyName = keyName; this.dataSize = dataSize; this.type = type; this.factor = factor; + this.locationInfoList = locationInfoList; } public ReplicationType getType() { @@ -69,6 +74,14 @@ public final class OmKeyArgs { dataSize = size; } + public void setLocationInfoList(List<OmKeyLocationInfo> locationInfoList) { + this.locationInfoList = locationInfoList; + } + + public List<OmKeyLocationInfo> getLocationInfoList() { + return locationInfoList; + } + /** * Builder class of OmKeyArgs. */ @@ -79,7 +92,7 @@ public final class OmKeyArgs { private long dataSize; private ReplicationType type; private ReplicationFactor factor; - + private List<OmKeyLocationInfo> locationInfoList; public Builder setVolumeName(String volume) { this.volumeName = volume; @@ -111,9 +124,14 @@ public final class OmKeyArgs { return this; } + public Builder setLocationInfoList(List<OmKeyLocationInfo> locationInfos) { + this.locationInfoList = locationInfos; + return this; + } + public OmKeyArgs build() { - return new OmKeyArgs(volumeName, bucketName, keyName, dataSize, - type, factor); + return new OmKeyArgs(volumeName, bucketName, keyName, dataSize, type, + factor, locationInfoList); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4db753b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java index 05c8d45..3603964 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java @@ -101,8 +101,7 @@ public final class OmKeyInfo { this.dataSize = size; } - public synchronized OmKeyLocationInfoGroup getLatestVersionLocations() - throws IOException { + public synchronized OmKeyLocationInfoGroup getLatestVersionLocations() { return keyLocationVersions.size() == 0? null : keyLocationVersions.get(keyLocationVersions.size() - 1); } @@ -116,6 +115,32 @@ public final class OmKeyInfo { } /** + * updates the length of the each block in the list given. + * This will be called when the key is being committed to OzoneManager. + * + * @param locationInfoList list of locationInfo + * @throws IOException + */ + public void updateLocationInfoList(List<OmKeyLocationInfo> locationInfoList) { + OmKeyLocationInfoGroup keyLocationInfoGroup = getLatestVersionLocations(); + List<OmKeyLocationInfo> currentList = + keyLocationInfoGroup.getLocationList(); + Preconditions.checkNotNull(keyLocationInfoGroup); + Preconditions.checkState(locationInfoList.size() <= currentList.size()); + for (OmKeyLocationInfo current : currentList) { + // For Versioning, while committing the key for the newer version, + // we just need to update the lengths for new blocks. Need to iterate over + // and find the new blocks added in the latest version. + for (OmKeyLocationInfo info : locationInfoList) { + if (info.getBlockID().equals(current.getBlockID())) { + current.setLength(info.getLength()); + break; + } + } + } + } + + /** * Append a set of blocks to the latest version. Note that these blocks are * part of the latest version, not a new version. * http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4db753b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java index 3f6666d..fae92f8 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java @@ -27,7 +27,7 @@ public final class OmKeyLocationInfo { private final BlockID blockID; private final boolean shouldCreateContainer; // the id of this subkey in all the subkeys. - private final long length; + private long length; private final long offset; // the version number indicating when this block was added private long createVersion; @@ -68,6 +68,10 @@ public final class OmKeyLocationInfo { return length; } + public void setLength(long length) { + this.length = length; + } + public long getOffset() { return offset; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4db753b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java index 37151fb..e557ac5 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.ozone.om.protocolPB; +import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.protobuf.RpcController; @@ -581,11 +582,16 @@ public final class OzoneManagerProtocolClientSideTranslatorPB public void commitKey(OmKeyArgs args, int clientID) throws IOException { CommitKeyRequest.Builder req = CommitKeyRequest.newBuilder(); + List<OmKeyLocationInfo> locationInfoList = args.getLocationInfoList(); + Preconditions.checkNotNull(locationInfoList); KeyArgs keyArgs = KeyArgs.newBuilder() .setVolumeName(args.getVolumeName()) .setBucketName(args.getBucketName()) .setKeyName(args.getKeyName()) - .setDataSize(args.getDataSize()).build(); + .setDataSize(args.getDataSize()) + .addAllKeyLocations( + locationInfoList.stream().map(OmKeyLocationInfo::getProtobuf) + .collect(Collectors.toList())).build(); req.setKeyArgs(keyArgs); req.setClientID(clientID); http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4db753b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto index 36b1c83..51a0a7f 100644 --- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto +++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto @@ -234,6 +234,7 @@ message KeyArgs { optional uint64 dataSize = 4; optional hadoop.hdds.ReplicationType type = 5; optional hadoop.hdds.ReplicationFactor factor = 6; + repeated KeyLocation keyLocations = 7; } message KeyLocation { http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4db753b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java index 2fbab36..e31b528 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java @@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.client.rpc; import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.lang3.RandomUtils; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; import org.apache.hadoop.ozone.MiniOzoneCluster; @@ -434,6 +435,40 @@ public class TestOzoneRpcClient { } @Test + public void testValidateBlockLengthWithCommitKey() throws IOException { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + + String value = RandomStringUtils.random(RandomUtils.nextInt(0,1024)); + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket bucket = volume.getBucket(bucketName); + String keyName = UUID.randomUUID().toString(); + + // create the initial key with size 0, write will allocate the first block. + OzoneOutputStream out = bucket.createKey(keyName, 0, + ReplicationType.STAND_ALONE, ReplicationFactor.ONE); + out.write(value.getBytes()); + out.close(); + OmKeyArgs.Builder builder = new OmKeyArgs.Builder(); + builder.setVolumeName(volumeName).setBucketName(bucketName) + .setKeyName(keyName); + OmKeyInfo keyInfo = ozoneManager.lookupKey(builder.build()); + + List<OmKeyLocationInfo> locationInfoList = + keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly(); + // LocationList should have only 1 block + Assert.assertEquals(1, locationInfoList.size()); + // make sure the data block size is updated + Assert.assertEquals(value.getBytes().length, + locationInfoList.get(0).getLength()); + // make sure the total data size is set correctly + Assert.assertEquals(value.getBytes().length, keyInfo.getDataSize()); + } + + + @Test public void testPutKeyRatisOneNode() throws IOException, OzoneException { String volumeName = UUID.randomUUID().toString(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4db753b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java index 15122b9..f5dddee 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java @@ -44,6 +44,7 @@ import org.junit.rules.ExpectedException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; @@ -122,6 +123,9 @@ public class TestOmBlockVersioning { // 1st update, version 0 OpenKeySession openKey = ozoneManager.openKey(keyArgs); + // explicitly set the keyLocation list before committing the key. + keyArgs.setLocationInfoList( + openKey.getKeyInfo().getLatestVersionLocations().getLocationList()); ozoneManager.commitKey(keyArgs, openKey.getId()); OmKeyInfo keyInfo = ozoneManager.lookupKey(keyArgs); @@ -134,6 +138,9 @@ public class TestOmBlockVersioning { openKey = ozoneManager.openKey(keyArgs); //OmKeyLocationInfo locationInfo = // ozoneManager.allocateBlock(keyArgs, openKey.getId()); + // explicitly set the keyLocation list before committing the key. + keyArgs.setLocationInfoList( + openKey.getKeyInfo().getLatestVersionLocations().getLocationList()); ozoneManager.commitKey(keyArgs, openKey.getId()); keyInfo = ozoneManager.lookupKey(keyArgs); @@ -144,7 +151,11 @@ public class TestOmBlockVersioning { // 3rd update, version 2 openKey = ozoneManager.openKey(keyArgs); // this block will be appended to the latest version of version 2. - ozoneManager.allocateBlock(keyArgs, openKey.getId()); + OmKeyLocationInfo locationInfo = + ozoneManager.allocateBlock(keyArgs, openKey.getId()); + List<OmKeyLocationInfo> locationInfoList = new ArrayList<>(); + locationInfoList.add(locationInfo); + keyArgs.setLocationInfoList(locationInfoList); ozoneManager.commitKey(keyArgs, openKey.getId()); keyInfo = ozoneManager.lookupKey(keyArgs); http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4db753b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index ba92a29..75342c6 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -342,6 +342,10 @@ public class KeyManagerImpl implements KeyManager { OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(openKeyData)); keyInfo.setDataSize(args.getDataSize()); keyInfo.setModificationTime(Time.now()); + List<OmKeyLocationInfo> locationInfoList = args.getLocationInfoList(); + Preconditions.checkNotNull(locationInfoList); + //update the block length for each block + keyInfo.updateLocationInfoList(locationInfoList); BatchOperation batch = new BatchOperation(); batch.delete(openKey); batch.put(objectKeyBytes, keyInfo.getProtobuf().toByteArray()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4db753b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java index 40a88b6..45ec2d0 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java @@ -519,9 +519,12 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements .setVolumeName(keyArgs.getVolumeName()) .setBucketName(keyArgs.getBucketName()) .setKeyName(keyArgs.getKeyName()) - .setDataSize(keyArgs.getDataSize()) + .setLocationInfoList(keyArgs.getKeyLocationsList().stream() + .map(OmKeyLocationInfo::getFromProtobuf) + .collect(Collectors.toList())) .setType(type) .setFactor(factor) + .setDataSize(keyArgs.getDataSize()) .build(); int id = request.getClientID(); impl.commitKey(omKeyArgs, id); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org