http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java deleted file mode 100644 index 12c196b..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ /dev/null @@ -1,504 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.container.common.statemachine; - -import com.google.common.base.Preconditions; -import com.google.protobuf.GeneratedMessage; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.PipelineAction; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerAction; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.CommandStatus.Status; -import org.apache.hadoop.ozone.container.common.states.DatanodeState; -import org.apache.hadoop.ozone.container.common.states.datanode - .InitDatanodeState; -import org.apache.hadoop.ozone.container.common.states.datanode - .RunningDatanodeState; -import org.apache.hadoop.ozone.protocol.commands.CommandStatus; -import org.apache.hadoop.ozone.protocol.commands.CommandStatus - .CommandStatusBuilder; -import org.apache.hadoop.ozone.protocol.commands - .DeleteBlockCommandStatus.DeleteBlockCommandStatusBuilder; -import org.apache.hadoop.ozone.protocol.commands.SCMCommand; - -import static java.lang.Math.min; -import static org.apache.hadoop.hdds.scm.HddsServerUtil.getScmHeartbeatInterval; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; -import java.util.ArrayList; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Consumer; - -import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT; - -/** - * Current Context of State Machine. - */ -public class StateContext { - static final Logger LOG = - LoggerFactory.getLogger(StateContext.class); - private final Queue<SCMCommand> commandQueue; - private final Map<Long, CommandStatus> cmdStatusMap; - private final Lock lock; - private final DatanodeStateMachine parent; - private final AtomicLong stateExecutionCount; - private final Configuration conf; - private final List<GeneratedMessage> reports; - private final Queue<ContainerAction> containerActions; - private final Queue<PipelineAction> pipelineActions; - private DatanodeStateMachine.DatanodeStates state; - - /** - * Starting with a 2 sec heartbeat frequency which will be updated to the - * real HB frequency after scm registration. With this method the - * initial registration could be significant faster. - */ - private AtomicLong heartbeatFrequency = new AtomicLong(2000); - - /** - * Constructs a StateContext. - * - * @param conf - Configration - * @param state - State - * @param parent Parent State Machine - */ - public StateContext(Configuration conf, DatanodeStateMachine.DatanodeStates - state, DatanodeStateMachine parent) { - this.conf = conf; - this.state = state; - this.parent = parent; - commandQueue = new LinkedList<>(); - cmdStatusMap = new ConcurrentHashMap<>(); - reports = new LinkedList<>(); - containerActions = new LinkedList<>(); - pipelineActions = new LinkedList<>(); - lock = new ReentrantLock(); - stateExecutionCount = new AtomicLong(0); - } - - /** - * Returns the ContainerStateMachine class that holds this state. - * - * @return ContainerStateMachine. - */ - public DatanodeStateMachine getParent() { - return parent; - } - - /** - * Get the container server port. - * @return The container server port if available, return -1 if otherwise - */ - public int getContainerPort() { - return parent == null ? - INVALID_PORT : parent.getContainer().getContainerServerPort(); - } - - /** - * Gets the Ratis Port. - * @return int , return -1 if not valid. - */ - public int getRatisPort() { - return parent == null ? - INVALID_PORT : parent.getContainer().getRatisContainerServerPort(); - } - - /** - * Returns true if we are entering a new state. - * - * @return boolean - */ - boolean isEntering() { - return stateExecutionCount.get() == 0; - } - - /** - * Returns true if we are exiting from the current state. - * - * @param newState - newState. - * @return boolean - */ - boolean isExiting(DatanodeStateMachine.DatanodeStates newState) { - boolean isExiting = state != newState && stateExecutionCount.get() > 0; - if(isExiting) { - stateExecutionCount.set(0); - } - return isExiting; - } - - /** - * Returns the current state the machine is in. - * - * @return state. - */ - public DatanodeStateMachine.DatanodeStates getState() { - return state; - } - - /** - * Sets the current state of the machine. - * - * @param state state. - */ - public void setState(DatanodeStateMachine.DatanodeStates state) { - this.state = state; - } - - /** - * Adds the report to report queue. - * - * @param report report to be added - */ - public void addReport(GeneratedMessage report) { - if (report != null) { - synchronized (reports) { - reports.add(report); - } - } - } - - /** - * Adds the reports which could not be sent by heartbeat back to the - * reports list. - * - * @param reportsToPutBack list of reports which failed to be sent by - * heartbeat. - */ - public void putBackReports(List<GeneratedMessage> reportsToPutBack) { - synchronized (reports) { - reports.addAll(0, reportsToPutBack); - } - } - - /** - * Returns all the available reports from the report queue, or empty list if - * the queue is empty. - * - * @return List<reports> - */ - public List<GeneratedMessage> getAllAvailableReports() { - return getReports(Integer.MAX_VALUE); - } - - /** - * Returns available reports from the report queue with a max limit on - * list size, or empty list if the queue is empty. - * - * @return List<reports> - */ - public List<GeneratedMessage> getReports(int maxLimit) { - List<GeneratedMessage> reportsToReturn = new LinkedList<>(); - synchronized (reports) { - List<GeneratedMessage> tempList = reports.subList( - 0, min(reports.size(), maxLimit)); - reportsToReturn.addAll(tempList); - tempList.clear(); - } - return reportsToReturn; - } - - - /** - * Adds the ContainerAction to ContainerAction queue. - * - * @param containerAction ContainerAction to be added - */ - public void addContainerAction(ContainerAction containerAction) { - synchronized (containerActions) { - containerActions.add(containerAction); - } - } - - /** - * Add ContainerAction to ContainerAction queue if it's not present. - * - * @param containerAction ContainerAction to be added - */ - public void addContainerActionIfAbsent(ContainerAction containerAction) { - synchronized (containerActions) { - if (!containerActions.contains(containerAction)) { - containerActions.add(containerAction); - } - } - } - - /** - * Returns all the pending ContainerActions from the ContainerAction queue, - * or empty list if the queue is empty. - * - * @return List<ContainerAction> - */ - public List<ContainerAction> getAllPendingContainerActions() { - return getPendingContainerAction(Integer.MAX_VALUE); - } - - /** - * Returns pending ContainerActions from the ContainerAction queue with a - * max limit on list size, or empty list if the queue is empty. - * - * @return List<ContainerAction> - */ - public List<ContainerAction> getPendingContainerAction(int maxLimit) { - List<ContainerAction> containerActionList = new ArrayList<>(); - synchronized (containerActions) { - if (!containerActions.isEmpty()) { - int size = containerActions.size(); - int limit = size > maxLimit ? maxLimit : size; - for (int count = 0; count < limit; count++) { - // we need to remove the action from the containerAction queue - // as well - ContainerAction action = containerActions.poll(); - Preconditions.checkNotNull(action); - containerActionList.add(action); - } - } - return containerActionList; - } - } - - /** - * Add PipelineAction to PipelineAction queue if it's not present. - * - * @param pipelineAction PipelineAction to be added - */ - public void addPipelineActionIfAbsent(PipelineAction pipelineAction) { - synchronized (pipelineActions) { - /** - * If pipelineAction queue already contains entry for the pipeline id - * with same action, we should just return. - * Note: We should not use pipelineActions.contains(pipelineAction) here - * as, pipelineAction has a msg string. So even if two msgs differ though - * action remains same on the given pipeline, it will end up adding it - * multiple times here. - */ - for (PipelineAction pipelineActionIter : pipelineActions) { - if (pipelineActionIter.getAction() == pipelineAction.getAction() - && pipelineActionIter.hasClosePipeline() && pipelineAction - .hasClosePipeline() - && pipelineActionIter.getClosePipeline().getPipelineID() - .equals(pipelineAction.getClosePipeline().getPipelineID())) { - return; - } - } - pipelineActions.add(pipelineAction); - } - } - - /** - * Returns pending PipelineActions from the PipelineAction queue with a - * max limit on list size, or empty list if the queue is empty. - * - * @return List<ContainerAction> - */ - public List<PipelineAction> getPendingPipelineAction(int maxLimit) { - List<PipelineAction> pipelineActionList = new ArrayList<>(); - synchronized (pipelineActions) { - if (!pipelineActions.isEmpty()) { - int size = pipelineActions.size(); - int limit = size > maxLimit ? maxLimit : size; - for (int count = 0; count < limit; count++) { - pipelineActionList.add(pipelineActions.poll()); - } - } - return pipelineActionList; - } - } - - /** - * Returns the next task to get executed by the datanode state machine. - * @return A callable that will be executed by the - * {@link DatanodeStateMachine} - */ - @SuppressWarnings("unchecked") - public DatanodeState<DatanodeStateMachine.DatanodeStates> getTask() { - switch (this.state) { - case INIT: - return new InitDatanodeState(this.conf, parent.getConnectionManager(), - this); - case RUNNING: - return new RunningDatanodeState(this.conf, parent.getConnectionManager(), - this); - case SHUTDOWN: - return null; - default: - throw new IllegalArgumentException("Not Implemented yet."); - } - } - - /** - * Executes the required state function. - * - * @param service - Executor Service - * @param time - seconds to wait - * @param unit - Seconds. - * @throws InterruptedException - * @throws ExecutionException - * @throws TimeoutException - */ - public void execute(ExecutorService service, long time, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - stateExecutionCount.incrementAndGet(); - DatanodeState<DatanodeStateMachine.DatanodeStates> task = getTask(); - if (this.isEntering()) { - task.onEnter(); - } - task.execute(service); - DatanodeStateMachine.DatanodeStates newState = task.await(time, unit); - if (this.state != newState) { - if (LOG.isDebugEnabled()) { - LOG.debug("Task {} executed, state transited from {} to {}", - task.getClass().getSimpleName(), this.state, newState); - } - if (isExiting(newState)) { - task.onExit(); - } - this.setState(newState); - } - } - - /** - * Returns the next command or null if it is empty. - * - * @return SCMCommand or Null. - */ - public SCMCommand getNextCommand() { - lock.lock(); - try { - return commandQueue.poll(); - } finally { - lock.unlock(); - } - } - - /** - * Adds a command to the State Machine queue. - * - * @param command - SCMCommand. - */ - public void addCommand(SCMCommand command) { - lock.lock(); - try { - commandQueue.add(command); - } finally { - lock.unlock(); - } - this.addCmdStatus(command); - } - - /** - * Returns the count of the Execution. - * @return long - */ - public long getExecutionCount() { - return stateExecutionCount.get(); - } - - /** - * Returns the next {@link CommandStatus} or null if it is empty. - * - * @return {@link CommandStatus} or Null. - */ - public CommandStatus getCmdStatus(Long key) { - return cmdStatusMap.get(key); - } - - /** - * Adds a {@link CommandStatus} to the State Machine. - * - * @param status - {@link CommandStatus}. - */ - public void addCmdStatus(Long key, CommandStatus status) { - cmdStatusMap.put(key, status); - } - - /** - * Adds a {@link CommandStatus} to the State Machine for given SCMCommand. - * - * @param cmd - {@link SCMCommand}. - */ - public void addCmdStatus(SCMCommand cmd) { - CommandStatusBuilder statusBuilder; - if (cmd.getType() == Type.deleteBlocksCommand) { - statusBuilder = new DeleteBlockCommandStatusBuilder(); - } else { - statusBuilder = CommandStatusBuilder.newBuilder(); - } - this.addCmdStatus(cmd.getId(), - statusBuilder.setCmdId(cmd.getId()) - .setStatus(Status.PENDING) - .setType(cmd.getType()) - .build()); - } - - /** - * Get map holding all {@link CommandStatus} objects. - * - */ - public Map<Long, CommandStatus> getCommandStatusMap() { - return cmdStatusMap; - } - - /** - * Remove object from cache in StateContext#cmdStatusMap. - * - */ - public void removeCommandStatus(Long cmdId) { - cmdStatusMap.remove(cmdId); - } - - /** - * Updates status of a pending status command. - * @param cmdId command id - * @param cmdStatusUpdater Consumer to update command status. - * @return true if command status updated successfully else false. - */ - public boolean updateCommandStatus(Long cmdId, - Consumer<CommandStatus> cmdStatusUpdater) { - if(cmdStatusMap.containsKey(cmdId)) { - cmdStatusUpdater.accept(cmdStatusMap.get(cmdId)); - return true; - } - return false; - } - - public void configureHeartbeatFrequency(){ - heartbeatFrequency.set(getScmHeartbeatInterval(conf)); - } - - /** - * Return current heartbeat frequency in ms. - */ - public long getHeartbeatFrequency() { - return heartbeatFrequency.get(); - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java deleted file mode 100644 index 2c3db61..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java +++ /dev/null @@ -1,159 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; - -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto; -import org.apache.hadoop.ozone.container.common.statemachine - .SCMConnectionManager; -import org.apache.hadoop.ozone.container.common.statemachine.StateContext; -import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; -import org.apache.hadoop.ozone.protocol.commands.SCMCommand; -import org.apache.hadoop.util.Time; -import org.apache.ratis.protocol.NotLeaderException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.UUID; - -/** - * Handler for close container command received from SCM. - */ -public class CloseContainerCommandHandler implements CommandHandler { - static final Logger LOG = - LoggerFactory.getLogger(CloseContainerCommandHandler.class); - private int invocationCount; - private long totalTime; - private boolean cmdExecuted; - - /** - * Constructs a ContainerReport handler. - */ - public CloseContainerCommandHandler() { - } - - /** - * Handles a given SCM command. - * - * @param command - SCM Command - * @param container - Ozone Container. - * @param context - Current Context. - * @param connectionManager - The SCMs that we are talking to. - */ - @Override - public void handle(SCMCommand command, OzoneContainer container, - StateContext context, SCMConnectionManager connectionManager) { - LOG.debug("Processing Close Container command."); - invocationCount++; - cmdExecuted = false; - long startTime = Time.monotonicNow(); - // TODO: define this as INVALID_CONTAINER_ID in HddsConsts.java (TBA) - long containerID = -1; - try { - - CloseContainerCommandProto - closeContainerProto = - CloseContainerCommandProto - .parseFrom(command.getProtoBufMessage()); - containerID = closeContainerProto.getContainerID(); - if (container.getContainerSet().getContainer(containerID) - .getContainerData().isClosed()) { - LOG.debug("Container {} is already closed", containerID); - // It might happen that the where the first attempt of closing the - // container failed with NOT_LEADER_EXCEPTION. In such cases, SCM will - // retry to check the container got really closed via Ratis. - // In such cases of the retry attempt, if the container is already - // closed via Ratis, we should just return. - cmdExecuted = true; - return; - } - HddsProtos.PipelineID pipelineID = closeContainerProto.getPipelineID(); - HddsProtos.ReplicationType replicationType = - closeContainerProto.getReplicationType(); - - ContainerProtos.ContainerCommandRequestProto.Builder request = - ContainerProtos.ContainerCommandRequestProto.newBuilder(); - request.setCmdType(ContainerProtos.Type.CloseContainer); - request.setContainerID(containerID); - request.setCloseContainer( - ContainerProtos.CloseContainerRequestProto.getDefaultInstance()); - request.setTraceID(UUID.randomUUID().toString()); - request.setDatanodeUuid( - context.getParent().getDatanodeDetails().getUuidString()); - // submit the close container request for the XceiverServer to handle - container.submitContainerRequest( - request.build(), replicationType, pipelineID); - } catch (Exception e) { - if (e instanceof NotLeaderException) { - // If the particular datanode is not the Ratis leader, the close - // container command will not be executed by the follower but will be - // executed by Ratis stateMachine transactions via leader to follower. - // There can also be case where the datanode is in candidate state. - // In these situations, NotLeaderException is thrown. Remove the status - // from cmdStatus Map here so that it will be retried only by SCM if the - // leader could not not close the container after a certain time. - context.removeCommandStatus(containerID); - LOG.info(e.getLocalizedMessage()); - } else { - LOG.error("Can't close container " + containerID, e); - cmdExecuted = false; - } - } finally { - updateCommandStatus(context, command, - (cmdStatus) -> cmdStatus.setStatus(cmdExecuted), LOG); - long endTime = Time.monotonicNow(); - totalTime += endTime - startTime; - } - } - - /** - * Returns the command type that this command handler handles. - * - * @return Type - */ - @Override - public SCMCommandProto.Type getCommandType() { - return SCMCommandProto.Type.closeContainerCommand; - } - - /** - * Returns number of times this handler has been invoked. - * - * @return int - */ - @Override - public int getInvocationCount() { - return invocationCount; - } - - /** - * Returns the average time this function takes to run. - * - * @return long - */ - @Override - public long getAverageRunTime() { - if (invocationCount > 0) { - return totalTime / invocationCount; - } - return 0; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java deleted file mode 100644 index 5163d98..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java +++ /dev/null @@ -1,182 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type; -import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; -import org.apache.hadoop.ozone.container.common.statemachine.StateContext; -import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; -import org.apache.hadoop.ozone.protocol.commands.SCMCommand; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -/** - * Dispatches command to the correct handler. - */ -public final class CommandDispatcher { - static final Logger LOG = - LoggerFactory.getLogger(CommandDispatcher.class); - private final StateContext context; - private final Map<Type, CommandHandler> handlerMap; - private final OzoneContainer container; - private final SCMConnectionManager connectionManager; - - /** - * Constructs a command Dispatcher. - * @param context - Context. - */ - /** - * Constructs a command dispatcher. - * - * @param container - Ozone Container - * @param context - Context - * @param handlers - Set of handlers. - */ - private CommandDispatcher(OzoneContainer container, SCMConnectionManager - connectionManager, StateContext context, - CommandHandler... handlers) { - Preconditions.checkNotNull(context); - Preconditions.checkNotNull(handlers); - Preconditions.checkArgument(handlers.length > 0); - Preconditions.checkNotNull(container); - Preconditions.checkNotNull(connectionManager); - this.context = context; - this.container = container; - this.connectionManager = connectionManager; - handlerMap = new HashMap<>(); - for (CommandHandler h : handlers) { - if(handlerMap.containsKey(h.getCommandType())){ - LOG.error("Duplicate handler for the same command. Exiting. Handle " + - "key : { }", h.getCommandType().getDescriptorForType().getName()); - throw new IllegalArgumentException("Duplicate handler for the same " + - "command."); - } - handlerMap.put(h.getCommandType(), h); - } - } - - public CommandHandler getCloseContainerHandler() { - return handlerMap.get(Type.closeContainerCommand); - } - - /** - * Dispatch the command to the correct handler. - * - * @param command - SCM Command. - */ - public void handle(SCMCommand command) { - Preconditions.checkNotNull(command); - CommandHandler handler = handlerMap.get(command.getType()); - if (handler != null) { - handler.handle(command, container, context, connectionManager); - } else { - LOG.error("Unknown SCM Command queued. There is no handler for this " + - "command. Command: {}", command.getType().getDescriptorForType() - .getName()); - } - } - - public static Builder newBuilder() { - return new Builder(); - } - - /** - * Helper class to construct command dispatcher. - */ - public static class Builder { - private final List<CommandHandler> handlerList; - private OzoneContainer container; - private StateContext context; - private SCMConnectionManager connectionManager; - - public Builder() { - handlerList = new LinkedList<>(); - } - - /** - * Adds a handler. - * - * @param handler - handler - * @return Builder - */ - public Builder addHandler(CommandHandler handler) { - Preconditions.checkNotNull(handler); - handlerList.add(handler); - return this; - } - - /** - * Add the OzoneContainer. - * - * @param ozoneContainer - ozone container. - * @return Builder - */ - public Builder setContainer(OzoneContainer ozoneContainer) { - Preconditions.checkNotNull(ozoneContainer); - this.container = ozoneContainer; - return this; - } - - /** - * Set the Connection Manager. - * - * @param scmConnectionManager - * @return this - */ - public Builder setConnectionManager(SCMConnectionManager - scmConnectionManager) { - Preconditions.checkNotNull(scmConnectionManager); - this.connectionManager = scmConnectionManager; - return this; - } - - /** - * Sets the Context. - * - * @param stateContext - StateContext - * @return this - */ - public Builder setContext(StateContext stateContext) { - Preconditions.checkNotNull(stateContext); - this.context = stateContext; - return this; - } - - /** - * Builds a command Dispatcher. - * @return Command Dispatcher. - */ - public CommandDispatcher build() { - Preconditions.checkNotNull(this.connectionManager, "Missing connection" + - " manager."); - Preconditions.checkNotNull(this.container, "Missing container."); - Preconditions.checkNotNull(this.context, "Missing context."); - Preconditions.checkArgument(this.handlerList.size() > 0); - return new CommandDispatcher(this.container, this.connectionManager, - this.context, handlerList.toArray( - new CommandHandler[handlerList.size()])); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java deleted file mode 100644 index 1ea0ea8..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; - -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandProto; -import org.apache.hadoop.ozone.container.common.statemachine - .SCMConnectionManager; -import org.apache.hadoop.ozone.container.common.statemachine.StateContext; -import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; -import org.apache.hadoop.ozone.protocol.commands.CommandStatus; -import org.apache.hadoop.ozone.protocol.commands.SCMCommand; -import org.slf4j.Logger; - -import java.util.function.Consumer; - -/** - * Generic interface for handlers. - */ -public interface CommandHandler { - - /** - * Handles a given SCM command. - * @param command - SCM Command - * @param container - Ozone Container. - * @param context - Current Context. - * @param connectionManager - The SCMs that we are talking to. - */ - void handle(SCMCommand command, OzoneContainer container, - StateContext context, SCMConnectionManager connectionManager); - - /** - * Returns the command type that this command handler handles. - * @return Type - */ - SCMCommandProto.Type getCommandType(); - - /** - * Returns number of times this handler has been invoked. - * @return int - */ - int getInvocationCount(); - - /** - * Returns the average time this function takes to run. - * @return long - */ - long getAverageRunTime(); - - /** - * Default implementation for updating command status. - */ - default void updateCommandStatus(StateContext context, SCMCommand command, - Consumer<CommandStatus> cmdStatusUpdater, Logger log) { - if (!context.updateCommandStatus(command.getId(), cmdStatusUpdater)) { - log.debug("{} with Id:{} not found.", command.getType(), - command.getId()); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java deleted file mode 100644 index aa63fb4..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java +++ /dev/null @@ -1,266 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; - -import com.google.common.primitives.Longs; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandProto; -import org.apache.hadoop.hdds.scm.container.common.helpers - .StorageContainerException; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto - .DeleteBlockTransactionResult; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.container.common.helpers - .DeletedContainerBlocksSummary; -import org.apache.hadoop.ozone.container.common.interfaces.Container; -import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; -import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; -import org.apache.hadoop.ozone.container.common.impl.ContainerSet; -import org.apache.hadoop.ozone.container.common.statemachine - .SCMConnectionManager; -import org.apache.hadoop.ozone.container.common.statemachine.StateContext; -import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; -import org.apache.hadoop.ozone.protocol.commands.CommandStatus; -import org.apache.hadoop.ozone.protocol.commands.DeleteBlockCommandStatus; -import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; -import org.apache.hadoop.ozone.protocol.commands.SCMCommand; -import org.apache.hadoop.util.Time; -import org.apache.hadoop.utils.BatchOperation; -import org.apache.hadoop.utils.MetadataStore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.List; -import java.util.function.Consumer; - -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.CONTAINER_NOT_FOUND; - -/** - * Handle block deletion commands. - */ -public class DeleteBlocksCommandHandler implements CommandHandler { - - public static final Logger LOG = - LoggerFactory.getLogger(DeleteBlocksCommandHandler.class); - - private final ContainerSet containerSet; - private final Configuration conf; - private int invocationCount; - private long totalTime; - private boolean cmdExecuted; - - public DeleteBlocksCommandHandler(ContainerSet cset, - Configuration conf) { - this.containerSet = cset; - this.conf = conf; - } - - @Override - public void handle(SCMCommand command, OzoneContainer container, - StateContext context, SCMConnectionManager connectionManager) { - cmdExecuted = false; - long startTime = Time.monotonicNow(); - ContainerBlocksDeletionACKProto blockDeletionACK = null; - try { - if (command.getType() != SCMCommandProto.Type.deleteBlocksCommand) { - LOG.warn("Skipping handling command, expected command " - + "type {} but found {}", - SCMCommandProto.Type.deleteBlocksCommand, command.getType()); - return; - } - LOG.debug("Processing block deletion command."); - invocationCount++; - - // move blocks to deleting state. - // this is a metadata update, the actual deletion happens in another - // recycling thread. - DeleteBlocksCommand cmd = (DeleteBlocksCommand) command; - List<DeletedBlocksTransaction> containerBlocks = cmd.blocksTobeDeleted(); - - DeletedContainerBlocksSummary summary = - DeletedContainerBlocksSummary.getFrom(containerBlocks); - LOG.info("Start to delete container blocks, TXIDs={}, " - + "numOfContainers={}, numOfBlocks={}", - summary.getTxIDSummary(), - summary.getNumOfContainers(), - summary.getNumOfBlocks()); - - ContainerBlocksDeletionACKProto.Builder resultBuilder = - ContainerBlocksDeletionACKProto.newBuilder(); - containerBlocks.forEach(entry -> { - DeleteBlockTransactionResult.Builder txResultBuilder = - DeleteBlockTransactionResult.newBuilder(); - txResultBuilder.setTxID(entry.getTxID()); - long containerId = entry.getContainerID(); - try { - Container cont = containerSet.getContainer(containerId); - if (cont == null) { - throw new StorageContainerException("Unable to find the container " - + containerId, CONTAINER_NOT_FOUND); - } - ContainerProtos.ContainerType containerType = cont.getContainerType(); - switch (containerType) { - case KeyValueContainer: - KeyValueContainerData containerData = (KeyValueContainerData) - cont.getContainerData(); - deleteKeyValueContainerBlocks(containerData, entry); - txResultBuilder.setContainerID(containerId) - .setSuccess(true); - break; - default: - LOG.error( - "Delete Blocks Command Handler is not implemented for " + - "containerType {}", containerType); - } - } catch (IOException e) { - LOG.warn("Failed to delete blocks for container={}, TXID={}", - entry.getContainerID(), entry.getTxID(), e); - txResultBuilder.setContainerID(containerId) - .setSuccess(false); - } - resultBuilder.addResults(txResultBuilder.build()) - .setDnId(context.getParent().getDatanodeDetails() - .getUuid().toString()); - }); - blockDeletionACK = resultBuilder.build(); - - // Send ACK back to SCM as long as meta updated - // TODO Or we should wait until the blocks are actually deleted? - if (!containerBlocks.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Sending following block deletion ACK to SCM"); - for (DeleteBlockTransactionResult result : blockDeletionACK - .getResultsList()) { - LOG.debug(result.getTxID() + " : " + result.getSuccess()); - } - } - } - cmdExecuted = true; - } finally { - final ContainerBlocksDeletionACKProto deleteAck = - blockDeletionACK; - Consumer<CommandStatus> statusUpdater = (cmdStatus) -> { - cmdStatus.setStatus(cmdExecuted); - ((DeleteBlockCommandStatus) cmdStatus).setBlocksDeletionAck(deleteAck); - }; - updateCommandStatus(context, command, statusUpdater, LOG); - long endTime = Time.monotonicNow(); - totalTime += endTime - startTime; - } - } - - /** - * Move a bunch of blocks from a container to deleting state. This is a meta - * update, the actual deletes happen in async mode. - * - * @param containerData - KeyValueContainerData - * @param delTX a block deletion transaction. - * @throws IOException if I/O error occurs. - */ - private void deleteKeyValueContainerBlocks( - KeyValueContainerData containerData, DeletedBlocksTransaction delTX) - throws IOException { - long containerId = delTX.getContainerID(); - if (LOG.isDebugEnabled()) { - LOG.debug("Processing Container : {}, DB path : {}", containerId, - containerData.getMetadataPath()); - } - - if (delTX.getTxID() < containerData.getDeleteTransactionId()) { - LOG.debug(String.format("Ignoring delete blocks for containerId: %d." - + " Outdated delete transactionId %d < %d", containerId, - delTX.getTxID(), containerData.getDeleteTransactionId())); - return; - } - - int newDeletionBlocks = 0; - MetadataStore containerDB = BlockUtils.getDB(containerData, conf); - for (Long blk : delTX.getLocalIDList()) { - BatchOperation batch = new BatchOperation(); - byte[] blkBytes = Longs.toByteArray(blk); - byte[] blkInfo = containerDB.get(blkBytes); - if (blkInfo != null) { - byte[] deletingKeyBytes = - DFSUtil.string2Bytes(OzoneConsts.DELETING_KEY_PREFIX + blk); - byte[] deletedKeyBytes = - DFSUtil.string2Bytes(OzoneConsts.DELETED_KEY_PREFIX + blk); - if (containerDB.get(deletingKeyBytes) != null - || containerDB.get(deletedKeyBytes) != null) { - LOG.debug(String.format( - "Ignoring delete for block %d in container %d." - + " Entry already added.", blk, containerId)); - continue; - } - // Found the block in container db, - // use an atomic update to change its state to deleting. - batch.put(deletingKeyBytes, blkInfo); - batch.delete(blkBytes); - try { - containerDB.writeBatch(batch); - newDeletionBlocks++; - LOG.debug("Transited Block {} to DELETING state in container {}", - blk, containerId); - } catch (IOException e) { - // if some blocks failed to delete, we fail this TX, - // without sending this ACK to SCM, SCM will resend the TX - // with a certain number of retries. - throw new IOException( - "Failed to delete blocks for TXID = " + delTX.getTxID(), e); - } - } else { - LOG.debug("Block {} not found or already under deletion in" - + " container {}, skip deleting it.", blk, containerId); - } - } - - containerDB - .put(DFSUtil.string2Bytes(OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX), - Longs.toByteArray(delTX.getTxID())); - containerData - .updateDeleteTransactionId(delTX.getTxID()); - // update pending deletion blocks count in in-memory container status - containerData.incrPendingDeletionBlocks(newDeletionBlocks); - } - - @Override - public SCMCommandProto.Type getCommandType() { - return SCMCommandProto.Type.deleteBlocksCommand; - } - - @Override - public int getInvocationCount() { - return this.invocationCount; - } - - @Override - public long getAverageRunTime() { - if (invocationCount > 0) { - return totalTime / invocationCount; - } - return 0; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java deleted file mode 100644 index 81d162d..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; - -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type; -import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; -import org.apache.hadoop.ozone.container.common.statemachine.StateContext; -import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; -import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor; -import org.apache.hadoop.ozone.container.replication.ReplicationTask; -import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; -import org.apache.hadoop.ozone.protocol.commands.SCMCommand; - -import com.google.common.base.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Command handler to copy containers from sources. - */ -public class ReplicateContainerCommandHandler implements CommandHandler { - - static final Logger LOG = - LoggerFactory.getLogger(ReplicateContainerCommandHandler.class); - - private int invocationCount; - - private long totalTime; - - private Configuration conf; - - private ReplicationSupervisor supervisor; - - public ReplicateContainerCommandHandler( - Configuration conf, - ReplicationSupervisor supervisor) { - this.conf = conf; - this.supervisor = supervisor; - } - - @Override - public void handle(SCMCommand command, OzoneContainer container, - StateContext context, SCMConnectionManager connectionManager) { - - ReplicateContainerCommand replicateCommand = - (ReplicateContainerCommand) command; - try { - List<DatanodeDetails> sourceDatanodes = - replicateCommand.getSourceDatanodes(); - long containerID = replicateCommand.getContainerID(); - - Preconditions.checkArgument(sourceDatanodes.size() > 0, - String.format("Replication command is received for container %d " - + "but the size of source datanodes was 0.", containerID)); - - ReplicationTask replicationTask = - new ReplicationTask(containerID, sourceDatanodes); - supervisor.addTask(replicationTask); - - } finally { - updateCommandStatus(context, command, - (cmdStatus) -> cmdStatus.setStatus(true), LOG); - } - } - - @Override - public SCMCommandProto.Type getCommandType() { - return Type.replicateContainerCommand; - } - - @Override - public int getInvocationCount() { - return this.invocationCount; - } - - @Override - public long getAverageRunTime() { - if (invocationCount > 0) { - return totalTime / invocationCount; - } - return 0; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java deleted file mode 100644 index 1e9c8dc..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java deleted file mode 100644 index feb2f81..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.common.statemachine; -/** - - State machine class is used by the container to denote various states a - container can be in and also is used for command processing. - - Container has the following states. - - Start - > getVersion -> Register -> Running -> Shutdown - - */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java deleted file mode 100644 index 75142af..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.common.states; - -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * State Interface that allows tasks to maintain states. - */ -public interface DatanodeState<T> { - /** - * Called before entering this state. - */ - void onEnter(); - - /** - * Called After exiting this state. - */ - void onExit(); - - /** - * Executes one or more tasks that is needed by this state. - * - * @param executor - ExecutorService - */ - void execute(ExecutorService executor); - - /** - * Wait for execute to finish. - * - * @param time - Time - * @param timeUnit - Unit of time. - */ - T await(long time, TimeUnit timeUnit) - throws InterruptedException, ExecutionException, TimeoutException; - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java deleted file mode 100644 index 995f172..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java +++ /dev/null @@ -1,182 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.container.common.states.datanode; - -import com.google.common.base.Strings; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.HddsUtils; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; -import org.apache.hadoop.ozone.container.common.statemachine - .DatanodeStateMachine; -import org.apache.hadoop.ozone.container.common.statemachine - .SCMConnectionManager; -import org.apache.hadoop.ozone.container.common.statemachine.StateContext; -import org.apache.hadoop.ozone.container.common.states.DatanodeState; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Collection; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static org.apache.hadoop.hdds.HddsUtils.getSCMAddresses; - -/** - * Init Datanode State is the task that gets run when we are in Init State. - */ -public class InitDatanodeState implements DatanodeState, - Callable<DatanodeStateMachine.DatanodeStates> { - static final Logger LOG = LoggerFactory.getLogger(InitDatanodeState.class); - private final SCMConnectionManager connectionManager; - private final Configuration conf; - private final StateContext context; - private Future<DatanodeStateMachine.DatanodeStates> result; - - /** - * Create InitDatanodeState Task. - * - * @param conf - Conf - * @param connectionManager - Connection Manager - * @param context - Current Context - */ - public InitDatanodeState(Configuration conf, - SCMConnectionManager connectionManager, - StateContext context) { - this.conf = conf; - this.connectionManager = connectionManager; - this.context = context; - } - - /** - * Computes a result, or throws an exception if unable to do so. - * - * @return computed result - * @throws Exception if unable to compute a result - */ - @Override - public DatanodeStateMachine.DatanodeStates call() throws Exception { - Collection<InetSocketAddress> addresses = null; - try { - addresses = getSCMAddresses(conf); - } catch (IllegalArgumentException e) { - if(!Strings.isNullOrEmpty(e.getMessage())) { - LOG.error("Failed to get SCM addresses: " + e.getMessage()); - } - return DatanodeStateMachine.DatanodeStates.SHUTDOWN; - } - - if (addresses == null || addresses.isEmpty()) { - LOG.error("Null or empty SCM address list found."); - return DatanodeStateMachine.DatanodeStates.SHUTDOWN; - } else { - for (InetSocketAddress addr : addresses) { - if (addr.isUnresolved()) { - LOG.warn("One SCM address ({}) can't (yet?) be resolved. Postpone " - + "initialization.", addr); - - //skip any further initialization. DatanodeStateMachine will try it - // again after the hb frequency - return this.context.getState(); - } - } - for (InetSocketAddress addr : addresses) { - connectionManager.addSCMServer(addr); - } - } - - // If datanode ID is set, persist it to the ID file. - persistContainerDatanodeDetails(); - - return this.context.getState().getNextState(); - } - - /** - * Persist DatanodeDetails to datanode.id file. - */ - private void persistContainerDatanodeDetails() { - String dataNodeIDPath = HddsUtils.getDatanodeIdFilePath(conf); - if (Strings.isNullOrEmpty(dataNodeIDPath)) { - LOG.error("A valid file path is needed for config setting {}", - ScmConfigKeys.OZONE_SCM_DATANODE_ID); - this.context.setState(DatanodeStateMachine.DatanodeStates.SHUTDOWN); - return; - } - File idPath = new File(dataNodeIDPath); - DatanodeDetails datanodeDetails = this.context.getParent() - .getDatanodeDetails(); - if (datanodeDetails != null && !idPath.exists()) { - try { - ContainerUtils.writeDatanodeDetailsTo(datanodeDetails, idPath); - } catch (IOException ex) { - // As writing DatanodeDetails in to datanodeid file failed, which is - // a critical thing, so shutting down the state machine. - LOG.error("Writing to {} failed {}", dataNodeIDPath, ex.getMessage()); - this.context.setState(DatanodeStateMachine.DatanodeStates.SHUTDOWN); - return; - } - LOG.info("DatanodeDetails is persisted to {}", dataNodeIDPath); - } - } - - /** - * Called before entering this state. - */ - @Override - public void onEnter() { - LOG.trace("Entering init container state"); - } - - /** - * Called After exiting this state. - */ - @Override - public void onExit() { - LOG.trace("Exiting init container state"); - } - - /** - * Executes one or more tasks that is needed by this state. - * - * @param executor - ExecutorService - */ - @Override - public void execute(ExecutorService executor) { - result = executor.submit(this); - } - - /** - * Wait for execute to finish. - * - * @param time - Time - * @param timeUnit - Unit of time. - */ - @Override - public DatanodeStateMachine.DatanodeStates await(long time, - TimeUnit timeUnit) throws InterruptedException, - ExecutionException, TimeoutException { - return result.get(time, timeUnit); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java deleted file mode 100644 index ec2358a..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java +++ /dev/null @@ -1,178 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.container.common.states.datanode; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; -import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine; -import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; -import org.apache.hadoop.ozone.container.common.statemachine.StateContext; -import org.apache.hadoop.ozone.container.common.states.DatanodeState; -import org.apache.hadoop.ozone.container.common.states.endpoint.HeartbeatEndpointTask; -import org.apache.hadoop.ozone.container.common.states.endpoint.RegisterEndpointTask; -import org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask; -import org.apache.hadoop.util.Time; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletionService; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * Class that implements handshake with SCM. - */ -public class RunningDatanodeState implements DatanodeState { - static final Logger - LOG = LoggerFactory.getLogger(RunningDatanodeState.class); - private final SCMConnectionManager connectionManager; - private final Configuration conf; - private final StateContext context; - private CompletionService<EndpointStateMachine.EndPointStates> ecs; - - public RunningDatanodeState(Configuration conf, - SCMConnectionManager connectionManager, - StateContext context) { - this.connectionManager = connectionManager; - this.conf = conf; - this.context = context; - } - - /** - * Called before entering this state. - */ - @Override - public void onEnter() { - LOG.trace("Entering handshake task."); - } - - /** - * Called After exiting this state. - */ - @Override - public void onExit() { - LOG.trace("Exiting handshake task."); - } - - /** - * Executes one or more tasks that is needed by this state. - * - * @param executor - ExecutorService - */ - @Override - public void execute(ExecutorService executor) { - ecs = new ExecutorCompletionService<>(executor); - for (EndpointStateMachine endpoint : connectionManager.getValues()) { - Callable<EndpointStateMachine.EndPointStates> endpointTask - = getEndPointTask(endpoint); - ecs.submit(endpointTask); - } - } - //TODO : Cache some of these tasks instead of creating them - //all the time. - private Callable<EndpointStateMachine.EndPointStates> - getEndPointTask(EndpointStateMachine endpoint) { - switch (endpoint.getState()) { - case GETVERSION: - return new VersionEndpointTask(endpoint, conf, context.getParent() - .getContainer()); - case REGISTER: - return RegisterEndpointTask.newBuilder() - .setConfig(conf) - .setEndpointStateMachine(endpoint) - .setContext(context) - .setDatanodeDetails(context.getParent().getDatanodeDetails()) - .setOzoneContainer(context.getParent().getContainer()) - .build(); - case HEARTBEAT: - return HeartbeatEndpointTask.newBuilder() - .setConfig(conf) - .setEndpointStateMachine(endpoint) - .setDatanodeDetails(context.getParent().getDatanodeDetails()) - .setContext(context) - .build(); - case SHUTDOWN: - break; - default: - throw new IllegalArgumentException("Illegal Argument."); - } - return null; - } - - /** - * Computes the next state the container state machine must move to by looking - * at all the state of endpoints. - * <p> - * if any endpoint state has moved to Shutdown, either we have an - * unrecoverable error or we have been told to shutdown. Either case the - * datanode state machine should move to Shutdown state, otherwise we - * remain in the Running state. - * - * @return next container state. - */ - private DatanodeStateMachine.DatanodeStates - computeNextContainerState( - List<Future<EndpointStateMachine.EndPointStates>> results) { - for (Future<EndpointStateMachine.EndPointStates> state : results) { - try { - if (state.get() == EndpointStateMachine.EndPointStates.SHUTDOWN) { - // if any endpoint tells us to shutdown we move to shutdown state. - return DatanodeStateMachine.DatanodeStates.SHUTDOWN; - } - } catch (InterruptedException | ExecutionException e) { - LOG.error("Error in executing end point task.", e); - } - } - return DatanodeStateMachine.DatanodeStates.RUNNING; - } - - /** - * Wait for execute to finish. - * - * @param duration - Time - * @param timeUnit - Unit of duration. - */ - @Override - public DatanodeStateMachine.DatanodeStates - await(long duration, TimeUnit timeUnit) - throws InterruptedException, ExecutionException, TimeoutException { - int count = connectionManager.getValues().size(); - int returned = 0; - long timeLeft = timeUnit.toMillis(duration); - long startTime = Time.monotonicNow(); - List<Future<EndpointStateMachine.EndPointStates>> results = new - LinkedList<>(); - - while (returned < count && timeLeft > 0) { - Future<EndpointStateMachine.EndPointStates> result = - ecs.poll(timeLeft, TimeUnit.MILLISECONDS); - if (result != null) { - results.add(result); - returned++; - } - timeLeft = timeLeft - (Time.monotonicNow() - startTime); - } - return computeNextContainerState(results); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java deleted file mode 100644 index 6b8d16c..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.container.common.states.datanode; -/** - This package contians files that guide the state transitions from - Init->Running->Shutdown for the datanode. - */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java deleted file mode 100644 index 4fd72ec..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java +++ /dev/null @@ -1,385 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.container.common.states.endpoint; - -import com.google.common.base.Preconditions; -import com.google.protobuf.Descriptors; -import com.google.protobuf.GeneratedMessage; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.PipelineActionsProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.PipelineAction; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerActionsProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerAction; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; -import org.apache.hadoop.ozone.container.common.helpers - .DeletedContainerBlocksSummary; -import org.apache.hadoop.ozone.container.common.statemachine - .EndpointStateMachine; -import org.apache.hadoop.ozone.container.common.statemachine - .EndpointStateMachine.EndPointStates; -import org.apache.hadoop.ozone.container.common.statemachine.StateContext; -import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; -import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; -import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.time.ZonedDateTime; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.Callable; - -import static org.apache.hadoop.hdds.HddsConfigKeys - .HDDS_CONTAINER_ACTION_MAX_LIMIT; -import static org.apache.hadoop.hdds.HddsConfigKeys - .HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT; -import static org.apache.hadoop.hdds.HddsConfigKeys - .HDDS_PIPELINE_ACTION_MAX_LIMIT; -import static org.apache.hadoop.hdds.HddsConfigKeys - .HDDS_PIPELINE_ACTION_MAX_LIMIT_DEFAULT; - -/** - * Heartbeat class for SCMs. - */ -public class HeartbeatEndpointTask - implements Callable<EndpointStateMachine.EndPointStates> { - static final Logger LOG = - LoggerFactory.getLogger(HeartbeatEndpointTask.class); - private final EndpointStateMachine rpcEndpoint; - private final Configuration conf; - private DatanodeDetailsProto datanodeDetailsProto; - private StateContext context; - private int maxContainerActionsPerHB; - private int maxPipelineActionsPerHB; - - /** - * Constructs a SCM heart beat. - * - * @param conf Config. - */ - public HeartbeatEndpointTask(EndpointStateMachine rpcEndpoint, - Configuration conf, StateContext context) { - this.rpcEndpoint = rpcEndpoint; - this.conf = conf; - this.context = context; - this.maxContainerActionsPerHB = conf.getInt(HDDS_CONTAINER_ACTION_MAX_LIMIT, - HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT); - this.maxPipelineActionsPerHB = conf.getInt(HDDS_PIPELINE_ACTION_MAX_LIMIT, - HDDS_PIPELINE_ACTION_MAX_LIMIT_DEFAULT); - } - - /** - * Get the container Node ID proto. - * - * @return ContainerNodeIDProto - */ - public DatanodeDetailsProto getDatanodeDetailsProto() { - return datanodeDetailsProto; - } - - /** - * Set container node ID proto. - * - * @param datanodeDetailsProto - the node id. - */ - public void setDatanodeDetailsProto(DatanodeDetailsProto - datanodeDetailsProto) { - this.datanodeDetailsProto = datanodeDetailsProto; - } - - /** - * Computes a result, or throws an exception if unable to do so. - * - * @return computed result - * @throws Exception if unable to compute a result - */ - @Override - public EndpointStateMachine.EndPointStates call() throws Exception { - rpcEndpoint.lock(); - SCMHeartbeatRequestProto.Builder requestBuilder = null; - try { - Preconditions.checkState(this.datanodeDetailsProto != null); - - requestBuilder = SCMHeartbeatRequestProto.newBuilder() - .setDatanodeDetails(datanodeDetailsProto); - addReports(requestBuilder); - addContainerActions(requestBuilder); - addPipelineActions(requestBuilder); - SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint() - .sendHeartbeat(requestBuilder.build()); - processResponse(reponse, datanodeDetailsProto); - rpcEndpoint.setLastSuccessfulHeartbeat(ZonedDateTime.now()); - rpcEndpoint.zeroMissedCount(); - } catch (IOException ex) { - // put back the reports which failed to be sent - putBackReports(requestBuilder); - rpcEndpoint.logIfNeeded(ex); - } finally { - rpcEndpoint.unlock(); - } - return rpcEndpoint.getState(); - } - - // TODO: Make it generic. - private void putBackReports(SCMHeartbeatRequestProto.Builder requestBuilder) { - List<GeneratedMessage> reports = new LinkedList<>(); - if (requestBuilder.hasContainerReport()) { - reports.add(requestBuilder.getContainerReport()); - } - if (requestBuilder.hasNodeReport()) { - reports.add(requestBuilder.getNodeReport()); - } - if (requestBuilder.getCommandStatusReportsCount() != 0) { - for (GeneratedMessage msg : requestBuilder - .getCommandStatusReportsList()) { - reports.add(msg); - } - } - context.putBackReports(reports); - } - - /** - * Adds all the available reports to heartbeat. - * - * @param requestBuilder builder to which the report has to be added. - */ - private void addReports(SCMHeartbeatRequestProto.Builder requestBuilder) { - for (GeneratedMessage report : context.getAllAvailableReports()) { - String reportName = report.getDescriptorForType().getFullName(); - for (Descriptors.FieldDescriptor descriptor : - SCMHeartbeatRequestProto.getDescriptor().getFields()) { - String heartbeatFieldName = descriptor.getMessageType().getFullName(); - if (heartbeatFieldName.equals(reportName)) { - if (descriptor.isRepeated()) { - requestBuilder.addRepeatedField(descriptor, report); - } else { - requestBuilder.setField(descriptor, report); - } - } - } - } - } - - /** - * Adds all the pending ContainerActions to the heartbeat. - * - * @param requestBuilder builder to which the report has to be added. - */ - private void addContainerActions( - SCMHeartbeatRequestProto.Builder requestBuilder) { - List<ContainerAction> actions = context.getPendingContainerAction( - maxContainerActionsPerHB); - if (!actions.isEmpty()) { - ContainerActionsProto cap = ContainerActionsProto.newBuilder() - .addAllContainerActions(actions) - .build(); - requestBuilder.setContainerActions(cap); - } - } - - /** - * Adds all the pending PipelineActions to the heartbeat. - * - * @param requestBuilder builder to which the report has to be added. - */ - private void addPipelineActions( - SCMHeartbeatRequestProto.Builder requestBuilder) { - List<PipelineAction> actions = context.getPendingPipelineAction( - maxPipelineActionsPerHB); - if (!actions.isEmpty()) { - PipelineActionsProto pap = PipelineActionsProto.newBuilder() - .addAllPipelineActions(actions) - .build(); - requestBuilder.setPipelineActions(pap); - } - } - - /** - * Returns a builder class for HeartbeatEndpointTask task. - * @return Builder. - */ - public static Builder newBuilder() { - return new Builder(); - } - - /** - * Add this command to command processing Queue. - * - * @param response - SCMHeartbeat response. - */ - private void processResponse(SCMHeartbeatResponseProto response, - final DatanodeDetailsProto datanodeDetails) { - Preconditions.checkState(response.getDatanodeUUID() - .equalsIgnoreCase(datanodeDetails.getUuid()), - "Unexpected datanode ID in the response."); - // Verify the response is indeed for this datanode. - for (SCMCommandProto commandResponseProto : response - .getCommandsList()) { - switch (commandResponseProto.getCommandType()) { - case reregisterCommand: - if (rpcEndpoint.getState() == EndPointStates.HEARTBEAT) { - if (LOG.isDebugEnabled()) { - LOG.debug("Received SCM notification to register." - + " Interrupt HEARTBEAT and transit to REGISTER state."); - } - rpcEndpoint.setState(EndPointStates.REGISTER); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Illegal state {} found, expecting {}.", - rpcEndpoint.getState().name(), EndPointStates.HEARTBEAT); - } - } - break; - case deleteBlocksCommand: - DeleteBlocksCommand db = DeleteBlocksCommand - .getFromProtobuf( - commandResponseProto.getDeleteBlocksCommandProto()); - if (!db.blocksTobeDeleted().isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug(DeletedContainerBlocksSummary - .getFrom(db.blocksTobeDeleted()) - .toString()); - } - this.context.addCommand(db); - } - break; - case closeContainerCommand: - CloseContainerCommand closeContainer = - CloseContainerCommand.getFromProtobuf( - commandResponseProto.getCloseContainerCommandProto()); - if (LOG.isDebugEnabled()) { - LOG.debug("Received SCM container close request for container {}", - closeContainer.getContainerID()); - } - this.context.addCommand(closeContainer); - break; - case replicateContainerCommand: - ReplicateContainerCommand replicateContainerCommand = - ReplicateContainerCommand.getFromProtobuf( - commandResponseProto.getReplicateContainerCommandProto()); - if (LOG.isDebugEnabled()) { - LOG.debug("Received SCM container replicate request for container {}", - replicateContainerCommand.getContainerID()); - } - this.context.addCommand(replicateContainerCommand); - break; - default: - throw new IllegalArgumentException("Unknown response : " - + commandResponseProto.getCommandType().name()); - } - } - } - - /** - * Builder class for HeartbeatEndpointTask. - */ - public static class Builder { - private EndpointStateMachine endPointStateMachine; - private Configuration conf; - private DatanodeDetails datanodeDetails; - private StateContext context; - - /** - * Constructs the builder class. - */ - public Builder() { - } - - /** - * Sets the endpoint state machine. - * - * @param rpcEndPoint - Endpoint state machine. - * @return Builder - */ - public Builder setEndpointStateMachine(EndpointStateMachine rpcEndPoint) { - this.endPointStateMachine = rpcEndPoint; - return this; - } - - /** - * Sets the Config. - * - * @param config - config - * @return Builder - */ - public Builder setConfig(Configuration config) { - this.conf = config; - return this; - } - - /** - * Sets the NodeID. - * - * @param dnDetails - NodeID proto - * @return Builder - */ - public Builder setDatanodeDetails(DatanodeDetails dnDetails) { - this.datanodeDetails = dnDetails; - return this; - } - - /** - * Sets the context. - * @param stateContext - State context. - * @return this. - */ - public Builder setContext(StateContext stateContext) { - this.context = stateContext; - return this; - } - - public HeartbeatEndpointTask build() { - if (endPointStateMachine == null) { - LOG.error("No endpoint specified."); - throw new IllegalArgumentException("A valid endpoint state machine is" + - " needed to construct HeartbeatEndpointTask task"); - } - - if (conf == null) { - LOG.error("No config specified."); - throw new IllegalArgumentException("A valid configration is needed to" + - " construct HeartbeatEndpointTask task"); - } - - if (datanodeDetails == null) { - LOG.error("No datanode specified."); - throw new IllegalArgumentException("A vaild Node ID is needed to " + - "construct HeartbeatEndpointTask task"); - } - - HeartbeatEndpointTask task = new HeartbeatEndpointTask(this - .endPointStateMachine, this.conf, this.context); - task.setDatanodeDetailsProto(datanodeDetails.getProtoBufMessage()); - return task; - } - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org