HDDS-163. Add Datanode heartbeat dispatcher in SCM. Contributed by Nandakumar.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ddd09d59 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ddd09d59 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ddd09d59 Branch: refs/heads/HDDS-48 Commit: ddd09d59f3d9825f068026622720914e04c2e1d6 Parents: 7547740 Author: Anu Engineer <aengin...@apache.org> Authored: Wed Jun 13 20:18:22 2018 -0700 Committer: Anu Engineer <aengin...@apache.org> Committed: Wed Jun 13 20:18:22 2018 -0700 ---------------------------------------------------------------------- .../container/common/report/ReportManager.java | 3 +- .../scm/server/SCMDatanodeProtocolServer.java | 119 +++++------- .../SCMDatanodeContainerReportHandler.java | 76 ++++++++ .../report/SCMDatanodeHeartbeatDispatcher.java | 189 +++++++++++++++++++ .../report/SCMDatanodeNodeReportHandler.java | 43 +++++ .../server/report/SCMDatanodeReportHandler.java | 83 ++++++++ .../report/SCMDatanodeReportHandlerFactory.java | 82 ++++++++ .../hdds/scm/server/report/package-info.java | 57 ++++++ .../TestSCMDatanodeContainerReportHandler.java | 34 ++++ .../TestSCMDatanodeHeartbeatDispatcher.java | 138 ++++++++++++++ .../TestSCMDatanodeNodeReportHandler.java | 36 ++++ .../TestSCMDatanodeReportHandlerFactory.java | 51 +++++ .../hdds/scm/server/report/package-info.java | 21 +++ .../apache/hadoop/ozone/scm/TestSCMMetrics.java | 20 +- 14 files changed, 875 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ddd09d59/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportManager.java 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportManager.java index c09282e..8097cd6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportManager.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportManager.java @@ -50,7 +50,8 @@ public final class ReportManager { List<ReportPublisher> publishers) { this.context = context; this.publishers = publishers; - this.executorService = HadoopExecutors.newScheduledThreadPool(1, + this.executorService = HadoopExecutors.newScheduledThreadPool( + publishers.size(), new ThreadFactoryBuilder().setDaemon(true) .setNameFormat("Datanode ReportManager Thread - %d").build()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ddd09d59/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java index 1b1645d..7d16161 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java @@ -69,7 +69,7 @@ import static org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.scm.HddsServerUtil; -import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat; +import org.apache.hadoop.hdds.scm.server.report.SCMDatanodeHeartbeatDispatcher; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; @@ -114,6 +114,7 @@ public class SCMDatanodeProtocolServer implements private final StorageContainerManager scm; private 
final InetSocketAddress datanodeRpcAddress; + private final SCMDatanodeHeartbeatDispatcher heartbeatDispatcher; public SCMDatanodeProtocolServer(final OzoneConfiguration conf, StorageContainerManager scm) throws IOException { @@ -148,14 +149,22 @@ public class SCMDatanodeProtocolServer implements updateRPCListenAddress( conf, OZONE_SCM_DATANODE_ADDRESS_KEY, datanodeRpcAddr, datanodeRpcServer); + + heartbeatDispatcher = SCMDatanodeHeartbeatDispatcher.newBuilder(conf, scm) + .addHandlerFor(NodeReportProto.class) + .addHandlerFor(ContainerReportsProto.class) + .build(); } - public InetSocketAddress getDatanodeRpcAddress() { - return datanodeRpcAddress; + public void start() { + LOG.info( + StorageContainerManager.buildRpcServerStartMessage( + "RPC server for DataNodes", datanodeRpcAddress)); + datanodeRpcServer.start(); } - public RPC.Server getDatanodeRpcServer() { - return datanodeRpcServer; + public InetSocketAddress getDatanodeRpcAddress() { + return datanodeRpcAddress; } @Override @@ -167,25 +176,6 @@ public class SCMDatanodeProtocolServer implements } @Override - public SCMHeartbeatResponseProto sendHeartbeat( - SCMHeartbeatRequestProto heartbeat) - throws IOException { - // TODO: Add a heartbeat dispatcher. 
- DatanodeDetails datanodeDetails = DatanodeDetails - .getFromProtoBuf(heartbeat.getDatanodeDetails()); - NodeReportProto nodeReport = heartbeat.getNodeReport(); - List<SCMCommand> commands = - scm.getScmNodeManager().sendHeartbeat(datanodeDetails, nodeReport); - List<SCMCommandProto> cmdResponses = new LinkedList<>(); - for (SCMCommand cmd : commands) { - cmdResponses.add(getCommandResponse(cmd)); - } - return SCMHeartbeatResponseProto.newBuilder() - .setDatanodeUUID(datanodeDetails.getUuidString()) - .addAllCommands(cmdResponses).build(); - } - - @Override public SCMRegisteredResponseProto register( HddsProtos.DatanodeDetailsProto datanodeDetailsProto, NodeReportProto nodeReport, @@ -216,36 +206,27 @@ public class SCMDatanodeProtocolServer implements .build(); } - public void processContainerReports(DatanodeDetails datanodeDetails, - ContainerReportsProto reports) + @Override + public SCMHeartbeatResponseProto sendHeartbeat( + SCMHeartbeatRequestProto heartbeat) throws IOException { - updateContainerReportMetrics(datanodeDetails, reports); - // should we process container reports async? - scm.getScmContainerManager() - .processContainerReports(datanodeDetails, reports); - } + heartbeatDispatcher.dispatch(heartbeat); - private void updateContainerReportMetrics(DatanodeDetails datanodeDetails, - ContainerReportsProto reports) { - ContainerStat newStat = new ContainerStat(); - for (StorageContainerDatanodeProtocolProtos.ContainerInfo info : reports - .getReportsList()) { - newStat.add(new ContainerStat(info.getSize(), info.getUsed(), - info.getKeyCount(), info.getReadBytes(), info.getWriteBytes(), - info.getReadCount(), info.getWriteCount())); + // TODO: Remove the below code after SCM refactoring. 
+ DatanodeDetails datanodeDetails = DatanodeDetails + .getFromProtoBuf(heartbeat.getDatanodeDetails()); + NodeReportProto nodeReport = heartbeat.getNodeReport(); + List<SCMCommand> commands = + scm.getScmNodeManager().sendHeartbeat(datanodeDetails, nodeReport); + List<SCMCommandProto> cmdResponses = new LinkedList<>(); + for (SCMCommand cmd : commands) { + cmdResponses.add(getCommandResponse(cmd)); } - // update container metrics - StorageContainerManager.getMetrics().setLastContainerStat(newStat); - - // Update container stat entry, this will trigger a removal operation if it - // exists in cache. - String datanodeUuid = datanodeDetails.getUuidString(); - scm.getContainerReportCache().put(datanodeUuid, newStat); - // update global view container metrics - StorageContainerManager.getMetrics().incrContainerStat(newStat); + return SCMHeartbeatResponseProto.newBuilder() + .setDatanodeUUID(datanodeDetails.getUuidString()) + .addAllCommands(cmdResponses).build(); } - @Override public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK( ContainerBlocksDeletionACKProto acks) throws IOException { @@ -271,28 +252,6 @@ public class SCMDatanodeProtocolServer implements .getDefaultInstanceForType(); } - public void start() { - LOG.info( - StorageContainerManager.buildRpcServerStartMessage( - "RPC server for DataNodes", getDatanodeRpcAddress())); - getDatanodeRpcServer().start(); - } - - public void stop() { - try { - LOG.info("Stopping the RPC server for DataNodes"); - datanodeRpcServer.stop(); - } catch (Exception ex) { - LOG.error(" datanodeRpcServer stop failed.", ex); - } - IOUtils.cleanupWithLogger(LOG, scm.getScmNodeManager()); - } - - public void join() throws InterruptedException { - LOG.trace("Join RPC server for DataNodes"); - datanodeRpcServer.join(); - } - /** * Returns a SCMCommandRepose from the SCM Command. 
* @@ -338,4 +297,22 @@ public class SCMDatanodeProtocolServer implements throw new IllegalArgumentException("Not implemented"); } } + + + public void join() throws InterruptedException { + LOG.trace("Join RPC server for DataNodes"); + datanodeRpcServer.join(); + } + + public void stop() { + try { + LOG.info("Stopping the RPC server for DataNodes"); + datanodeRpcServer.stop(); + heartbeatDispatcher.shutdown(); + } catch (Exception ex) { + LOG.error(" datanodeRpcServer stop failed.", ex); + } + IOUtils.cleanupWithLogger(LOG, scm.getScmNodeManager()); + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ddd09d59/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeContainerReportHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeContainerReportHandler.java new file mode 100644 index 0000000..00ce94d --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeContainerReportHandler.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.server.report; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Handler for Datanode Container Report. + */ +public class SCMDatanodeContainerReportHandler extends + SCMDatanodeReportHandler<ContainerReportsProto> { + + private static final Logger LOG = LoggerFactory.getLogger( + SCMDatanodeContainerReportHandler.class); + + @Override + public void processReport(DatanodeDetails datanodeDetails, + ContainerReportsProto report) throws IOException { + LOG.trace("Processing container report from {}.", datanodeDetails); + updateContainerReportMetrics(datanodeDetails, report); + getSCM().getScmContainerManager() + .processContainerReports(datanodeDetails, report); + } + + /** + * Updates container report metrics in SCM. 
+ * + * @param datanodeDetails Datanode Information + * @param reports Container Reports + */ + private void updateContainerReportMetrics(DatanodeDetails datanodeDetails, + ContainerReportsProto reports) { + ContainerStat newStat = new ContainerStat(); + for (StorageContainerDatanodeProtocolProtos.ContainerInfo info : reports + .getReportsList()) { + newStat.add(new ContainerStat(info.getSize(), info.getUsed(), + info.getKeyCount(), info.getReadBytes(), info.getWriteBytes(), + info.getReadCount(), info.getWriteCount())); + } + // update container metrics + StorageContainerManager.getMetrics().setLastContainerStat(newStat); + + // Update container stat entry, this will trigger a removal operation if it + // exists in cache. + String datanodeUuid = datanodeDetails.getUuidString(); + getSCM().getContainerReportCache().put(datanodeUuid, newStat); + // update global view container metrics + StorageContainerManager.getMetrics().incrContainerStat(newStat); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ddd09d59/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeHeartbeatDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeHeartbeatDispatcher.java new file mode 100644 index 0000000..d50edff --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeHeartbeatDispatcher.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. 
The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.server.report; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.protobuf.GeneratedMessage; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.util.concurrent.HadoopExecutors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; + +/** + * This class is responsible for dispatching heartbeat from datanode to + * appropriate ReportHandlers at SCM. + * Only one handler per report is supported now, it's very easy to support + * multiple handlers for a report. + */ +public final class SCMDatanodeHeartbeatDispatcher { + + private static final Logger LOG = LoggerFactory.getLogger( + SCMDatanodeHeartbeatDispatcher.class); + + /** + * This stores Report to Handler mapping. + */ + private final Map<Class<? extends GeneratedMessage>, + SCMDatanodeReportHandler<? extends GeneratedMessage>> handlers; + + /** + * Executor service which will be used for processing reports. 
+ */ + private final ExecutorService executorService; + + /** + * Constructs SCMDatanodeHeartbeatDispatcher instance with the given + * handlers. + * + * @param handlers report to report handler mapping + */ + private SCMDatanodeHeartbeatDispatcher(Map<Class<? extends GeneratedMessage>, + SCMDatanodeReportHandler<? extends GeneratedMessage>> handlers) { + this.handlers = handlers; + this.executorService = HadoopExecutors.newCachedThreadPool( + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("SCMDatanode Heartbeat Dispatcher Thread - %d") + .build()); + } + + /** + * Dispatches heartbeat to registered handlers. + * + * @param heartbeat heartbeat to be dispatched. + */ + public void dispatch(SCMHeartbeatRequestProto heartbeat) { + DatanodeDetails datanodeDetails = DatanodeDetails + .getFromProtoBuf(heartbeat.getDatanodeDetails()); + if (heartbeat.hasNodeReport()) { + processReport(datanodeDetails, heartbeat.getNodeReport()); + } + if (heartbeat.hasContainerReport()) { + processReport(datanodeDetails, heartbeat.getContainerReport()); + } + } + + /** + * Invokes appropriate ReportHandler and submits the task to executor + * service for processing. + * + * @param datanodeDetails Datanode Information + * @param report Report to be processed + */ + @SuppressWarnings("unchecked") + private void processReport(DatanodeDetails datanodeDetails, + GeneratedMessage report) { + executorService.submit(() -> { + try { + SCMDatanodeReportHandler handler = handlers.get(report.getClass()); + handler.processReport(datanodeDetails, report); + } catch (IOException ex) { + LOG.error("Exception wile processing report {}, from {}", + report.getClass(), datanodeDetails, ex); + } + }); + } + + /** + * Shuts down SCMDatanodeHeartbeatDispatcher. + */ + public void shutdown() { + executorService.shutdown(); + } + + /** + * Returns a new Builder to construct {@link SCMDatanodeHeartbeatDispatcher}. 
+ * + * @param conf Configuration to be used by SCMDatanodeHeartbeatDispatcher + * @param scm {@link StorageContainerManager} instance to be used by report + * handlers + * + * @return {@link SCMDatanodeHeartbeatDispatcher.Builder} instance + */ + public static Builder newBuilder(Configuration conf, + StorageContainerManager scm) { + return new Builder(conf, scm); + } + + /** + * Builder for SCMDatanodeHeartbeatDispatcher. + */ + public static class Builder { + + private final SCMDatanodeReportHandlerFactory reportHandlerFactory; + private final Map<Class<? extends GeneratedMessage>, + SCMDatanodeReportHandler<? extends GeneratedMessage>> report2handler; + + /** + * Constructs SCMDatanodeHeartbeatDispatcher.Builder instance. + * + * @param conf Configuration object to be used. + * @param scm StorageContainerManager instance to be used for report + * handler initialization. + */ + private Builder(Configuration conf, StorageContainerManager scm) { + this.report2handler = new HashMap<>(); + this.reportHandlerFactory = + new SCMDatanodeReportHandlerFactory(conf, scm); + } + + /** + * Adds new report handler for the given report. + * + * @param report Report for which handler has to be added + * + * @return Builder + */ + public Builder addHandlerFor(Class<? extends GeneratedMessage> report) { + report2handler.put(report, reportHandlerFactory.getHandlerFor(report)); + return this; + } + + /** + * Associates the given report handler for the given report. + * + * @param report Report to be associated with + * @param handler Handler to be used for the report + * + * @return Builder + */ + public Builder addHandler(Class<? extends GeneratedMessage> report, + SCMDatanodeReportHandler<? extends GeneratedMessage> handler) { + report2handler.put(report, handler); + return this; + } + + /** + * Builds and returns {@link SCMDatanodeHeartbeatDispatcher} instance. 
+ * + * @return SCMDatanodeHeartbeatDispatcher + */ + public SCMDatanodeHeartbeatDispatcher build() { + return new SCMDatanodeHeartbeatDispatcher(report2handler); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ddd09d59/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeNodeReportHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeNodeReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeNodeReportHandler.java new file mode 100644 index 0000000..fb89b02 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeNodeReportHandler.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.hadoop.hdds.scm.server.report; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.NodeReportProto; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Handles Datanode Node Report. + */ +public class SCMDatanodeNodeReportHandler extends + SCMDatanodeReportHandler<NodeReportProto> { + + private static final Logger LOG = LoggerFactory.getLogger( + SCMDatanodeNodeReportHandler.class); + + @Override + public void processReport(DatanodeDetails datanodeDetails, + NodeReportProto report) throws IOException { + LOG.debug("Processing node report from {}.", datanodeDetails); + //TODO: add logic to process node report. + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ddd09d59/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeReportHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeReportHandler.java new file mode 100644 index 0000000..d338649 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeReportHandler.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.server.report; + +import com.google.protobuf.GeneratedMessage; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; + +import java.io.IOException; + +/** + * Datanode Report handlers should implement this interface in order to get + * call back whenever the report is received from datanode. + * + * @param <T> Type of report the handler is interested in. + */ +public abstract class SCMDatanodeReportHandler<T extends GeneratedMessage> + implements Configurable { + + private Configuration config; + private StorageContainerManager scm; + + /** + * Initializes SCMDatanodeReportHandler and associates it with the given + * StorageContainerManager instance. + * + * @param storageContainerManager StorageContainerManager instance to be + * associated with. + */ + public void init(StorageContainerManager storageContainerManager) { + this.scm = storageContainerManager; + } + + /** + * Returns the associated StorageContainerManager instance. This will be + * used by the ReportHandler implementations. + * + * @return {@link StorageContainerManager} + */ + protected StorageContainerManager getSCM() { + return scm; + } + + @Override + public void setConf(Configuration conf) { + this.config = conf; + } + + @Override + public Configuration getConf() { + return config; + } + + /** + * Processes the report received from datanode. 
Each ReportHandler + * implementation is responsible for providing the logic to process the + * report it's interested in. + * + * @param datanodeDetails Datanode Information + * @param report Report to be processed + * + * @throws IOException In case of any exception + */ + abstract void processReport(DatanodeDetails datanodeDetails, T report) + throws IOException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ddd09d59/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeReportHandlerFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeReportHandlerFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeReportHandlerFactory.java new file mode 100644 index 0000000..e88495f --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeReportHandlerFactory.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.hadoop.hdds.scm.server.report; + +import com.google.protobuf.GeneratedMessage; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.NodeReportProto; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.util.ReflectionUtils; + +import java.util.HashMap; +import java.util.Map; + + +/** + * Factory class to construct {@link SCMDatanodeReportHandler} given a report. + */ +public class SCMDatanodeReportHandlerFactory { + + private final Configuration conf; + private final StorageContainerManager scm; + private final Map<Class<? extends GeneratedMessage>, + Class<? extends SCMDatanodeReportHandler<? extends GeneratedMessage>>> + report2handler; + + /** + * Constructs {@link SCMDatanodeReportHandler} instance. + * + * @param conf Configuration to be passed to the + * {@link SCMDatanodeReportHandler} + */ + public SCMDatanodeReportHandlerFactory(Configuration conf, + StorageContainerManager scm) { + this.conf = conf; + this.scm = scm; + this.report2handler = new HashMap<>(); + + report2handler.put(NodeReportProto.class, + SCMDatanodeNodeReportHandler.class); + report2handler.put(ContainerReportsProto.class, + SCMDatanodeContainerReportHandler.class); + } + + /** + * Returns the SCMDatanodeReportHandler for the corresponding report. + * + * @param report report + * + * @return report handler + */ + public SCMDatanodeReportHandler<? extends GeneratedMessage> getHandlerFor( + Class<? extends GeneratedMessage> report) { + Class<? extends SCMDatanodeReportHandler<? extends GeneratedMessage>> + handlerClass = report2handler.get(report); + if (handlerClass == null) { + throw new RuntimeException("No handler found for report " + report); + } + SCMDatanodeReportHandler<? 
extends GeneratedMessage> instance = + ReflectionUtils.newInstance(handlerClass, conf); + instance.init(scm); + return instance; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ddd09d59/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/package-info.java new file mode 100644 index 0000000..fda3993 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/package-info.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.server.report; +/** + * Handling of all the datanode reports in SCM which are received through + * heartbeat is done here. 
+ * + * SCM Datanode Report Processing State Diagram: + * + * SCMDatanode SCMDatanodeHeartbeat SCMDatanodeReport + * ProtocolServer Dispatcher Handler + * | | | + * | | | + * | construct | | + * |----------------------->| | + * | | | + * | | register | + * | |<-----------------------| + * | | | + * +------------+------------------------+------------------------+--------+ + * | loop | | | | + * | | | | | + * | | | | | + * | heartbeat | | | | + * - +----------->| | | | + * | from | heartbeat | | | + * | Datanode |----------------------->| | | + * | | | report | | + * | | |----------------------->| | + * | | | | | + * | DN | | | | + * <-+------------| | | | + * | commands | | | | + * | | | | | + * +------------+------------------------+------------------------+--------+ + * | | | + * | | | + * | shutdown | | + * |----------------------->| | + * | | | + * | | | + * - - - + */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ddd09d59/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeContainerReportHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeContainerReportHandler.java new file mode 100644 index 0000000..776ae88 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeContainerReportHandler.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. 
The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.server.report; + +import org.junit.Assert; +import org.junit.Test; + +/** + * Test cases to verify SCMDatanodeContainerReportHandler's behavior. + */ +public class TestSCMDatanodeContainerReportHandler { + + //TODO: add test cases to verify SCMDatanodeContainerReportHandler. + + @Test + public void dummyTest() { + Assert.assertTrue(true); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ddd09d59/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeHeartbeatDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeHeartbeatDispatcher.java new file mode 100644 index 0000000..5d08647 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeHeartbeatDispatcher.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. 
The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.server.report; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.NodeReportProto; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.IOException; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * This class tests the behavior of SCMDatanodeHeartbeatDispatcher. 
+ */ +public class TestSCMDatanodeHeartbeatDispatcher { + + @Test + public void testSCMDatanodeHeartbeatDispatcherBuilder() { + Configuration conf = new OzoneConfiguration(); + SCMDatanodeHeartbeatDispatcher dispatcher = + SCMDatanodeHeartbeatDispatcher.newBuilder(conf, null) + .addHandlerFor(NodeReportProto.class) + .addHandlerFor(ContainerReportsProto.class) + .build(); + Assert.assertNotNull(dispatcher); + } + + @Test + public void testNodeReportDispatcher() throws IOException { + Configuration conf = new OzoneConfiguration(); + SCMDatanodeNodeReportHandler nodeReportHandler = + Mockito.mock(SCMDatanodeNodeReportHandler.class); + SCMDatanodeHeartbeatDispatcher dispatcher = + SCMDatanodeHeartbeatDispatcher.newBuilder(conf, null) + .addHandler(NodeReportProto.class, nodeReportHandler) + .build(); + + DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(); + NodeReportProto nodeReport = NodeReportProto.getDefaultInstance(); + SCMHeartbeatRequestProto heartbeat = + SCMHeartbeatRequestProto.newBuilder() + .setDatanodeDetails(datanodeDetails.getProtoBufMessage()) + .setNodeReport(nodeReport) + .build(); + dispatcher.dispatch(heartbeat); + verify(nodeReportHandler, + times(1)) + .processReport(any(DatanodeDetails.class), eq(nodeReport)); + } + + @Test + public void testContainerReportDispatcher() throws IOException { + Configuration conf = new OzoneConfiguration(); + SCMDatanodeContainerReportHandler containerReportHandler = + Mockito.mock(SCMDatanodeContainerReportHandler.class); + SCMDatanodeHeartbeatDispatcher dispatcher = + SCMDatanodeHeartbeatDispatcher.newBuilder(conf, null) + .addHandler(ContainerReportsProto.class, containerReportHandler) + .build(); + + DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(); + ContainerReportsProto containerReport = + ContainerReportsProto.getDefaultInstance(); + SCMHeartbeatRequestProto heartbeat = + SCMHeartbeatRequestProto.newBuilder() + .setDatanodeDetails(datanodeDetails.getProtoBufMessage()) + 
.setContainerReport(containerReport) + .build(); + dispatcher.dispatch(heartbeat); + verify(containerReportHandler, + times(1)) + .processReport(any(DatanodeDetails.class), + any(ContainerReportsProto.class)); + } + + @Test + public void testNodeAndContainerReportDispatcher() throws IOException { + Configuration conf = new OzoneConfiguration(); + SCMDatanodeNodeReportHandler nodeReportHandler = + Mockito.mock(SCMDatanodeNodeReportHandler.class); + SCMDatanodeContainerReportHandler containerReportHandler = + Mockito.mock(SCMDatanodeContainerReportHandler.class); + SCMDatanodeHeartbeatDispatcher dispatcher = + SCMDatanodeHeartbeatDispatcher.newBuilder(conf, null) + .addHandler(NodeReportProto.class, nodeReportHandler) + .addHandler(ContainerReportsProto.class, containerReportHandler) + .build(); + + DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(); + NodeReportProto nodeReport = NodeReportProto.getDefaultInstance(); + ContainerReportsProto containerReport = + ContainerReportsProto.getDefaultInstance(); + SCMHeartbeatRequestProto heartbeat = + SCMHeartbeatRequestProto.newBuilder() + .setDatanodeDetails(datanodeDetails.getProtoBufMessage()) + .setNodeReport(nodeReport) + .setContainerReport(containerReport) + .build(); + dispatcher.dispatch(heartbeat); + verify(nodeReportHandler, + times(1)) + .processReport(any(DatanodeDetails.class), any(NodeReportProto.class)); + verify(containerReportHandler, + times(1)) + .processReport(any(DatanodeDetails.class), + any(ContainerReportsProto.class)); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ddd09d59/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeNodeReportHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeNodeReportHandler.java 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeNodeReportHandler.java new file mode 100644 index 0000000..30a753c --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeNodeReportHandler.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.server.report; + + +import org.junit.Assert; +import org.junit.Test; + +/** + * Test cases to verify SCMDatanodeNodeReportHandler's behavior. + */ +public class TestSCMDatanodeNodeReportHandler { + + + //TODO: add test cases to verify SCMDatanodeNodeReportHandler. 
+ + @Test + public void dummyTest() { + Assert.assertTrue(true); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ddd09d59/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeReportHandlerFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeReportHandlerFactory.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeReportHandlerFactory.java new file mode 100644 index 0000000..4b918f7 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeReportHandlerFactory.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.hadoop.hdds.scm.server.report; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.NodeReportProto; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test cases to verify the functionality of SCMDatanodeReportHandlerFactory. + */ +public class TestSCMDatanodeReportHandlerFactory { + + @Test + public void testNodeReportHandlerConstruction() { + Configuration conf = new OzoneConfiguration(); + SCMDatanodeReportHandlerFactory factory = + new SCMDatanodeReportHandlerFactory(conf, null); + Assert.assertTrue(factory.getHandlerFor(NodeReportProto.class) + instanceof SCMDatanodeNodeReportHandler); + } + + @Test + public void testContainerReporttHandlerConstruction() { + Configuration conf = new OzoneConfiguration(); + SCMDatanodeReportHandlerFactory factory = + new SCMDatanodeReportHandlerFactory(conf, null); + Assert.assertTrue(factory.getHandlerFor(ContainerReportsProto.class) + instanceof SCMDatanodeContainerReportHandler); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ddd09d59/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/package-info.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/package-info.java new file mode 100644 index 0000000..4a3f59f --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.server.report; +/** + * Contains test-cases to test Datanode report handlers in SCM. + */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ddd09d59/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java index 1dbe760..ecddf8e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java @@ -28,6 +28,8 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.server.report + .SCMDatanodeContainerReportHandler; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConsts; @@ -80,7 
+82,11 @@ public class TestSCMMetrics { DatanodeDetails fstDatanodeDetails = TestUtils.getDatanodeDetails(); ContainerReportsProto request = createContainerReport(numReport, stat); String fstDatanodeUuid = fstDatanodeDetails.getUuidString(); - scmManager.getDatanodeProtocolServer().processContainerReports( + SCMDatanodeContainerReportHandler containerReportHandler = + new SCMDatanodeContainerReportHandler(); + containerReportHandler.setConf(conf); + containerReportHandler.init(scmManager); + containerReportHandler.processReport( fstDatanodeDetails, request); // verify container stat metrics @@ -105,7 +111,7 @@ public class TestSCMMetrics { DatanodeDetails sndDatanodeDetails = TestUtils.getDatanodeDetails(); request = createContainerReport(1, stat); String sndDatanodeUuid = sndDatanodeDetails.getUuidString(); - scmManager.getDatanodeProtocolServer().processContainerReports( + containerReportHandler.processReport( sndDatanodeDetails, request); scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME); @@ -128,11 +134,11 @@ public class TestSCMMetrics { // Re-send reports but with different value for validating // the aggregation. 
stat = new ContainerStat(100, 50, 3, 50, 60, 5, 6); - scmManager.getDatanodeProtocolServer().processContainerReports( + containerReportHandler.processReport( fstDatanodeDetails, createContainerReport(1, stat)); stat = new ContainerStat(1, 1, 1, 1, 1, 1, 1); - scmManager.getDatanodeProtocolServer().processContainerReports( + containerReportHandler.processReport( sndDatanodeDetails, createContainerReport(1, stat)); // the global container metrics value should be updated @@ -176,8 +182,12 @@ public class TestSCMMetrics { DatanodeDetails datanodeDetails = cluster.getHddsDatanodes().get(0) .getDatanodeDetails(); + SCMDatanodeContainerReportHandler containerReportHandler = + new SCMDatanodeContainerReportHandler(); + containerReportHandler.setConf(conf); + containerReportHandler.init(scmManager); ContainerReportsProto request = createContainerReport(numReport, stat); - scmManager.getDatanodeProtocolServer().processContainerReports( + containerReportHandler.processReport( datanodeDetails, request); MetricsRecordBuilder scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org