This is an automated email from the ASF dual-hosted git repository.

nanda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 706fe1b  HDDS-3168. Improve read efficiency by merging a lot of RPC 
call getContainerWithPipeline into one. (#692)
706fe1b is described below

commit 706fe1be4af499b9a2779ca1129f2d5ab0f168a9
Author: runzhiwang <[email protected]>
AuthorDate: Wed Apr 22 13:46:49 2020 +0800

    HDDS-3168. Improve read efficiency by merging a lot of RPC call 
getContainerWithPipeline into one. (#692)
---
 .../protocol/StorageContainerLocationProtocol.java |  12 +++
 ...inerLocationProtocolClientSideTranslatorPB.java |  35 +++++++
 .../org/apache/hadoop/ozone/audit/SCMAction.java   |   3 +-
 .../proto/StorageContainerLocationProtocol.proto   |  12 +++
 ...inerLocationProtocolServerSideTranslatorPB.java |  23 +++++
 .../hdds/scm/server/SCMClientProtocolServer.java   | 108 ++++++++++++++-------
 .../apache/hadoop/ozone/om/TestKeyManagerImpl.java |  37 ++++---
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |  70 +++++++------
 .../apache/hadoop/ozone/om/TestKeyManagerUnit.java |  13 ++-
 9 files changed, 233 insertions(+), 80 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
index 3ec3277..dff739a 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
@@ -77,6 +77,18 @@ public interface StorageContainerLocationProtocol extends 
Closeable {
       throws IOException;
 
   /**
+   * Ask SCM the location of a batch of containers. SCM responds with a group 
of
+   * nodes where these containers and their replicas are located.
+   *
+   * @param containerIDs - IDs of a batch of containers.
+   * @return List of ContainerWithPipeline
+   * - the container info with the pipeline.
+   * @throws IOException
+   */
+  List<ContainerWithPipeline> getContainerWithPipelineBatch(
+      List<Long> containerIDs) throws IOException;
+
+  /**
    * Ask SCM a list of containers with a range of container names
    * and the limit of count.
    * Search container names between start name(exclusive), and
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index dffae11..b0040cf 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -35,6 +35,7 @@ import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ForceExitSafeModeResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineRequestProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineBatchRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetPipelineRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetPipelineResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.InSafeModeRequestProto;
@@ -197,6 +198,40 @@ public final class 
StorageContainerLocationProtocolClientSideTranslatorPB
   /**
    * {@inheritDoc}
    */
+  public List<ContainerWithPipeline> getContainerWithPipelineBatch(
+      List<Long> containerIDs) throws IOException {
+    for (Long containerID: containerIDs) {
+      Preconditions.checkState(containerID >= 0,
+          "Container ID cannot be negative");
+    }
+
+    GetContainerWithPipelineBatchRequestProto request =
+        GetContainerWithPipelineBatchRequestProto.newBuilder()
+            .setTraceID(TracingUtil.exportCurrentSpan())
+            .addAllContainerIDs(containerIDs)
+            .build();
+
+    ScmContainerLocationResponse response =
+        submitRequest(Type.GetContainerWithPipelineBatch,
+            (builder) -> builder
+                .setGetContainerWithPipelineBatchRequest(request));
+
+    List<HddsProtos.ContainerWithPipeline> protoCps = response
+        .getGetContainerWithPipelineBatchResponse()
+        .getContainerWithPipelinesList();
+
+    List<ContainerWithPipeline> cps = new ArrayList<>();
+
+    for (HddsProtos.ContainerWithPipeline cp : protoCps) {
+      cps.add(ContainerWithPipeline.fromProtobuf(cp));
+    }
+
+    return cps;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public List<ContainerInfo> listContainer(long startContainerID, int count)
       throws IOException {
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
index fada2d8..102d47a 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
@@ -43,7 +43,8 @@ public enum SCMAction implements AuditAction {
   SORT_DATANODE,
   START_REPLICATION_MANAGER,
   STOP_REPLICATION_MANAGER,
-  GET_REPLICATION_MANAGER_STATUS;
+  GET_REPLICATION_MANAGER_STATUS,
+  GET_CONTAINER_WITH_PIPELINE_BATCH;
 
   @Override
   public String getAction() {
diff --git 
a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto 
b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
index 88df770..2bb9990 100644
--- a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
@@ -60,6 +60,7 @@ message ScmContainerLocationRequest {
   optional StopReplicationManagerRequestProto stopReplicationManagerRequest = 
22;
   optional ReplicationManagerStatusRequestProto 
seplicationManagerStatusRequest = 23;
   optional GetPipelineRequestProto getPipelineRequest = 24;
+  optional GetContainerWithPipelineBatchRequestProto 
getContainerWithPipelineBatchRequest = 25;
 
 }
 
@@ -93,6 +94,7 @@ message ScmContainerLocationResponse {
   optional StopReplicationManagerResponseProto stopReplicationManagerResponse 
= 22;
   optional ReplicationManagerStatusResponseProto 
replicationManagerStatusResponse = 23;
   optional GetPipelineResponseProto getPipelineResponse = 24;
+  optional GetContainerWithPipelineBatchResponseProto 
getContainerWithPipelineBatchResponse = 25;
   enum Status {
     OK = 1;
     CONTAINER_ALREADY_EXISTS = 2;
@@ -121,6 +123,7 @@ enum Type {
   StopReplicationManager = 17;
   GetReplicationManagerStatus = 18;
   GetPipeline = 19;
+  GetContainerWithPipelineBatch = 20;
 }
 
 /**
@@ -168,6 +171,15 @@ message GetContainerWithPipelineResponseProto {
   required ContainerWithPipeline containerWithPipeline = 1;
 }
 
+message GetContainerWithPipelineBatchRequestProto {
+  repeated int64 containerIDs = 1;
+  optional string traceID = 2;
+}
+
+message GetContainerWithPipelineBatchResponseProto {
+  repeated ContainerWithPipeline containerWithPipelines = 1;
+}
+
 message SCMListContainerRequestProto {
   required uint32 count = 1;
   optional uint64 startContainerID = 2;
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
index 6207343..bc2208d 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -38,6 +38,8 @@ import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineResponseProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineBatchRequestProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineBatchResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetPipelineRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetPipelineResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.InSafeModeRequestProto;
@@ -146,6 +148,14 @@ public final class 
StorageContainerLocationProtocolServerSideTranslatorPB
             .setGetContainerWithPipelineResponse(getContainerWithPipeline(
                 request.getGetContainerWithPipelineRequest()))
             .build();
+      case GetContainerWithPipelineBatch:
+        return ScmContainerLocationResponse.newBuilder()
+            .setCmdType(request.getCmdType())
+            .setStatus(Status.OK)
+            .setGetContainerWithPipelineBatchResponse(
+                getContainerWithPipelineBatch(
+                    request.getGetContainerWithPipelineBatchRequest()))
+            .build();
       case ListContainer:
         return ScmContainerLocationResponse.newBuilder()
             .setCmdType(request.getCmdType())
@@ -289,6 +299,19 @@ public final class 
StorageContainerLocationProtocolServerSideTranslatorPB
         .build();
   }
 
+  public GetContainerWithPipelineBatchResponseProto
+      getContainerWithPipelineBatch(
+      GetContainerWithPipelineBatchRequestProto request) throws IOException {
+    List<ContainerWithPipeline> containers = impl
+        .getContainerWithPipelineBatch(request.getContainerIDsList());
+    GetContainerWithPipelineBatchResponseProto.Builder builder =
+        GetContainerWithPipelineBatchResponseProto.newBuilder();
+    for (ContainerWithPipeline container : containers) {
+      builder.addContainerWithPipelines(container.getProtobuf());
+    }
+    return builder.build();
+  }
+
   public SCMListContainerResponseProto listContainer(
       SCMListContainerRequestProto request) throws IOException {
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index 95ab809..3286468 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -219,57 +219,93 @@ public class SCMClientProtocolServer implements
 
   }
 
-  @Override
-  public ContainerWithPipeline getContainerWithPipeline(long containerID)
-      throws IOException {
+  private ContainerWithPipeline getContainerWithPipelineCommon(
+      long containerID) throws IOException {
     final ContainerID cid = ContainerID.valueof(containerID);
-    try {
-      final ContainerInfo container = scm.getContainerManager()
-          .getContainer(cid);
-
-      if (safeModePrecheck.isInSafeMode()) {
-        if (container.isOpen()) {
-          if (!hasRequiredReplicas(container)) {
-            throw new SCMException("Open container " + containerID + " doesn't"
-                + " have enough replicas to service this operation in "
-                + "Safe mode.", ResultCodes.SAFE_MODE_EXCEPTION);
-          }
+    final ContainerInfo container = scm.getContainerManager()
+        .getContainer(cid);
+
+    if (safeModePrecheck.isInSafeMode()) {
+      if (container.isOpen()) {
+        if (!hasRequiredReplicas(container)) {
+          throw new SCMException("Open container " + containerID + " doesn't"
+              + " have enough replicas to service this operation in "
+              + "Safe mode.", ResultCodes.SAFE_MODE_EXCEPTION);
         }
       }
-      getScm().checkAdminAccess(null);
+    }
 
-      Pipeline pipeline;
-      try {
-        pipeline = container.isOpen() ? scm.getPipelineManager()
-            .getPipeline(container.getPipelineID()) : null;
-      } catch (PipelineNotFoundException ex) {
-        // The pipeline is destroyed.
-        pipeline = null;
-      }
+    Pipeline pipeline;
+    try {
+      pipeline = container.isOpen() ? scm.getPipelineManager()
+          .getPipeline(container.getPipelineID()) : null;
+    } catch (PipelineNotFoundException ex) {
+      // The pipeline is destroyed.
+      pipeline = null;
+    }
 
-      if (pipeline == null) {
-        pipeline = scm.getPipelineManager().createPipeline(
-            HddsProtos.ReplicationType.STAND_ALONE,
-            container.getReplicationFactor(),
-            scm.getContainerManager()
-                .getContainerReplicas(cid).stream()
-                .map(ContainerReplica::getDatanodeDetails)
-                .collect(Collectors.toList()));
-      }
+    if (pipeline == null) {
+      pipeline = scm.getPipelineManager().createPipeline(
+          HddsProtos.ReplicationType.STAND_ALONE,
+          container.getReplicationFactor(),
+          scm.getContainerManager()
+              .getContainerReplicas(cid).stream()
+              .map(ContainerReplica::getDatanodeDetails)
+              .collect(Collectors.toList()));
+    }
 
+    return new ContainerWithPipeline(container, pipeline);
+  }
+
+  @Override
+  public ContainerWithPipeline getContainerWithPipeline(long containerID)
+      throws IOException {
+    getScm().checkAdminAccess(null);
+
+    try {
+      ContainerWithPipeline cp = getContainerWithPipelineCommon(containerID);
       AUDIT.logReadSuccess(buildAuditMessageForSuccess(
           SCMAction.GET_CONTAINER_WITH_PIPELINE,
-          Collections.singletonMap("containerID", cid.toString())));
-
-      return new ContainerWithPipeline(container, pipeline);
+          Collections.singletonMap("containerID",
+          ContainerID.valueof(containerID).toString())));
+      return cp;
     } catch (IOException ex) {
       AUDIT.logReadFailure(buildAuditMessageForFailure(
           SCMAction.GET_CONTAINER_WITH_PIPELINE,
-          Collections.singletonMap("containerID", cid.toString()), ex));
+          Collections.singletonMap("containerID",
+              ContainerID.valueof(containerID).toString()), ex));
       throw ex;
     }
   }
 
+  @Override
+  public List<ContainerWithPipeline> getContainerWithPipelineBatch(
+      List<Long> containerIDs) throws IOException {
+    getScm().checkAdminAccess(null);
+
+    List<ContainerWithPipeline> cpList = new ArrayList<>();
+
+    for (Long containerID : containerIDs) {
+      try {
+        ContainerWithPipeline cp = getContainerWithPipelineCommon(containerID);
+        cpList.add(cp);
+      } catch (IOException ex) {
+        AUDIT.logReadFailure(buildAuditMessageForFailure(
+            SCMAction.GET_CONTAINER_WITH_PIPELINE_BATCH,
+            Collections.singletonMap("containerID",
+                ContainerID.valueof(containerID).toString()), ex));
+        throw ex;
+      }
+    }
+
+    AUDIT.logReadSuccess(buildAuditMessageForSuccess(
+        SCMAction.GET_CONTAINER_WITH_PIPELINE_BATCH,
+        Collections.singletonMap("containerIDs",
+        containerIDs.stream().map(id -> ContainerID.valueof(id).toString())
+            .collect(Collectors.joining(",")))));
+
+    return cpList;
+  }
   /**
    * Check if container reported replicas are equal or greater than required
    * replication factor.
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
index 6e23f34..7f609d9 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
@@ -115,7 +116,7 @@ import static 
org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.SCM_GET_PIPELINE_EXCEPTION;
 import static 
org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.ALL;
-import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyList;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -1114,12 +1115,27 @@ public class TestKeyManagerImpl {
 
       StorageContainerLocationProtocol sclProtocolMock = mock(
           StorageContainerLocationProtocol.class);
-      ContainerWithPipeline containerWithPipelineMock =
-          mock(ContainerWithPipeline.class);
-      when(containerWithPipelineMock.getPipeline())
-          .thenReturn(getRandomPipeline());
-      when(sclProtocolMock.getContainerWithPipeline(anyLong()))
-          .thenReturn(containerWithPipelineMock);
+
+      List<Long> containerIDs = new ArrayList<>();
+      containerIDs.add(100L);
+      containerIDs.add(200L);
+
+      List<ContainerWithPipeline> cps = new ArrayList<>();
+      for (Long containerID : containerIDs) {
+        ContainerWithPipeline containerWithPipelineMock =
+            mock(ContainerWithPipeline.class);
+        when(containerWithPipelineMock.getPipeline())
+            .thenReturn(getRandomPipeline());
+
+        ContainerInfo ci = mock(ContainerInfo.class);
+        when(ci.getContainerID()).thenReturn(containerID);
+        when(containerWithPipelineMock.getContainerInfo()).thenReturn(ci);
+
+        cps.add(containerWithPipelineMock);
+      }
+
+      when(sclProtocolMock.getContainerWithPipelineBatch(containerIDs))
+          .thenReturn(cps);
 
       ScmClient scmClientMock = mock(ScmClient.class);
       when(scmClientMock.getContainerClient()).thenReturn(sclProtocolMock);
@@ -1158,9 +1174,8 @@ public class TestKeyManagerImpl {
 
       keyManagerImpl.refreshPipeline(omKeyInfo);
 
-      verify(sclProtocolMock, times(2)).getContainerWithPipeline(anyLong());
-      verify(sclProtocolMock, times(1)).getContainerWithPipeline(100L);
-      verify(sclProtocolMock, times(1)).getContainerWithPipeline(200L);
+      verify(sclProtocolMock, times(1))
+          .getContainerWithPipelineBatch(containerIDs);
     } finally {
       cluster.shutdown();
     }
@@ -1178,7 +1193,7 @@ public class TestKeyManagerImpl {
       StorageContainerLocationProtocol sclProtocolMock = mock(
           StorageContainerLocationProtocol.class);
       doThrow(new IOException(errorMessage)).when(sclProtocolMock)
-          .getContainerWithPipeline(anyLong());
+          .getContainerWithPipelineBatch(anyList());
 
       ScmClient scmClientMock = mock(ScmClient.class);
       when(scmClientMock.getContainerClient()).thenReturn(sclProtocolMock);
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index d03fbd9..d0377b5 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -687,36 +688,45 @@ public class KeyManagerImpl implements KeyManager {
    */
   @VisibleForTesting
   protected void refreshPipeline(OmKeyInfo value) throws IOException {
-    if (value != null &&
-        CollectionUtils.isNotEmpty(value.getKeyLocationVersions())) {
-      Map<Long, ContainerWithPipeline> containerWithPipelineMap =
-          new HashMap<>();
-      for (OmKeyLocationInfoGroup key : value.getKeyLocationVersions()) {
-        for (OmKeyLocationInfo k : key.getLocationList()) {
-          // TODO: fix Some tests that may not initialize container client
-          // The production should always have containerClient initialized.
-          if (scmClient.getContainerClient() != null) {
-            try {
-              if (!containerWithPipelineMap.containsKey(k.getContainerID())) {
-                ContainerWithPipeline containerWithPipeline = scmClient
-                    .getContainerClient()
-                    .getContainerWithPipeline(k.getContainerID());
-                containerWithPipelineMap.put(k.getContainerID(),
-                    containerWithPipeline);
-              }
-            } catch (IOException ioEx) {
-              LOG.debug("Get containerPipeline failed for volume:{} bucket:{} "
-                      + "key:{}", value.getVolumeName(), value.getBucketName(),
-                  value.getKeyName(), ioEx);
-              throw new OMException(ioEx.getMessage(),
-                  SCM_GET_PIPELINE_EXCEPTION);
-            }
-            ContainerWithPipeline cp =
-                containerWithPipelineMap.get(k.getContainerID());
-            if (!cp.getPipeline().equals(k.getPipeline())) {
-              k.setPipeline(cp.getPipeline());
-            }
-          }
+    final List<OmKeyLocationInfoGroup> locationInfoGroups = value == null ?
+        null : value.getKeyLocationVersions();
+
+    // TODO: fix Some tests that may not initialize container client
+    // The production should always have containerClient initialized.
+    if (scmClient.getContainerClient() == null ||
+        CollectionUtils.isEmpty(locationInfoGroups)) {
+      return;
+    }
+
+    Set<Long> containerIDs = new HashSet<>();
+    for (OmKeyLocationInfoGroup key : locationInfoGroups) {
+      for (OmKeyLocationInfo k : key.getLocationList()) {
+        containerIDs.add(k.getContainerID());
+      }
+    }
+
+    Map<Long, ContainerWithPipeline> containerWithPipelineMap = new 
HashMap<>();
+
+    try {
+      List<ContainerWithPipeline> cpList = scmClient.getContainerClient().
+          getContainerWithPipelineBatch(new ArrayList<>(containerIDs));
+      for (ContainerWithPipeline cp : cpList) {
+        containerWithPipelineMap.put(
+            cp.getContainerInfo().getContainerID(), cp);
+      }
+    } catch (IOException ioEx) {
+      LOG.debug("Get containerPipeline failed for volume:{} bucket:{} " +
+          "key:{}", value.getVolumeName(), value.getBucketName(),
+          value.getKeyName(), ioEx);
+      throw new OMException(ioEx.getMessage(), SCM_GET_PIPELINE_EXCEPTION);
+    }
+
+    for (OmKeyLocationInfoGroup key : locationInfoGroups) {
+      for (OmKeyLocationInfo k : key.getLocationList()) {
+        ContainerWithPipeline cp =
+            containerWithPipelineMap.get(k.getContainerID());
+        if (!cp.getPipeline().equals(k.getPipeline())) {
+          k.setPipeline(cp.getPipeline());
         }
       }
     }
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
index 320c3a5..4e62eb8 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdds.protocol.StorageType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
@@ -353,8 +354,16 @@ public class TestKeyManagerUnit {
         .setNodes(Arrays.asList(dnFour, dnFive, dnSix))
         .build();
 
-    Mockito.when(containerClient.getContainerWithPipeline(1L))
-        .thenReturn(new ContainerWithPipeline(null, pipelineTwo));
+    List<Long> containerIDs = new ArrayList<>();
+    containerIDs.add(1L);
+
+    List<ContainerWithPipeline> cps = new ArrayList<>();
+    ContainerInfo ci = Mockito.mock(ContainerInfo.class);
+    Mockito.when(ci.getContainerID()).thenReturn(1L);
+    cps.add(new ContainerWithPipeline(ci, pipelineTwo));
+
+    Mockito.when(containerClient.getContainerWithPipelineBatch(containerIDs))
+        .thenReturn(cps);
 
     final OmVolumeArgs volumeArgs = OmVolumeArgs.newBuilder()
         .setVolume("volumeOne")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to