This is an automated email from the ASF dual-hosted git repository.
nanda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 706fe1b HDDS-3168. Improve read efficiency by merging many RPC
calls to getContainerWithPipeline into one. (#692)
706fe1b is described below
commit 706fe1be4af499b9a2779ca1129f2d5ab0f168a9
Author: runzhiwang <[email protected]>
AuthorDate: Wed Apr 22 13:46:49 2020 +0800
HDDS-3168. Improve read efficiency by merging many RPC calls to
getContainerWithPipeline into one. (#692)
---
.../protocol/StorageContainerLocationProtocol.java | 12 +++
...inerLocationProtocolClientSideTranslatorPB.java | 35 +++++++
.../org/apache/hadoop/ozone/audit/SCMAction.java | 3 +-
.../proto/StorageContainerLocationProtocol.proto | 12 +++
...inerLocationProtocolServerSideTranslatorPB.java | 23 +++++
.../hdds/scm/server/SCMClientProtocolServer.java | 108 ++++++++++++++-------
.../apache/hadoop/ozone/om/TestKeyManagerImpl.java | 37 ++++---
.../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 70 +++++++------
.../apache/hadoop/ozone/om/TestKeyManagerUnit.java | 13 ++-
9 files changed, 233 insertions(+), 80 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
index 3ec3277..dff739a 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
@@ -77,6 +77,18 @@ public interface StorageContainerLocationProtocol extends
Closeable {
throws IOException;
/**
+ * Ask SCM the location of a batch of containers. SCM responds with a group
of
+ * nodes where these containers and their replicas are located.
+ *
+ * @param containerIDs - IDs of a batch of containers.
+ * @return List of ContainerWithPipeline
+ * - the container info with the pipeline.
+ * @throws IOException
+ */
+ List<ContainerWithPipeline> getContainerWithPipelineBatch(
+ List<Long> containerIDs) throws IOException;
+
+ /**
* Ask SCM a list of containers with a range of container names
* and the limit of count.
* Search container names between start name(exclusive), and
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index dffae11..b0040cf 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -35,6 +35,7 @@ import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ForceExitSafeModeResponseProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerRequestProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineRequestProto;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineBatchRequestProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetPipelineRequestProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetPipelineResponseProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.InSafeModeRequestProto;
@@ -197,6 +198,40 @@ public final class
StorageContainerLocationProtocolClientSideTranslatorPB
/**
* {@inheritDoc}
*/
+ public List<ContainerWithPipeline> getContainerWithPipelineBatch(
+ List<Long> containerIDs) throws IOException {
+ for (Long containerID: containerIDs) {
+ Preconditions.checkState(containerID >= 0,
+ "Container ID cannot be negative");
+ }
+
+ GetContainerWithPipelineBatchRequestProto request =
+ GetContainerWithPipelineBatchRequestProto.newBuilder()
+ .setTraceID(TracingUtil.exportCurrentSpan())
+ .addAllContainerIDs(containerIDs)
+ .build();
+
+ ScmContainerLocationResponse response =
+ submitRequest(Type.GetContainerWithPipelineBatch,
+ (builder) -> builder
+ .setGetContainerWithPipelineBatchRequest(request));
+
+ List<HddsProtos.ContainerWithPipeline> protoCps = response
+ .getGetContainerWithPipelineBatchResponse()
+ .getContainerWithPipelinesList();
+
+ List<ContainerWithPipeline> cps = new ArrayList<>();
+
+ for (HddsProtos.ContainerWithPipeline cp : protoCps) {
+ cps.add(ContainerWithPipeline.fromProtobuf(cp));
+ }
+
+ return cps;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
@Override
public List<ContainerInfo> listContainer(long startContainerID, int count)
throws IOException {
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
index fada2d8..102d47a 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
@@ -43,7 +43,8 @@ public enum SCMAction implements AuditAction {
SORT_DATANODE,
START_REPLICATION_MANAGER,
STOP_REPLICATION_MANAGER,
- GET_REPLICATION_MANAGER_STATUS;
+ GET_REPLICATION_MANAGER_STATUS,
+ GET_CONTAINER_WITH_PIPELINE_BATCH;
@Override
public String getAction() {
diff --git
a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
index 88df770..2bb9990 100644
--- a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
@@ -60,6 +60,7 @@ message ScmContainerLocationRequest {
optional StopReplicationManagerRequestProto stopReplicationManagerRequest =
22;
optional ReplicationManagerStatusRequestProto
seplicationManagerStatusRequest = 23;
optional GetPipelineRequestProto getPipelineRequest = 24;
+ optional GetContainerWithPipelineBatchRequestProto
getContainerWithPipelineBatchRequest = 25;
}
@@ -93,6 +94,7 @@ message ScmContainerLocationResponse {
optional StopReplicationManagerResponseProto stopReplicationManagerResponse
= 22;
optional ReplicationManagerStatusResponseProto
replicationManagerStatusResponse = 23;
optional GetPipelineResponseProto getPipelineResponse = 24;
+ optional GetContainerWithPipelineBatchResponseProto
getContainerWithPipelineBatchResponse = 25;
enum Status {
OK = 1;
CONTAINER_ALREADY_EXISTS = 2;
@@ -121,6 +123,7 @@ enum Type {
StopReplicationManager = 17;
GetReplicationManagerStatus = 18;
GetPipeline = 19;
+ GetContainerWithPipelineBatch = 20;
}
/**
@@ -168,6 +171,15 @@ message GetContainerWithPipelineResponseProto {
required ContainerWithPipeline containerWithPipeline = 1;
}
+message GetContainerWithPipelineBatchRequestProto {
+ repeated int64 containerIDs = 1;
+ optional string traceID = 2;
+}
+
+message GetContainerWithPipelineBatchResponseProto {
+ repeated ContainerWithPipeline containerWithPipelines = 1;
+}
+
message SCMListContainerRequestProto {
required uint32 count = 1;
optional uint64 startContainerID = 2;
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
index 6207343..bc2208d 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -38,6 +38,8 @@ import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerResponseProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineRequestProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineResponseProto;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineBatchRequestProto;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineBatchResponseProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetPipelineRequestProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetPipelineResponseProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.InSafeModeRequestProto;
@@ -146,6 +148,14 @@ public final class
StorageContainerLocationProtocolServerSideTranslatorPB
.setGetContainerWithPipelineResponse(getContainerWithPipeline(
request.getGetContainerWithPipelineRequest()))
.build();
+ case GetContainerWithPipelineBatch:
+ return ScmContainerLocationResponse.newBuilder()
+ .setCmdType(request.getCmdType())
+ .setStatus(Status.OK)
+ .setGetContainerWithPipelineBatchResponse(
+ getContainerWithPipelineBatch(
+ request.getGetContainerWithPipelineBatchRequest()))
+ .build();
case ListContainer:
return ScmContainerLocationResponse.newBuilder()
.setCmdType(request.getCmdType())
@@ -289,6 +299,19 @@ public final class
StorageContainerLocationProtocolServerSideTranslatorPB
.build();
}
+ public GetContainerWithPipelineBatchResponseProto
+ getContainerWithPipelineBatch(
+ GetContainerWithPipelineBatchRequestProto request) throws IOException {
+ List<ContainerWithPipeline> containers = impl
+ .getContainerWithPipelineBatch(request.getContainerIDsList());
+ GetContainerWithPipelineBatchResponseProto.Builder builder =
+ GetContainerWithPipelineBatchResponseProto.newBuilder();
+ for (ContainerWithPipeline container : containers) {
+ builder.addContainerWithPipelines(container.getProtobuf());
+ }
+ return builder.build();
+ }
+
public SCMListContainerResponseProto listContainer(
SCMListContainerRequestProto request) throws IOException {
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index 95ab809..3286468 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -219,57 +219,93 @@ public class SCMClientProtocolServer implements
}
- @Override
- public ContainerWithPipeline getContainerWithPipeline(long containerID)
- throws IOException {
+ private ContainerWithPipeline getContainerWithPipelineCommon(
+ long containerID) throws IOException {
final ContainerID cid = ContainerID.valueof(containerID);
- try {
- final ContainerInfo container = scm.getContainerManager()
- .getContainer(cid);
-
- if (safeModePrecheck.isInSafeMode()) {
- if (container.isOpen()) {
- if (!hasRequiredReplicas(container)) {
- throw new SCMException("Open container " + containerID + " doesn't"
- + " have enough replicas to service this operation in "
- + "Safe mode.", ResultCodes.SAFE_MODE_EXCEPTION);
- }
+ final ContainerInfo container = scm.getContainerManager()
+ .getContainer(cid);
+
+ if (safeModePrecheck.isInSafeMode()) {
+ if (container.isOpen()) {
+ if (!hasRequiredReplicas(container)) {
+ throw new SCMException("Open container " + containerID + " doesn't"
+ + " have enough replicas to service this operation in "
+ + "Safe mode.", ResultCodes.SAFE_MODE_EXCEPTION);
}
}
- getScm().checkAdminAccess(null);
+ }
- Pipeline pipeline;
- try {
- pipeline = container.isOpen() ? scm.getPipelineManager()
- .getPipeline(container.getPipelineID()) : null;
- } catch (PipelineNotFoundException ex) {
- // The pipeline is destroyed.
- pipeline = null;
- }
+ Pipeline pipeline;
+ try {
+ pipeline = container.isOpen() ? scm.getPipelineManager()
+ .getPipeline(container.getPipelineID()) : null;
+ } catch (PipelineNotFoundException ex) {
+ // The pipeline is destroyed.
+ pipeline = null;
+ }
- if (pipeline == null) {
- pipeline = scm.getPipelineManager().createPipeline(
- HddsProtos.ReplicationType.STAND_ALONE,
- container.getReplicationFactor(),
- scm.getContainerManager()
- .getContainerReplicas(cid).stream()
- .map(ContainerReplica::getDatanodeDetails)
- .collect(Collectors.toList()));
- }
+ if (pipeline == null) {
+ pipeline = scm.getPipelineManager().createPipeline(
+ HddsProtos.ReplicationType.STAND_ALONE,
+ container.getReplicationFactor(),
+ scm.getContainerManager()
+ .getContainerReplicas(cid).stream()
+ .map(ContainerReplica::getDatanodeDetails)
+ .collect(Collectors.toList()));
+ }
+ return new ContainerWithPipeline(container, pipeline);
+ }
+
+ @Override
+ public ContainerWithPipeline getContainerWithPipeline(long containerID)
+ throws IOException {
+ getScm().checkAdminAccess(null);
+
+ try {
+ ContainerWithPipeline cp = getContainerWithPipelineCommon(containerID);
AUDIT.logReadSuccess(buildAuditMessageForSuccess(
SCMAction.GET_CONTAINER_WITH_PIPELINE,
- Collections.singletonMap("containerID", cid.toString())));
-
- return new ContainerWithPipeline(container, pipeline);
+ Collections.singletonMap("containerID",
+ ContainerID.valueof(containerID).toString())));
+ return cp;
} catch (IOException ex) {
AUDIT.logReadFailure(buildAuditMessageForFailure(
SCMAction.GET_CONTAINER_WITH_PIPELINE,
- Collections.singletonMap("containerID", cid.toString()), ex));
+ Collections.singletonMap("containerID",
+ ContainerID.valueof(containerID).toString()), ex));
throw ex;
}
}
+ @Override
+ public List<ContainerWithPipeline> getContainerWithPipelineBatch(
+ List<Long> containerIDs) throws IOException {
+ getScm().checkAdminAccess(null);
+
+ List<ContainerWithPipeline> cpList = new ArrayList<>();
+
+ for (Long containerID : containerIDs) {
+ try {
+ ContainerWithPipeline cp = getContainerWithPipelineCommon(containerID);
+ cpList.add(cp);
+ } catch (IOException ex) {
+ AUDIT.logReadFailure(buildAuditMessageForFailure(
+ SCMAction.GET_CONTAINER_WITH_PIPELINE_BATCH,
+ Collections.singletonMap("containerID",
+ ContainerID.valueof(containerID).toString()), ex));
+ throw ex;
+ }
+ }
+
+ AUDIT.logReadSuccess(buildAuditMessageForSuccess(
+ SCMAction.GET_CONTAINER_WITH_PIPELINE_BATCH,
+ Collections.singletonMap("containerIDs",
+ containerIDs.stream().map(id -> ContainerID.valueof(id).toString())
+ .collect(Collectors.joining(",")))));
+
+ return cpList;
+ }
/**
* Check if container reported replicas are equal or greater than required
* replication factor.
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
index 6e23f34..7f609d9 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
@@ -115,7 +116,7 @@ import static
org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.SCM_GET_PIPELINE_EXCEPTION;
import static
org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.ALL;
-import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyList;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -1114,12 +1115,27 @@ public class TestKeyManagerImpl {
StorageContainerLocationProtocol sclProtocolMock = mock(
StorageContainerLocationProtocol.class);
- ContainerWithPipeline containerWithPipelineMock =
- mock(ContainerWithPipeline.class);
- when(containerWithPipelineMock.getPipeline())
- .thenReturn(getRandomPipeline());
- when(sclProtocolMock.getContainerWithPipeline(anyLong()))
- .thenReturn(containerWithPipelineMock);
+
+ List<Long> containerIDs = new ArrayList<>();
+ containerIDs.add(100L);
+ containerIDs.add(200L);
+
+ List<ContainerWithPipeline> cps = new ArrayList<>();
+ for (Long containerID : containerIDs) {
+ ContainerWithPipeline containerWithPipelineMock =
+ mock(ContainerWithPipeline.class);
+ when(containerWithPipelineMock.getPipeline())
+ .thenReturn(getRandomPipeline());
+
+ ContainerInfo ci = mock(ContainerInfo.class);
+ when(ci.getContainerID()).thenReturn(containerID);
+ when(containerWithPipelineMock.getContainerInfo()).thenReturn(ci);
+
+ cps.add(containerWithPipelineMock);
+ }
+
+ when(sclProtocolMock.getContainerWithPipelineBatch(containerIDs))
+ .thenReturn(cps);
ScmClient scmClientMock = mock(ScmClient.class);
when(scmClientMock.getContainerClient()).thenReturn(sclProtocolMock);
@@ -1158,9 +1174,8 @@ public class TestKeyManagerImpl {
keyManagerImpl.refreshPipeline(omKeyInfo);
- verify(sclProtocolMock, times(2)).getContainerWithPipeline(anyLong());
- verify(sclProtocolMock, times(1)).getContainerWithPipeline(100L);
- verify(sclProtocolMock, times(1)).getContainerWithPipeline(200L);
+ verify(sclProtocolMock, times(1))
+ .getContainerWithPipelineBatch(containerIDs);
} finally {
cluster.shutdown();
}
@@ -1178,7 +1193,7 @@ public class TestKeyManagerImpl {
StorageContainerLocationProtocol sclProtocolMock = mock(
StorageContainerLocationProtocol.class);
doThrow(new IOException(errorMessage)).when(sclProtocolMock)
- .getContainerWithPipeline(anyLong());
+ .getContainerWithPipelineBatch(anyList());
ScmClient scmClientMock = mock(ScmClient.class);
when(scmClientMock.getContainerClient()).thenReturn(sclProtocolMock);
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index d03fbd9..d0377b5 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -687,36 +688,45 @@ public class KeyManagerImpl implements KeyManager {
*/
@VisibleForTesting
protected void refreshPipeline(OmKeyInfo value) throws IOException {
- if (value != null &&
- CollectionUtils.isNotEmpty(value.getKeyLocationVersions())) {
- Map<Long, ContainerWithPipeline> containerWithPipelineMap =
- new HashMap<>();
- for (OmKeyLocationInfoGroup key : value.getKeyLocationVersions()) {
- for (OmKeyLocationInfo k : key.getLocationList()) {
- // TODO: fix Some tests that may not initialize container client
- // The production should always have containerClient initialized.
- if (scmClient.getContainerClient() != null) {
- try {
- if (!containerWithPipelineMap.containsKey(k.getContainerID())) {
- ContainerWithPipeline containerWithPipeline = scmClient
- .getContainerClient()
- .getContainerWithPipeline(k.getContainerID());
- containerWithPipelineMap.put(k.getContainerID(),
- containerWithPipeline);
- }
- } catch (IOException ioEx) {
- LOG.debug("Get containerPipeline failed for volume:{} bucket:{} "
- + "key:{}", value.getVolumeName(), value.getBucketName(),
- value.getKeyName(), ioEx);
- throw new OMException(ioEx.getMessage(),
- SCM_GET_PIPELINE_EXCEPTION);
- }
- ContainerWithPipeline cp =
- containerWithPipelineMap.get(k.getContainerID());
- if (!cp.getPipeline().equals(k.getPipeline())) {
- k.setPipeline(cp.getPipeline());
- }
- }
+ final List<OmKeyLocationInfoGroup> locationInfoGroups = value == null ?
+ null : value.getKeyLocationVersions();
+
+ // TODO: fix Some tests that may not initialize container client
+ // The production should always have containerClient initialized.
+ if (scmClient.getContainerClient() == null ||
+ CollectionUtils.isEmpty(locationInfoGroups)) {
+ return;
+ }
+
+ Set<Long> containerIDs = new HashSet<>();
+ for (OmKeyLocationInfoGroup key : locationInfoGroups) {
+ for (OmKeyLocationInfo k : key.getLocationList()) {
+ containerIDs.add(k.getContainerID());
+ }
+ }
+
+ Map<Long, ContainerWithPipeline> containerWithPipelineMap = new
HashMap<>();
+
+ try {
+ List<ContainerWithPipeline> cpList = scmClient.getContainerClient().
+ getContainerWithPipelineBatch(new ArrayList<>(containerIDs));
+ for (ContainerWithPipeline cp : cpList) {
+ containerWithPipelineMap.put(
+ cp.getContainerInfo().getContainerID(), cp);
+ }
+ } catch (IOException ioEx) {
+ LOG.debug("Get containerPipeline failed for volume:{} bucket:{} " +
+ "key:{}", value.getVolumeName(), value.getBucketName(),
+ value.getKeyName(), ioEx);
+ throw new OMException(ioEx.getMessage(), SCM_GET_PIPELINE_EXCEPTION);
+ }
+
+ for (OmKeyLocationInfoGroup key : locationInfoGroups) {
+ for (OmKeyLocationInfo k : key.getLocationList()) {
+ ContainerWithPipeline cp =
+ containerWithPipelineMap.get(k.getContainerID());
+ if (!cp.getPipeline().equals(k.getPipeline())) {
+ k.setPipeline(cp.getPipeline());
}
}
}
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
index 320c3a5..4e62eb8 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdds.protocol.StorageType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
@@ -353,8 +354,16 @@ public class TestKeyManagerUnit {
.setNodes(Arrays.asList(dnFour, dnFive, dnSix))
.build();
- Mockito.when(containerClient.getContainerWithPipeline(1L))
- .thenReturn(new ContainerWithPipeline(null, pipelineTwo));
+ List<Long> containerIDs = new ArrayList<>();
+ containerIDs.add(1L);
+
+ List<ContainerWithPipeline> cps = new ArrayList<>();
+ ContainerInfo ci = Mockito.mock(ContainerInfo.class);
+ Mockito.when(ci.getContainerID()).thenReturn(1L);
+ cps.add(new ContainerWithPipeline(ci, pipelineTwo));
+
+ Mockito.when(containerClient.getContainerWithPipelineBatch(containerIDs))
+ .thenReturn(cps);
final OmVolumeArgs volumeArgs = OmVolumeArgs.newBuilder()
.setVolume("volumeOne")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]