HDDS-737. Introduce Incremental Container Report. Contributed by Nanda kumar.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c80f753b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c80f753b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c80f753b Branch: refs/heads/trunk Commit: c80f753b0e95eb722a972f836c1e4d16fd823434 Parents: e1bbf7d Author: Nanda kumar <na...@apache.org> Authored: Thu Nov 8 18:33:38 2018 +0530 Committer: Nanda kumar <na...@apache.org> Committed: Thu Nov 8 18:33:38 2018 +0530 ---------------------------------------------------------------------- .../hdds/scm/container/ContainerInfo.java | 8 + .../statemachine/DatanodeStateMachine.java | 17 +- .../common/statemachine/StateContext.java | 13 +- .../CloseContainerCommandHandler.java | 73 +++-- .../states/endpoint/HeartbeatEndpointTask.java | 12 +- .../StorageContainerDatanodeProtocol.proto | 2 +- .../scm/command/CommandStatusReportHandler.java | 4 - .../container/CloseContainerEventHandler.java | 27 -- .../scm/container/CloseContainerWatcher.java | 101 ------- .../hdds/scm/container/ContainerManager.java | 11 - .../hdds/scm/container/ContainerReplica.java | 30 +- .../scm/container/ContainerReportHandler.java | 202 +++++++++---- .../IncrementalContainerReportHandler.java | 98 +++++++ .../hdds/scm/container/SCMContainerManager.java | 107 ------- .../scm/container/states/ContainerStateMap.java | 7 - .../hadoop/hdds/scm/events/SCMEvents.java | 34 +-- .../hadoop/hdds/scm/node/DeadNodeHandler.java | 37 ++- .../hadoop/hdds/scm/node/NewNodeHandler.java | 16 +- .../hadoop/hdds/scm/node/NodeManager.java | 50 +--- .../hadoop/hdds/scm/node/NodeReportHandler.java | 2 +- .../hadoop/hdds/scm/node/NodeStateManager.java | 69 +---- .../hadoop/hdds/scm/node/SCMNodeManager.java | 66 +---- .../hadoop/hdds/scm/node/StaleNodeHandler.java | 2 +- .../hdds/scm/node/states/NodeStateMap.java | 67 ++--- .../server/SCMDatanodeHeartbeatDispatcher.java | 17 +- .../scm/server/SCMDatanodeProtocolServer.java | 6 +- 
.../scm/server/StorageContainerManager.java | 21 +- .../command/TestCommandStatusReportHandler.java | 14 - .../hdds/scm/container/MockNodeManager.java | 66 ++--- .../container/TestContainerReportHandler.java | 15 +- .../container/TestContainerStateManager.java | 3 + .../scm/container/TestSCMContainerManager.java | 107 +------ .../replication/TestReplicationManager.java | 12 +- .../hdds/scm/node/TestDeadNodeHandler.java | 24 +- .../container/TestCloseContainerWatcher.java | 289 ------------------- .../ozone/container/common/TestEndPoint.java | 6 +- .../testutils/ReplicationNodeManagerMock.java | 55 +--- .../TestContainerStateManagerIntegration.java | 11 +- .../hdds/scm/pipeline/TestNode2PipelineMap.java | 4 +- .../hdds/scm/pipeline/TestPipelineClose.java | 4 +- .../commandhandler/TestBlockDeletion.java | 6 +- 41 files changed, 558 insertions(+), 1157 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java index 5a9484a..edfa0f9 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java @@ -158,10 +158,18 @@ public class ContainerInfo implements Comparator<ContainerInfo>, return usedBytes; } + public void setUsedBytes(long value) { + usedBytes = value; + } + public long getNumberOfKeys() { return numberOfKeys; } + public void setNumberOfKeys(long value) { + numberOfKeys = value; + } + public long getDeleteTransactionId() { return deleteTransactionId; } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index 4768cf8..12c33ff 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -168,10 +168,14 @@ public class DatanodeStateMachine implements Closeable { TimeUnit.MILLISECONDS); now = Time.monotonicNow(); if (now < nextHB.get()) { - Thread.sleep(nextHB.get() - now); + if(!Thread.interrupted()) { + Thread.sleep(nextHB.get() - now); + } } } catch (InterruptedException e) { - // Ignore this exception. + // Some one has sent interrupt signal, this could be because + // 1. Trigger heartbeat immediately + // 2. Shutdown has be initiated. } catch (Exception e) { LOG.error("Unable to finish the execution.", e); } @@ -325,6 +329,15 @@ public class DatanodeStateMachine implements Closeable { } /** + * Calling this will immediately trigger a heartbeat to the SCMs. + * This heartbeat will also include all the reports which are ready to + * be sent by datanode. + */ + public void triggerHeartbeat() { + stateMachineThread.interrupt(); + } + + /** * Waits for DatanodeStateMachine to exit. 
* * @throws InterruptedException http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index 12c196b..e928824 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -447,6 +447,11 @@ public class StateContext { * @param cmd - {@link SCMCommand}. */ public void addCmdStatus(SCMCommand cmd) { + if (cmd.getType().equals(Type.closeContainerCommand)) { + // We will be removing CommandStatus completely. + // As a first step, removed it for CloseContainerCommand. + return; + } CommandStatusBuilder statusBuilder; if (cmd.getType() == Type.deleteBlocksCommand) { statusBuilder = new DeleteBlockCommandStatusBuilder(); @@ -469,14 +474,6 @@ public class StateContext { } /** - * Remove object from cache in StateContext#cmdStatusMap. - * - */ - public void removeCommandStatus(Long cmdId) { - cmdStatusMap.remove(cmdId); - } - - /** * Updates status of a pending status command. * @param cmdId command id * @param cmdStatusUpdater Consumer to update command status. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java index 2c3db61..591fe56 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java @@ -19,6 +19,8 @@ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto; +import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto; @@ -63,62 +65,55 @@ public class CloseContainerCommandHandler implements CommandHandler { StateContext context, SCMConnectionManager connectionManager) { LOG.debug("Processing Close Container command."); invocationCount++; - cmdExecuted = false; long startTime = Time.monotonicNow(); // TODO: define this as INVALID_CONTAINER_ID in HddsConsts.java (TBA) long containerID = -1; try { - - CloseContainerCommandProto - closeContainerProto = - CloseContainerCommandProto - .parseFrom(command.getProtoBufMessage()); + 
CloseContainerCommandProto closeContainerProto = + CloseContainerCommandProto.parseFrom(command.getProtoBufMessage()); containerID = closeContainerProto.getContainerID(); - if (container.getContainerSet().getContainer(containerID) + // CloseContainer operation is idempotent, if the container is already + // closed, then do nothing. + if (!container.getContainerSet().getContainer(containerID) .getContainerData().isClosed()) { - LOG.debug("Container {} is already closed", containerID); - // It might happen that the where the first attempt of closing the - // container failed with NOT_LEADER_EXCEPTION. In such cases, SCM will - // retry to check the container got really closed via Ratis. - // In such cases of the retry attempt, if the container is already - // closed via Ratis, we should just return. - cmdExecuted = true; - return; - } - HddsProtos.PipelineID pipelineID = closeContainerProto.getPipelineID(); - HddsProtos.ReplicationType replicationType = - closeContainerProto.getReplicationType(); + LOG.debug("Closing container {}.", containerID); + HddsProtos.PipelineID pipelineID = closeContainerProto.getPipelineID(); + HddsProtos.ReplicationType replicationType = + closeContainerProto.getReplicationType(); - ContainerProtos.ContainerCommandRequestProto.Builder request = - ContainerProtos.ContainerCommandRequestProto.newBuilder(); - request.setCmdType(ContainerProtos.Type.CloseContainer); - request.setContainerID(containerID); - request.setCloseContainer( - ContainerProtos.CloseContainerRequestProto.getDefaultInstance()); - request.setTraceID(UUID.randomUUID().toString()); - request.setDatanodeUuid( - context.getParent().getDatanodeDetails().getUuidString()); - // submit the close container request for the XceiverServer to handle - container.submitContainerRequest( - request.build(), replicationType, pipelineID); + ContainerProtos.ContainerCommandRequestProto.Builder request = + ContainerProtos.ContainerCommandRequestProto.newBuilder(); + 
request.setCmdType(ContainerProtos.Type.CloseContainer); + request.setContainerID(containerID); + request.setCloseContainer( + ContainerProtos.CloseContainerRequestProto.getDefaultInstance()); + request.setTraceID(UUID.randomUUID().toString()); + request.setDatanodeUuid( + context.getParent().getDatanodeDetails().getUuidString()); + // submit the close container request for the XceiverServer to handle + container.submitContainerRequest( + request.build(), replicationType, pipelineID); + // Since the container is closed, we trigger an ICR + IncrementalContainerReportProto icr = IncrementalContainerReportProto + .newBuilder() + .addReport(container.getContainerSet() + .getContainer(containerID).getContainerReport()) + .build(); + context.addReport(icr); + context.getParent().triggerHeartbeat(); + } } catch (Exception e) { if (e instanceof NotLeaderException) { // If the particular datanode is not the Ratis leader, the close // container command will not be executed by the follower but will be // executed by Ratis stateMachine transactions via leader to follower. // There can also be case where the datanode is in candidate state. - // In these situations, NotLeaderException is thrown. Remove the status - // from cmdStatus Map here so that it will be retried only by SCM if the - // leader could not not close the container after a certain time. - context.removeCommandStatus(containerID); - LOG.info(e.getLocalizedMessage()); + // In these situations, NotLeaderException is thrown. 
+ LOG.info("Follower cannot close the container {}.", containerID); } else { LOG.error("Can't close container " + containerID, e); - cmdExecuted = false; } } finally { - updateCommandStatus(context, command, - (cmdStatus) -> cmdStatus.setStatus(cmdExecuted), LOG); long endTime = Time.monotonicNow(); totalTime += endTime - startTime; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java index 4fd72ec..0c0f1af 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java @@ -141,7 +141,9 @@ public class HeartbeatEndpointTask rpcEndpoint.zeroMissedCount(); } catch (IOException ex) { // put back the reports which failed to be sent - putBackReports(requestBuilder); + if (requestBuilder != null) { + putBackReports(requestBuilder); + } rpcEndpoint.logIfNeeded(ex); } finally { rpcEndpoint.unlock(); @@ -159,10 +161,10 @@ public class HeartbeatEndpointTask reports.add(requestBuilder.getNodeReport()); } if (requestBuilder.getCommandStatusReportsCount() != 0) { - for (GeneratedMessage msg : requestBuilder - .getCommandStatusReportsList()) { - reports.add(msg); - } + reports.addAll(requestBuilder.getCommandStatusReportsList()); + } + if (requestBuilder.getIncrementalContainerReportCount() != 0) { + reports.addAll(requestBuilder.getIncrementalContainerReportList()); } context.putBackReports(reports); 
} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto index 4ddb7b2..33ea307 100644 --- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -80,7 +80,7 @@ message SCMHeartbeatRequestProto { required DatanodeDetailsProto datanodeDetails = 1; optional NodeReportProto nodeReport = 2; optional ContainerReportsProto containerReport = 3; - optional IncrementalContainerReportProto incrementalContainerReport = 4; + repeated IncrementalContainerReportProto incrementalContainerReport = 4; repeated CommandStatusReportsProto commandStatusReports = 5; optional ContainerActionsProto containerActions = 6; optional PipelineActionsProto pipelineActions = 7; http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java index c0de382..53dfc5a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java @@ -58,10 +58,6 @@ public class CommandStatusReportHandler implements publisher.fireEvent(SCMEvents.REPLICATION_STATUS, new ReplicationStatus(cmdStatus)); break; - case 
closeContainerCommand: - publisher.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS, new - CloseContainerStatus(cmdStatus)); - break; case deleteBlocksCommand: if (cmdStatus.getStatus() == CommandStatus.Status.EXECUTED) { publisher.fireEvent(SCMEvents.DELETE_BLOCK_STATUS, http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java index 69574a9..9796a96 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java @@ -22,14 +22,12 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; -import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload; import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND; -import static org.apache.hadoop.hdds.scm.events.SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ; /** * In case of a node failure, volume failure, volume out of spapce, node @@ -128,32 +126,7 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> { new CommandForDatanode<>(node.getUuid(), closeContainerCommand)) .forEach(command -> publisher.fireEvent(DATANODE_COMMAND, 
command)); - publisher.fireEvent(CLOSE_CONTAINER_RETRYABLE_REQ, - new CloseContainerRetryableReq(containerID)); - LOG.trace("Issuing {} on Pipeline {} for container", closeContainerCommand, pipeline, containerID); } - - /** - * Class to create retryable event. Prevents redundant requests for same - * container Id. - */ - public static class CloseContainerRetryableReq implements - IdentifiableEventPayload { - - private ContainerID containerID; - public CloseContainerRetryableReq(ContainerID containerID) { - this.containerID = containerID; - } - - public ContainerID getContainerID() { - return containerID; - } - - @Override - public long getId() { - return containerID.getId(); - } - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java deleted file mode 100644 index 4593c1f..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * <p> - * <p>http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * <p>Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.hdds.scm.container; - -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.CommandStatus.Status; -import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler - .CloseContainerStatus; - -import org.apache.hadoop.hdds.scm.events.SCMEvents; -import org.apache.hadoop.hdds.server.events.Event; -import org.apache.hadoop.hdds.server.events.EventPublisher; -import org.apache.hadoop.hdds.server.events.EventWatcher; -import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler - .CloseContainerRetryableReq; -import org.apache.hadoop.ozone.lease.LeaseManager; -import org.apache.hadoop.ozone.lease.LeaseNotFoundException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -/** - * This watcher will watch for CLOSE_CONTAINER_STATUS events fired from - * CommandStatusReport. If required it will re-trigger CloseContainer command - * for DataNodes to CloseContainerEventHandler. 
- */ -public class CloseContainerWatcher extends - EventWatcher<CloseContainerRetryableReq, CloseContainerStatus> { - - public static final Logger LOG = - LoggerFactory.getLogger(CloseContainerWatcher.class); - private final ContainerManager containerManager; - - public CloseContainerWatcher(Event<CloseContainerRetryableReq> startEvent, - Event<CloseContainerStatus> completionEvent, - LeaseManager<Long> leaseManager, ContainerManager containerManager) { - super(startEvent, completionEvent, leaseManager); - this.containerManager = containerManager; - } - - @Override - protected void onTimeout(EventPublisher publisher, - CloseContainerRetryableReq payload) { - // Let CloseContainerEventHandler handle this message. - this.resendEventToHandler(payload.getId(), publisher); - } - - @Override - protected void onFinished(EventPublisher publisher, - CloseContainerRetryableReq payload) { - LOG.trace("CloseContainerCommand for containerId: {} executed ", payload - .getContainerID().getId()); - } - - @Override - protected synchronized void handleCompletion(CloseContainerStatus status, - EventPublisher publisher) throws LeaseNotFoundException { - // If status is PENDING then return without doing anything. - if(status.getCmdStatus().getStatus().equals(Status.PENDING)){ - return; - } - - CloseContainerRetryableReq closeCont = getTrackedEventbyId(status.getId()); - super.handleCompletion(status, publisher); - // If status is FAILED then send a msg to Handler to resend the command. 
- if (status.getCmdStatus().getStatus().equals(Status.FAILED) && closeCont - != null) { - this.resendEventToHandler(closeCont.getId(), publisher); - } - } - - private void resendEventToHandler(long containerID, EventPublisher - publisher) { - try { - // Check if container is still open - if (containerManager.getContainer( - ContainerID.valueof(containerID)).isOpen()) { - publisher.fireEvent(SCMEvents.CLOSE_CONTAINER, - ContainerID.valueof(containerID)); - } - } catch (IOException e) { - LOG.warn("Error in CloseContainerWatcher while processing event " + - "for containerId {} ExceptionMsg: ", containerID, e.getMessage()); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java index 0a48915..0906ca8 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java @@ -16,14 +16,11 @@ */ package org.apache.hadoop.hdds.scm.container; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException; import java.io.Closeable; 
@@ -123,14 +120,6 @@ public interface ContainerManager extends Closeable { HddsProtos.LifeCycleEvent event) throws IOException; /** - * Process container report from Datanode. - * - * @param reports Container report - */ - void processContainerReports(DatanodeDetails datanodeDetails, - ContainerReportsProto reports) throws IOException; - - /** * Returns the latest list of replicas for given containerId. * * @param containerID Container ID http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java index f2e80f4..9445fe8 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java @@ -22,6 +22,8 @@ import org.apache.commons.lang3.builder.CompareToBuilder; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; import java.util.Optional; import java.util.UUID; @@ -32,15 +34,18 @@ import java.util.UUID; public final class ContainerReplica implements Comparable<ContainerReplica> { final private ContainerID containerID; + final private ContainerReplicaProto.State state; final private DatanodeDetails datanodeDetails; final private UUID placeOfBirth; private Long sequenceId; - private ContainerReplica(ContainerID containerID, DatanodeDetails datanode, - UUID originNodeId) { + private ContainerReplica(final ContainerID containerID, + final 
ContainerReplicaProto.State state, final DatanodeDetails datanode, + final UUID originNodeId) { this.containerID = containerID; + this.state = state; this.datanodeDetails = datanode; this.placeOfBirth = originNodeId; } @@ -68,6 +73,15 @@ public final class ContainerReplica implements Comparable<ContainerReplica> { } /** + * Returns the state of this replica. + * + * @return replica state + */ + public ContainerReplicaProto.State getState() { + return state; + } + + /** * Returns the Sequence Id of this replica. * * @return Sequence Id @@ -126,6 +140,7 @@ public final class ContainerReplica implements Comparable<ContainerReplica> { public static class ContainerReplicaBuilder { private ContainerID containerID; + private ContainerReplicaProto.State state; private DatanodeDetails datanode; private UUID placeOfBirth; private Long sequenceId; @@ -142,6 +157,12 @@ public final class ContainerReplica implements Comparable<ContainerReplica> { return this; } + public ContainerReplicaBuilder setContainerState( + final ContainerReplicaProto.State containerState) { + state = containerState; + return this; + } + /** * Set DatanodeDetails. 
* @@ -184,9 +205,12 @@ public final class ContainerReplica implements Comparable<ContainerReplica> { public ContainerReplica build() { Preconditions.checkNotNull(containerID, "Container Id can't be null"); + Preconditions.checkNotNull(state, + "Container state can't be null"); Preconditions.checkNotNull(datanode, "DatanodeDetails can't be null"); - ContainerReplica replica = new ContainerReplica(containerID, datanode, + ContainerReplica replica = new ContainerReplica( + containerID, state, datanode, Optional.ofNullable(placeOfBirth).orElse(datanode.getUuid())); Optional.ofNullable(sequenceId).ifPresent(replica::setSequenceId); return replica; http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java index 0cb2f81..c9fe9e4 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java @@ -17,19 +17,26 @@ */ package org.apache.hadoop.hdds.scm.container; -import java.io.IOException; +import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.stream.Collectors; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto; -import org.apache.hadoop.hdds.scm.container.replication.ReplicationActivityStatus; +import org.apache.hadoop.hdds.protocol.proto + 
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList; +import org.apache.hadoop.hdds.scm.container.replication + .ReplicationActivityStatus; import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.node.NodeManager; -import org.apache.hadoop.hdds.scm.node.states.ReportResult; -import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode; +import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; +import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; +import org.apache.hadoop.hdds.scm.server + .SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; @@ -47,73 +54,160 @@ public class ContainerReportHandler implements LoggerFactory.getLogger(ContainerReportHandler.class); private final NodeManager nodeManager; + private final PipelineManager pipelineManager; private final ContainerManager containerManager; - private ReplicationActivityStatus replicationStatus; + private final ReplicationActivityStatus replicationStatus; - public ContainerReportHandler(ContainerManager containerManager, - NodeManager nodeManager, - ReplicationActivityStatus replicationActivityStatus) { - Preconditions.checkNotNull(containerManager); + public ContainerReportHandler(final NodeManager nodeManager, + final PipelineManager pipelineManager, + final ContainerManager containerManager, + final ReplicationActivityStatus replicationActivityStatus) { Preconditions.checkNotNull(nodeManager); + Preconditions.checkNotNull(pipelineManager); + Preconditions.checkNotNull(containerManager); Preconditions.checkNotNull(replicationActivityStatus); 
this.nodeManager = nodeManager; + this.pipelineManager = pipelineManager; this.containerManager = containerManager; this.replicationStatus = replicationActivityStatus; } @Override - public void onMessage(ContainerReportFromDatanode containerReportFromDatanode, - EventPublisher publisher) { + public void onMessage(final ContainerReportFromDatanode reportFromDatanode, + final EventPublisher publisher) { + + final DatanodeDetails datanodeDetails = + reportFromDatanode.getDatanodeDetails(); - DatanodeDetails datanodeOrigin = - containerReportFromDatanode.getDatanodeDetails(); + final ContainerReportsProto containerReport = + reportFromDatanode.getReport(); - ContainerReportsProto containerReport = - containerReportFromDatanode.getReport(); try { - //update state in container db and trigger close container events - containerManager - .processContainerReports(datanodeOrigin, containerReport); - - Set<ContainerID> containerIds = containerReport.getReportsList().stream() - .map(StorageContainerDatanodeProtocolProtos - .ContainerReplicaProto::getContainerID) - .map(ContainerID::new) - .collect(Collectors.toSet()); - - ReportResult<ContainerID> reportResult = nodeManager - .processContainerReport(datanodeOrigin.getUuid(), containerIds); - - //we have the report, so we can update the states for the next iteration. - nodeManager - .setContainersForDatanode(datanodeOrigin.getUuid(), containerIds); - - for (ContainerID containerID : reportResult.getMissingEntries()) { - final ContainerReplica replica = ContainerReplica.newBuilder() - .setContainerID(containerID) - .setDatanodeDetails(datanodeOrigin) - .build(); - containerManager - .removeContainerReplica(containerID, replica); - checkReplicationState(containerID, publisher); + final List<ContainerReplicaProto> replicas = containerReport + .getReportsList(); + + // ContainerIDs which SCM expects this datanode to have. 
+ final Set<ContainerID> expectedContainerIDs = nodeManager + .getContainers(datanodeDetails); + + // ContainerIDs that this datanode actually has. + final Set<ContainerID> actualContainerIDs = replicas.parallelStream() + .map(ContainerReplicaProto::getContainerID) + .map(ContainerID::valueof).collect(Collectors.toSet()); + + // Container replicas which SCM is not aware of. + final Set<ContainerID> newReplicas = + new HashSet<>(actualContainerIDs); + newReplicas.removeAll(expectedContainerIDs); + + // Container replicas which are missing from datanode. + final Set<ContainerID> missingReplicas = + new HashSet<>(expectedContainerIDs); + missingReplicas.removeAll(actualContainerIDs); + + processContainerReplicas(datanodeDetails, replicas, publisher); + + // Remove missing replica from ContainerManager + for (ContainerID id : missingReplicas) { + try { + containerManager.getContainerReplicas(id) + .stream() + .filter(replica -> + replica.getDatanodeDetails().equals(datanodeDetails)) + .findFirst() + .ifPresent(replica -> { + try { + containerManager.removeContainerReplica(id, replica); + } catch (ContainerNotFoundException | + ContainerReplicaNotFoundException e) { + // This should not happen, but even if it happens, not an + // issue + } + }); + } catch (ContainerNotFoundException e) { + LOG.warn("Cannot remove container replica, container {} not found", + id); + } } - for (ContainerID containerID : reportResult.getNewEntries()) { - final ContainerReplica replica = ContainerReplica.newBuilder() - .setContainerID(containerID) - .setDatanodeDetails(datanodeOrigin) - .build(); - containerManager.updateContainerReplica(containerID, replica); - checkReplicationState(containerID, publisher); - } + // Update the latest set of containers for this datanode in NodeManager. + nodeManager.setContainers(datanodeDetails, actualContainerIDs); + + // Replicate if needed. 
+ newReplicas.forEach(id -> checkReplicationState(id, publisher)); + missingReplicas.forEach(id -> checkReplicationState(id, publisher)); + + } catch (NodeNotFoundException ex) { + LOG.error("Received container report from unknown datanode {}", + datanodeDetails); + } + + } - } catch (IOException e) { - //TODO: stop all the replication? - LOG.error("Error on processing container report from datanode {}", - datanodeOrigin, e); + private void processContainerReplicas(final DatanodeDetails datanodeDetails, + final List<ContainerReplicaProto> replicas, + final EventPublisher publisher) { + final PendingDeleteStatusList pendingDeleteStatusList = + new PendingDeleteStatusList(datanodeDetails); + for (ContainerReplicaProto replicaProto : replicas) { + try { + final ContainerID containerID = ContainerID.valueof( + replicaProto.getContainerID()); + final ContainerInfo containerInfo = containerManager + .getContainer(containerID); + updateContainerState(datanodeDetails, containerInfo, + replicaProto, publisher); + if (containerInfo.getDeleteTransactionId() > + replicaProto.getDeleteTransactionId()) { + pendingDeleteStatusList + .addPendingDeleteStatus(replicaProto.getDeleteTransactionId(), + containerInfo.getDeleteTransactionId(), + containerInfo.getContainerID()); + } + } catch (ContainerNotFoundException e) { + LOG.error("Received container report for an unknown container {}", + replicaProto.getContainerID()); + } } + if (pendingDeleteStatusList.getNumPendingDeletes() > 0) { + publisher.fireEvent(SCMEvents.PENDING_DELETE_STATUS, + pendingDeleteStatusList); + } + } + private void updateContainerState(final DatanodeDetails datanodeDetails, + final ContainerInfo containerInfo, + final ContainerReplicaProto replicaProto, + final EventPublisher publisher) + throws ContainerNotFoundException { + + final ContainerID id = containerInfo.containerID(); + final ContainerReplica datanodeContainerReplica = ContainerReplica + .newBuilder() + .setContainerID(id) + 
.setContainerState(replicaProto.getState()) + .setDatanodeDetails(datanodeDetails) + .build(); + // TODO: Add bcsid and origin datanode to replica. + + final ContainerReplica scmContainerReplica = containerManager + .getContainerReplicas(id) + .stream() + .filter(replica -> + replica.getDatanodeDetails().equals(datanodeDetails)) + .findFirst().orElse(null); + + // This is an in-memory update. + containerManager.updateContainerReplica(id, datanodeContainerReplica); + containerInfo.setUsedBytes(replicaProto.getUsed()); + containerInfo.setNumberOfKeys(replicaProto.getKeyCount()); + + // Check if there is state change in container replica. + if (scmContainerReplica == null || + scmContainerReplica.getState() != datanodeContainerReplica.getState()) { + //TODO: Handler replica state change. + } } private void checkReplicationState(ContainerID containerID, http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java new file mode 100644 index 0000000..e07ee73 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdds.scm.container;

import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.server
    .SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * Handles incremental container reports from datanodes.
 *
 * For each reported replica the handler updates the replica entry in
 * {@link ContainerManager} and, when a replica reports CLOSED while SCM
 * still has the container in CLOSING, fires the CLOSE lifecycle event.
 */
public class IncrementalContainerReportHandler implements
    EventHandler<IncrementalContainerReportFromDatanode> {

  private static final Logger LOG =
      LoggerFactory.getLogger(IncrementalContainerReportHandler.class);

  // Stored but not used yet in this class — presumably for upcoming
  // pipeline handling; TODO confirm.
  private final PipelineManager pipelineManager;
  private final ContainerManager containerManager;

  public IncrementalContainerReportHandler(
      final PipelineManager pipelineManager,
      final ContainerManager containerManager) {
    Preconditions.checkNotNull(pipelineManager);
    Preconditions.checkNotNull(containerManager);
    this.pipelineManager = pipelineManager;
    this.containerManager = containerManager;
  }

  /**
   * Processes an incremental container report: one entry per replica whose
   * state changed on the datanode since the last report.
   *
   * @param containerReportFromDatanode the report wrapped with its sender
   * @param publisher event publisher (unused here; part of the interface)
   */
  @Override
  public void onMessage(
      final IncrementalContainerReportFromDatanode containerReportFromDatanode,
      final EventPublisher publisher) {

    // The sender is the same for every replica in the report, so resolve
    // it once instead of once per loop iteration.
    final DatanodeDetails datanodeDetails =
        containerReportFromDatanode.getDatanodeDetails();

    for (ContainerReplicaProto replicaProto :
        containerReportFromDatanode.getReport().getReportList()) {
      try {
        final ContainerID containerID = ContainerID
            .valueof(replicaProto.getContainerID());
        final ContainerInfo containerInfo = containerManager
            .getContainer(containerID);

        final ContainerReplica replica = ContainerReplica.newBuilder()
            .setContainerID(containerID)
            .setContainerState(replicaProto.getState())
            .setDatanodeDetails(datanodeDetails)
            .build();

        containerManager.updateContainerReplica(containerID, replica);

        // Datanode says the replica is CLOSED while SCM still has the
        // container in CLOSING: advance the container lifecycle.
        if (replicaProto.getState() == ContainerReplicaProto.State.CLOSED &&
            containerInfo.getState() == HddsProtos.LifeCycleState.CLOSING) {
          containerManager.updateContainerState(containerID,
              HddsProtos.LifeCycleEvent.CLOSE);
        }

        // TODO: Handle replica state change.

      } catch (ContainerNotFoundException e) {
        LOG.warn("Container {} not found!", replicaProto.getContainerID());
      } catch (IOException e) {
        // Pass the exception as the last argument so the cause/stack
        // trace is logged instead of silently dropped.
        LOG.error("Exception while processing ICR for container {}",
            replicaProto.getContainerID(), e);
      }
    }
  }
}
org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; @@ -37,10 +34,6 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.hdds.server.ServerUtils; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.ozone.OzoneConsts; @@ -489,106 +482,6 @@ public class SCMContainerManager implements ContainerManager { } /** - * Process container report from Datanode. - * <p> - * Processing follows a very simple logic for time being. - * <p> - * 1. Datanodes report the current State -- denoted by the datanodeState - * <p> - * 2. We are the older SCM state from the Database -- denoted by - * the knownState. - * <p> - * 3. We copy the usage etc. from currentState to newState and log that - * newState to the DB. This allows us SCM to bootup again and read the - * state of the world from the DB, and then reconcile the state from - * container reports, when they arrive. 
- * - * @param reports Container report - */ - @Override - public void processContainerReports(DatanodeDetails datanodeDetails, - ContainerReportsProto reports) throws IOException { - List<ContainerReplicaProto> - containerInfos = reports.getReportsList(); - PendingDeleteStatusList pendingDeleteStatusList = - new PendingDeleteStatusList(datanodeDetails); - for (ContainerReplicaProto newInfo : - containerInfos) { - ContainerID id = ContainerID.valueof(newInfo.getContainerID()); - ContainerReplica replica = ContainerReplica.newBuilder() - .setContainerID(id) - .setDatanodeDetails(datanodeDetails) - .setOriginNodeId(datanodeDetails.getUuid()) - .build(); - lock.lock(); - try { - containerStateManager.updateContainerReplica(id, replica); - ContainerInfo currentInfo = containerStateManager.getContainer(id); - if (newInfo.getState() == ContainerReplicaProto.State.CLOSED - && currentInfo.getState() == LifeCycleState.CLOSING) { - currentInfo = updateContainerStateInternal(id, LifeCycleEvent.CLOSE); - if (!currentInfo.isOpen()) { - pipelineManager.removeContainerFromPipeline( - currentInfo.getPipelineID(), id); - } - } - - ContainerInfoProto newState = - reconcileState(newInfo, currentInfo); - - if (currentInfo.getDeleteTransactionId() > - newInfo.getDeleteTransactionId()) { - pendingDeleteStatusList - .addPendingDeleteStatus(newInfo.getDeleteTransactionId(), - currentInfo.getDeleteTransactionId(), - currentInfo.getContainerID()); - } - containerStateManager.updateContainerInfo( - ContainerInfo.fromProtobuf(newState)); - containerStore.put(id.getBytes(), newState.toByteArray()); - } catch (ContainerNotFoundException e) { - LOG.error("Error while processing container report from datanode :" + - " {}, for container: {}, reason: container doesn't exist in" + - "container database.", datanodeDetails, id); - } finally { - lock.unlock(); - } - } - if (pendingDeleteStatusList.getNumPendingDeletes() > 0) { - eventPublisher.fireEvent(SCMEvents.PENDING_DELETE_STATUS, - 
pendingDeleteStatusList); - } - - } - - /** - * Reconciles the state from Datanode with the state in SCM. - * - * @param datanodeState - State from the Datanode. - * @param knownState - State inside SCM. - * @return new SCM State for this container. - */ - private HddsProtos.ContainerInfoProto reconcileState( - ContainerReplicaProto datanodeState, - ContainerInfo knownState) { - HddsProtos.ContainerInfoProto.Builder builder = - HddsProtos.ContainerInfoProto.newBuilder(); - builder.setContainerID(knownState.getContainerID()) - .setPipelineID(knownState.getPipelineID().getProtobuf()) - .setState(knownState.getState()) - .setReplicationType(knownState.getReplicationType()) - .setReplicationFactor(knownState.getReplicationFactor()) - .setUsedBytes(datanodeState.getUsed()) - .setNumberOfKeys(datanodeState.getKeyCount()) - .setStateEnterTime(knownState.getStateEnterTime()) - .setDeleteTransactionId(knownState.getDeleteTransactionId()); - if (knownState.getOwner() != null) { - builder.setOwner(knownState.getOwner()); - } - return builder.build(); - } - - /** * Returns the latest list of DataNodes where replica for given containerId * exist. Throws an SCMException if no entry is found for given containerId. 
* http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java index b8052a4..0c738b2 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hdds.scm.container.states; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.scm.container.ContainerID; @@ -263,12 +262,6 @@ public class ContainerStateMap { } } - @VisibleForTesting - // TODO: fix the test case and remove this method! - public static Logger getLOG() { - return LOG; - } - /** * Just update the container State. * @param info ContainerInfo. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java index 30a7c34..72d416b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java @@ -21,15 +21,13 @@ package org.apache.hadoop.hdds.scm.events; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList; -import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler - .CloseContainerStatus; import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler; import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler .ReplicationStatus; -import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler - .CloseContainerRetryableReq; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher + .IncrementalContainerReportFromDatanode; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher .PipelineReportFromDatanode; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher .PipelineActionsFromDatanode; @@ -79,6 +77,16 @@ public final class SCMEvents { new TypedEvent<>(ContainerReportFromDatanode.class, "Container_Report"); /** + * IncrementalContainerReports are send out by Datanodes. + * This report is received by SCMDatanodeHeartbeatDispatcher and + * Incremental_Container_Report Event is generated. 
+ */ + public static final TypedEvent<IncrementalContainerReportFromDatanode> + INCREMENTAL_CONTAINER_REPORT = new TypedEvent<>( + IncrementalContainerReportFromDatanode.class, + "Incremental_Container_Report"); + + /** * ContainerActions are sent by Datanode. This event is received by * SCMDatanodeHeartbeatDispatcher and CONTAINER_ACTIONS event is generated. */ @@ -138,16 +146,6 @@ public final class SCMEvents { new TypedEvent<>(ContainerID.class, "Close_Container"); /** - * A CLOSE_CONTAINER_RETRYABLE_REQ will be triggered by - * CloseContainerEventHandler after sending a SCMCommand to DataNode. - * CloseContainerWatcher will track this event. Watcher will be responsible - * for retrying it in event of failure or timeout. - */ - public static final TypedEvent<CloseContainerRetryableReq> - CLOSE_CONTAINER_RETRYABLE_REQ = new TypedEvent<>( - CloseContainerRetryableReq.class, "Close_Container_Retryable"); - - /** * This event will be triggered whenever a new datanode is registered with * SCM. */ @@ -176,14 +174,6 @@ public final class SCMEvents { TypedEvent<>(ReplicationStatus.class, "Replicate_Command_Status"); /** * This event will be triggered by CommandStatusReportHandler whenever a - * status for CloseContainer SCMCommand is received. - */ - public static final Event<CloseContainerStatus> - CLOSE_CONTAINER_STATUS = - new TypedEvent<>(CloseContainerStatus.class, - "Close_Container_Command_Status"); - /** - * This event will be triggered by CommandStatusReportHandler whenever a * status for DeleteBlock SCMCommand is received. 
*/ public static final TypedEvent<CommandStatusReportHandler.DeleteBlockStatus> http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java index 1030428..43f0167 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; import org.apache.hadoop.hdds.scm.container.ContainerReplica; import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; @@ -61,7 +62,14 @@ public class DeadNodeHandler implements EventHandler<DatanodeDetails> { // TODO: check if there are any pipeline on this node and fire close // pipeline event Set<ContainerID> ids = - nodeManager.getContainers(datanodeDetails.getUuid()); + null; + try { + ids = nodeManager.getContainers(datanodeDetails); + } catch (NodeNotFoundException e) { + // This should not happen, we cannot get a dead node event for an + // unregistered node! 
+ LOG.error("DeadNode event for a unregistered node: {}!", datanodeDetails); + } if (ids == null) { LOG.info("There's no containers in dead datanode {}, no replica will be" + " removed from the in-memory state.", datanodeDetails.getUuid()); @@ -72,18 +80,23 @@ public class DeadNodeHandler implements EventHandler<DatanodeDetails> { for (ContainerID id : ids) { try { final ContainerInfo container = containerManager.getContainer(id); + // TODO: For open containers, trigger close on other nodes + // TODO: Check replica count and call replication manager + // on these containers. if (!container.isOpen()) { - final ContainerReplica replica = ContainerReplica.newBuilder() - .setContainerID(id) - .setDatanodeDetails(datanodeDetails) - .build(); - try { - containerManager.removeContainerReplica(id, replica); - replicateIfNeeded(container, publisher); - } catch (ContainerException ex) { - LOG.warn("Exception while removing container replica #{} for " + - "container #{}.", replica, container, ex); - } + Set<ContainerReplica> replicas = containerManager + .getContainerReplicas(id); + replicas.stream() + .filter(r -> r.getDatanodeDetails().equals(datanodeDetails)) + .findFirst() + .ifPresent(replica -> { + try { + containerManager.removeContainerReplica(id, replica); + } catch (ContainerException ex) { + LOG.warn("Exception while removing container replica #{} " + + "for container #{}.", replica, container, ex); + } + }); } } catch (ContainerNotFoundException cnfe) { LOG.warn("Container Not found!", cnfe); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java index 780aa2b..ed4fdba 100644 --- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java @@ -19,31 +19,17 @@ package org.apache.hadoop.hdds.scm.node; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; -import java.util.Collections; - /** * Handles New Node event. */ public class NewNodeHandler implements EventHandler<DatanodeDetails> { - private final NodeManager nodeManager; - - public NewNodeHandler(NodeManager nodeManager) { - this.nodeManager = nodeManager; - } - @Override public void onMessage(DatanodeDetails datanodeDetails, EventPublisher publisher) { - try { - nodeManager.addDatanodeInContainerMap(datanodeDetails.getUuid(), - Collections.emptySet()); - } catch (SCMException e) { - // TODO: log exception message. - } + // We currently have nothing to do when we receive new node event. 
} } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java index d55ff98..e944634 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java @@ -27,7 +27,6 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; -import org.apache.hadoop.hdds.scm.node.states.ReportResult; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol; import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; @@ -63,13 +62,6 @@ import java.util.UUID; */ public interface NodeManager extends StorageContainerNodeProtocol, EventHandler<CommandForDatanode>, NodeManagerMXBean, Closeable { - /** - * Removes a data node from the management of this Node Manager. - * - * @param node - DataNode. - * @throws NodeNotFoundException - */ - void removeNode(DatanodeDetails node) throws NodeNotFoundException; /** * Gets all Live Datanodes that is currently communicating with SCM. @@ -102,6 +94,7 @@ public interface NodeManager extends StorageContainerNodeProtocol, * Return a map of node stats. * @return a map of individual node stats (live/stale but not dead). 
*/ + // TODO: try to change the return type to Map<DatanodeDetails, SCMNodeStat> Map<UUID, SCMNodeStat> getNodeStats(); /** @@ -121,10 +114,10 @@ public interface NodeManager extends StorageContainerNodeProtocol, /** * Get set of pipelines a datanode is part of. - * @param dnId - datanodeID + * @param datanodeDetails DatanodeDetails * @return Set of PipelineID */ - Set<PipelineID> getPipelineByDnID(UUID dnId); + Set<PipelineID> getPipelines(DatanodeDetails datanodeDetails); /** * Add pipeline information in the NodeManager. @@ -139,40 +132,22 @@ public interface NodeManager extends StorageContainerNodeProtocol, void removePipeline(Pipeline pipeline); /** - * Update set of containers available on a datanode. - * @param uuid - DatanodeID + * Remaps datanode to containers mapping to the new set of containers. + * @param datanodeDetails - DatanodeDetails * @param containerIds - Set of containerIDs * @throws SCMException - if datanode is not known. For new datanode use * addDatanodeInContainerMap call. */ - void setContainersForDatanode(UUID uuid, Set<ContainerID> containerIds) - throws SCMException; - - /** - * Process containerReport received from datanode. - * @param uuid - DataonodeID - * @param containerIds - Set of containerIDs - * @return The result after processing containerReport - */ - ReportResult<ContainerID> processContainerReport(UUID uuid, - Set<ContainerID> containerIds); + void setContainers(DatanodeDetails datanodeDetails, + Set<ContainerID> containerIds) throws NodeNotFoundException; /** * Return set of containerIDs available on a datanode. - * @param uuid - DatanodeID - * @return - set of containerIDs - */ - Set<ContainerID> getContainers(UUID uuid); - - /** - * Insert a new datanode with set of containerIDs for containers available - * on it. 
- * @param uuid - DatanodeID - * @param containerIDs - Set of ContainerIDs - * @throws SCMException - if datanode already exists + * @param datanodeDetails DatanodeDetails + * @return set of containerIDs */ - void addDatanodeInContainerMap(UUID uuid, Set<ContainerID> containerIDs) - throws SCMException; + Set<ContainerID> getContainers(DatanodeDetails datanodeDetails) + throws NodeNotFoundException; /** * Add a {@link SCMCommand} to the command queue, which are @@ -188,7 +163,7 @@ public interface NodeManager extends StorageContainerNodeProtocol, * @param dnUuid * @param nodeReport */ - void processNodeReport(UUID dnUuid, NodeReportProto nodeReport); + void processNodeReport(DatanodeDetails dnUuid, NodeReportProto nodeReport); /** * Process a dead node event in this Node Manager. @@ -202,5 +177,6 @@ public interface NodeManager extends StorageContainerNodeProtocol, * @param dnID - Datanode uuid. * @return list of commands */ + // TODO: We can give better name to this method! List<SCMCommand> getCommandQueue(UUID dnID); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeReportHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeReportHandler.java index 331bfed..f419764 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeReportHandler.java @@ -50,6 +50,6 @@ public class NodeReportHandler implements EventHandler<NodeReportFromDatanode> { + "missing DatanodeDetails."); LOGGER.trace("Processing node report for dn: {}", dn); nodeManager - .processNodeReport(dn.getUuid(), nodeReportFromDatanode.getReport()); + .processNodeReport(dn, 
nodeReportFromDatanode.getReport()); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java index 588756c..a459519 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java @@ -29,7 +29,6 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.scm.events.SCMEvents; -import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.states.*; import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap; import org.apache.hadoop.hdds.server.events.Event; @@ -94,11 +93,6 @@ public class NodeStateManager implements Runnable, Closeable { */ private final Node2PipelineMap node2PipelineMap; /** - * Maintains the map from node to ContainerIDs for the containers - * available on the node. - */ - private final Node2ContainerMap node2ContainerMap; - /** * Used for publishing node state change events. 
*/ private final EventPublisher eventPublisher; @@ -131,7 +125,6 @@ public class NodeStateManager implements Runnable, Closeable { public NodeStateManager(Configuration conf, EventPublisher eventPublisher) { this.nodeStateMap = new NodeStateMap(); this.node2PipelineMap = new Node2PipelineMap(); - this.node2ContainerMap = new Node2ContainerMap(); this.eventPublisher = eventPublisher; this.state2EventMap = new HashMap<>(); initialiseState2EventMap(); @@ -431,18 +424,6 @@ public class NodeStateManager implements Runnable, Closeable { } /** - * Removes a node from NodeStateManager. - * - * @param datanodeDetails DatanodeDetails - * - * @throws NodeNotFoundException if the node is not present - */ - public void removeNode(DatanodeDetails datanodeDetails) - throws NodeNotFoundException { - nodeStateMap.removeNode(datanodeDetails.getUuid()); - } - - /** * Returns the current stats of the node. * * @param uuid node id @@ -475,19 +456,6 @@ public class NodeStateManager implements Runnable, Closeable { } /** - * Remove the current stats of the specify node. - * - * @param uuid node id - * - * @return SCMNodeStat the stat removed from the node. - * - * @throws NodeNotFoundException if the node is not present. - */ - public SCMNodeStat removeNodeStat(UUID uuid) throws NodeNotFoundException { - return nodeStateMap.removeNodeStat(uuid); - } - - /** * Removes a pipeline from the node2PipelineMap. * @param pipeline - Pipeline to be removed */ @@ -498,23 +466,11 @@ public class NodeStateManager implements Runnable, Closeable { * Update set of containers available on a datanode. * @param uuid - DatanodeID * @param containerIds - Set of containerIDs - * @throws SCMException - if datanode is not known. For new datanode use - * addDatanodeInContainerMap call. 
- */ - public void setContainersForDatanode(UUID uuid, Set<ContainerID> containerIds) - throws SCMException { - node2ContainerMap.setContainersForDatanode(uuid, containerIds); - } - - /** - * Process containerReport received from datanode. - * @param uuid - DataonodeID - * @param containerIds - Set of containerIDs - * @return The result after processing containerReport + * @throws NodeNotFoundException - if datanode is not known. */ - public ReportResult<ContainerID> processContainerReport(UUID uuid, - Set<ContainerID> containerIds) { - return node2ContainerMap.processReport(uuid, containerIds); + public void setContainers(UUID uuid, Set<ContainerID> containerIds) + throws NodeNotFoundException { + nodeStateMap.setContainers(uuid, containerIds); } /** @@ -522,20 +478,9 @@ public class NodeStateManager implements Runnable, Closeable { * @param uuid - DatanodeID * @return - set of containerIDs */ - public Set<ContainerID> getContainers(UUID uuid) { - return node2ContainerMap.getContainers(uuid); - } - - /** - * Insert a new datanode with set of containerIDs for containers available - * on it. 
- * @param uuid - DatanodeID - * @param containerIDs - Set of ContainerIDs - * @throws SCMException - if datanode already exists - */ - public void addDatanodeInContainerMap(UUID uuid, - Set<ContainerID> containerIDs) throws SCMException { - node2ContainerMap.insertNewDatanode(uuid, containerIDs); + public Set<ContainerID> getContainers(UUID uuid) + throws NodeNotFoundException { + return nodeStateMap.getContainers(uuid); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index 35c22f3..c42ef66 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -27,7 +27,6 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; -import org.apache.hadoop.hdds.scm.node.states.ReportResult; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.hdds.scm.VersionInfo; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; @@ -129,16 +128,6 @@ public class SCMNodeManager } } - /** - * Removes a data node from the management of this Node Manager. - * - * @param node - DataNode. - * @throws NodeNotFoundException - */ - @Override - public void removeNode(DatanodeDetails node) throws NodeNotFoundException { - nodeStateManager.removeNode(node); - } /** * Gets all datanodes that are in a certain state. 
This function works by @@ -270,10 +259,8 @@ public class SCMNodeManager datanodeDetails.setHostName(dnAddress.getHostName()); datanodeDetails.setIpAddress(dnAddress.getHostAddress()); } - UUID dnId = datanodeDetails.getUuid(); try { nodeStateManager.addNode(datanodeDetails); - nodeStateManager.setNodeStat(dnId, new SCMNodeStat()); // Updating Node Report, as registration is successful updateNodeStat(datanodeDetails.getUuid(), nodeReport); LOG.info("Registered Data node : {}", datanodeDetails); @@ -326,8 +313,9 @@ public class SCMNodeManager * @param nodeReport */ @Override - public void processNodeReport(UUID dnUuid, NodeReportProto nodeReport) { - this.updateNodeStat(dnUuid, nodeReport); + public void processNodeReport(DatanodeDetails dnUuid, + NodeReportProto nodeReport) { + this.updateNodeStat(dnUuid.getUuid(), nodeReport); } /** @@ -377,12 +365,12 @@ public class SCMNodeManager /** * Get set of pipelines a datanode is part of. - * @param dnId - datanodeID + * @param datanodeDetails - datanodeID * @return Set of PipelineID */ @Override - public Set<PipelineID> getPipelineByDnID(UUID dnId) { - return nodeStateManager.getPipelineByDnID(dnId); + public Set<PipelineID> getPipelines(DatanodeDetails datanodeDetails) { + return nodeStateManager.getPipelineByDnID(datanodeDetails.getUuid()); } @@ -406,50 +394,27 @@ public class SCMNodeManager /** * Update set of containers available on a datanode. - * @param uuid - DatanodeID + * @param datanodeDetails - DatanodeID * @param containerIds - Set of containerIDs * @throws SCMException - if datanode is not known. For new datanode use * addDatanodeInContainerMap call. */ @Override - public void setContainersForDatanode(UUID uuid, - Set<ContainerID> containerIds) throws SCMException { - nodeStateManager.setContainersForDatanode(uuid, containerIds); - } - - /** - * Process containerReport received from datanode. 
- * @param uuid - DataonodeID - * @param containerIds - Set of containerIDs - * @return The result after processing containerReport - */ - @Override - public ReportResult<ContainerID> processContainerReport(UUID uuid, - Set<ContainerID> containerIds) { - return nodeStateManager.processContainerReport(uuid, containerIds); + public void setContainers(DatanodeDetails datanodeDetails, + Set<ContainerID> containerIds) throws NodeNotFoundException { + nodeStateManager.setContainers(datanodeDetails.getUuid(), + containerIds); } /** * Return set of containerIDs available on a datanode. - * @param uuid - DatanodeID + * @param datanodeDetails - DatanodeID * @return - set of containerIDs */ @Override - public Set<ContainerID> getContainers(UUID uuid) { - return nodeStateManager.getContainers(uuid); - } - - /** - * Insert a new datanode with set of containerIDs for containers available - * on it. - * @param uuid - DatanodeID - * @param containerIDs - Set of ContainerIDs - * @throws SCMException - if datanode already exists - */ - @Override - public void addDatanodeInContainerMap(UUID uuid, - Set<ContainerID> containerIDs) throws SCMException { - nodeStateManager.addDatanodeInContainerMap(uuid, containerIDs); + public Set<ContainerID> getContainers(DatanodeDetails datanodeDetails) + throws NodeNotFoundException { + return nodeStateManager.getContainers(datanodeDetails.getUuid()); } // TODO: @@ -481,6 +446,7 @@ public class SCMNodeManager * @param dnUuid datanode uuid. */ @Override + // TODO: This should be removed. 
public void processDeadNode(UUID dnUuid) { try { SCMNodeStat stat = nodeStateManager.getNodeStat(dnUuid); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java index 9df9dff..268fe5b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java @@ -49,7 +49,7 @@ public class StaleNodeHandler implements EventHandler<DatanodeDetails> { public void onMessage(DatanodeDetails datanodeDetails, EventPublisher publisher) { Set<PipelineID> pipelineIds = - nodeManager.getPipelineByDnID(datanodeDetails.getUuid()); + nodeManager.getPipelines(datanodeDetails); for (PipelineID pipelineID : pipelineIds) { try { pipelineManager.finalizePipeline(pipelineID); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java index 774ced1..a917e79 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.node.states; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; +import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.scm.node.DatanodeInfo; @@ -48,6 +49,10 @@ public class NodeStateMap { * Represents the current stats of node. */ private final ConcurrentHashMap<UUID, SCMNodeStat> nodeStats; + /** + * Node to set of containers on the node. + */ + private final ConcurrentHashMap<UUID, Set<ContainerID>> nodeToContainer; private final ReadWriteLock lock; @@ -59,6 +64,7 @@ public class NodeStateMap { nodeMap = new ConcurrentHashMap<>(); stateMap = new ConcurrentHashMap<>(); nodeStats = new ConcurrentHashMap<>(); + nodeToContainer = new ConcurrentHashMap<>(); initStateMap(); } @@ -88,6 +94,8 @@ public class NodeStateMap { throw new NodeAlreadyExistsException("Node UUID: " + id); } nodeMap.put(id, new DatanodeInfo(datanodeDetails)); + nodeStats.put(id, new SCMNodeStat()); + nodeToContainer.put(id, Collections.emptySet()); stateMap.get(nodeState).add(id); } finally { lock.writeLock().unlock(); @@ -238,30 +246,6 @@ public class NodeStateMap { } /** - * Removes the node from NodeStateMap. - * - * @param uuid node id - * - * @throws NodeNotFoundException if the node is not found - */ - public void removeNode(UUID uuid) throws NodeNotFoundException { - lock.writeLock().lock(); - try { - if (nodeMap.containsKey(uuid)) { - for (Map.Entry<NodeState, Set<UUID>> entry : stateMap.entrySet()) { - if(entry.getValue().remove(uuid)) { - break; - } - nodeMap.remove(uuid); - } - throw new NodeNotFoundException("Node UUID: " + uuid); - } - } finally { - lock.writeLock().unlock(); - } - } - - /** * Returns the current stats of the node. * * @param uuid node id @@ -298,21 +282,30 @@ public class NodeStateMap { nodeStats.put(uuid, newstat); } - /** - * Remove the current stats of the specify node. - * - * @param uuid node id - * - * @return SCMNodeStat the stat removed from the node. 
- * - * @throws NodeNotFoundException if the node is not found - */ - public SCMNodeStat removeNodeStat(UUID uuid) throws NodeNotFoundException { - SCMNodeStat stat = nodeStats.remove(uuid); - if (stat == null) { + public void setContainers(UUID uuid, Set<ContainerID> containers) + throws NodeNotFoundException{ + if (!nodeToContainer.containsKey(uuid)) { throw new NodeNotFoundException("Node UUID: " + uuid); } - return stat; + nodeToContainer.put(uuid, containers); + } + + public Set<ContainerID> getContainers(UUID uuid) + throws NodeNotFoundException { + Set<ContainerID> containers = nodeToContainer.get(uuid); + if (containers == null) { + throw new NodeNotFoundException("Node UUID: " + uuid); + } + return Collections.unmodifiableSet(containers); + } + + public void removeContainer(UUID uuid, ContainerID containerID) throws + NodeNotFoundException { + Set<ContainerID> containers = nodeToContainer.get(uuid); + if (containers == null) { + throw new NodeNotFoundException("Node UUID: " + uuid); + } + containers.remove(containerID); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java index 7c7df27..ede8b4f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java @@ -20,7 +20,9 @@ package org.apache.hadoop.hdds.scm.server; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto - 
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; + .StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineActionsProto; import org.apache.hadoop.hdds.protocol.proto @@ -210,6 +212,19 @@ public final class SCMDatanodeHeartbeatDispatcher { } /** + * Incremental Container report event payload with origin. + */ + public static class IncrementalContainerReportFromDatanode + extends ReportFromDatanode<IncrementalContainerReportProto> { + + public IncrementalContainerReportFromDatanode( + DatanodeDetails datanodeDetails, + IncrementalContainerReportProto report) { + super(datanodeDetails, report); + } + } + + /** * Container action event payload with origin. */ public static class ContainerActionsFromDatanode http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java index 37c7386..0beceab 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java @@ -97,6 +97,7 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRES import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_DEFAULT; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT; import 
static org.apache.hadoop.hdds.scm.events.SCMEvents.PIPELINE_REPORT; import static org.apache.hadoop.hdds.scm.server.StorageContainerManager.startRpcServer; import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress; @@ -196,8 +197,9 @@ public class SCMDatanodeProtocolServer implements .register(datanodeDetails, nodeReport, pipelineReportsProto); if (registeredCommand.getError() == SCMRegisteredResponseProto.ErrorCode.success) { - scm.getContainerManager().processContainerReports(datanodeDetails, - containerReportsProto); + eventPublisher.fireEvent(CONTAINER_REPORT, + new SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode( + datanodeDetails, containerReportsProto)); eventPublisher.fireEvent(SCMEvents.NODE_REGISTRATION_CONT_REPORT, new NodeRegistrationContainerReport(datanodeDetails, containerReportsProto)); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org