HDFS-10180. Ozone: Refactor container Namespace. Contributed by Anu Engineer.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c73a32c2 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c73a32c2 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c73a32c2 Branch: refs/heads/HDFS-7240 Commit: c73a32c21cad568915860cc200213d6c4c8408cf Parents: 37e3a36 Author: Chris Nauroth <cnaur...@apache.org> Authored: Fri Mar 18 13:46:56 2016 -0700 Committer: Chris Nauroth <cnaur...@apache.org> Committed: Fri Mar 18 13:46:56 2016 -0700 ---------------------------------------------------------------------- .../container/common/helpers/ContainerData.java | 170 +++++++++++++++++ .../common/helpers/ContainerUtils.java | 110 +++++++++++ .../container/common/helpers/Pipeline.java | 132 +++++++++++++ .../container/common/helpers/package-info.java | 21 +++ .../ozone/container/common/impl/Dispatcher.java | 189 +++++++++++++++++++ .../container/common/impl/package-info.java | 22 +++ .../common/interfaces/ContainerDispatcher.java | 44 +++++ .../common/interfaces/ContainerManager.java | 75 ++++++++ .../ozone/container/common/package-info.java | 28 +++ .../common/transport/client/XceiverClient.java | 122 ++++++++++++ .../transport/client/XceiverClientHandler.java | 112 +++++++++++ .../client/XceiverClientInitializer.java | 68 +++++++ .../common/transport/server/XceiverServer.java | 92 +++++++++ .../transport/server/XceiverServerHandler.java | 80 ++++++++ .../server/XceiverServerInitializer.java | 61 ++++++ .../ozone/container/helpers/ContainerData.java | 170 ----------------- .../ozone/container/helpers/ContainerUtils.java | 110 ----------- .../ozone/container/helpers/Pipeline.java | 132 ------------- .../ozone/container/helpers/package-info.java | 21 --- .../interfaces/ContainerDispatcher.java | 44 ----- .../container/interfaces/ContainerManager.java | 75 -------- .../ozone/container/ozoneimpl/Dispatcher.java | 185 ------------------ .../ozone/container/ozoneimpl/package-info.java | 22 
--- .../transport/client/XceiverClient.java | 122 ------------ .../transport/client/XceiverClientHandler.java | 112 ----------- .../client/XceiverClientInitializer.java | 68 ------- .../transport/server/XceiverServer.java | 92 --------- .../transport/server/XceiverServerHandler.java | 80 -------- .../server/XceiverServerInitializer.java | 61 ------ .../ozone/container/ContainerTestHelper.java | 2 +- .../transport/server/TestContainerServer.java | 13 +- 31 files changed, 1335 insertions(+), 1300 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java new file mode 100644 index 0000000..8f5120a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.helpers; + +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.TreeMap; + +/** + * This class maintains the information about a container in the ozone world. + * <p> + * A container is a name, along with metadata- which is a set of key value + * pair. + */ +public class ContainerData { + + private final String containerName; + private final Map<String, String> metadata; + + private String path; + + /** + * Constructs a ContainerData Object. + * + * @param containerName - Name + */ + public ContainerData(String containerName) { + this.metadata = new TreeMap<>(); + this.containerName = containerName; + } + + /** + * Constructs a ContainerData object from ProtoBuf classes. + * + * @param protoData - ProtoBuf Message + * @throws IOException + */ + public static ContainerData getFromProtBuf( + ContainerProtos.ContainerData protoData) throws IOException { + ContainerData data = new ContainerData(protoData.getName()); + for (int x = 0; x < protoData.getMetadataCount(); x++) { + data.addMetadata(protoData.getMetadata(x).getKey(), + protoData.getMetadata(x).getValue()); + } + + if (protoData.hasContainerPath()) { + data.setPath(protoData.getContainerPath()); + } + return data; + } + + /** + * Returns a ProtoBuf Message from ContainerData. 
+ * + * @return Protocol Buffer Message + */ + public ContainerProtos.ContainerData getProtoBufMessage() { + ContainerProtos.ContainerData.Builder builder = ContainerProtos + .ContainerData.newBuilder(); + builder.setName(this.getContainerName()); + if (this.getPath() != null) { + builder.setContainerPath(this.getPath()); + } + for (Map.Entry<String, String> entry : metadata.entrySet()) { + ContainerProtos.KeyValue.Builder keyValBuilder = + ContainerProtos.KeyValue.newBuilder(); + builder.addMetadata(keyValBuilder.setKey(entry.getKey()) + .setValue(entry.getValue()).build()); + } + return builder.build(); + } + + /** + * Returns the name of the container. + * + * @return - name + */ + public String getContainerName() { + return containerName; + } + + /** + * Adds metadata. + */ + public void addMetadata(String key, String value) throws IOException { + synchronized (this.metadata) { + if (this.metadata.containsKey(key)) { + throw new IOException("This key already exists. Key " + key); + } + metadata.put(key, value); + } + } + + /** + * Returns all metadata. + */ + public Map<String, String> getAllMetadata() { + synchronized (this.metadata) { + return Collections.unmodifiableMap(this.metadata); + } + } + + /** + * Returns value of a key. + */ + public String getValue(String key) { + synchronized (this.metadata) { + return metadata.get(key); + } + } + + /** + * Deletes a metadata entry from the map. + * + * @param key - Key + */ + public void deleteKey(String key) { + synchronized (this.metadata) { + metadata.remove(key); + } + } + + /** + * Returns path. + * + * @return - path + */ + public String getPath() { + return path; + } + + /** + * Sets path. + * + * @param path - String. + */ + public void setPath(String path) { + this.path = path; + } + + /** + * This function serves as the generic key for OzoneCache class. Both + * ContainerData and ContainerKeyData overrides this function to appropriately + * return the right name that can be used in OzoneCache. 
+ * + * @return String Name. + */ + public String getName() { + return getContainerName(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java new file mode 100644 index 0000000..23e1804 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.helpers; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; + +/** + * A set of helper functions to create proper responses. + */ +public final class ContainerUtils { + + /** + * Returns a CreateContainer Response. 
This call is used by create and delete + * containers which have null success responses. + * + * @param msg Request + * @return Response. + */ + public static ContainerProtos.ContainerCommandResponseProto + getContainerResponse(ContainerProtos.ContainerCommandRequestProto msg) { + ContainerProtos.ContainerCommandResponseProto.Builder builder = + getContainerResponse(msg, ContainerProtos.Result.SUCCESS, ""); + return builder.build(); + } + + /** + * Returns a ReadContainer Response. + * + * @param msg Request + * @return Response. + */ + public static ContainerProtos.ContainerCommandResponseProto + getReadContainerResponse(ContainerProtos.ContainerCommandRequestProto msg, + ContainerData containerData) { + Preconditions.checkNotNull(containerData); + + ContainerProtos.ReadContainerResponseProto.Builder response = + ContainerProtos.ReadContainerResponseProto.newBuilder(); + response.setContainerData(containerData.getProtoBufMessage()); + + ContainerProtos.ContainerCommandResponseProto.Builder builder = + getContainerResponse(msg, ContainerProtos.Result.SUCCESS, ""); + builder.setReadContainer(response); + return builder.build(); + } + + /** + * Starts a response for the given request: copies its command type and + * trace ID and sets the supplied result and message. + * + * @param msg - Protobuf message. + * @return ContainerCommandResponseProto.Builder - response not yet built. + */ + public static ContainerProtos.ContainerCommandResponseProto.Builder + getContainerResponse(ContainerProtos.ContainerCommandRequestProto msg, + ContainerProtos.Result result, String message) { + return + ContainerProtos.ContainerCommandResponseProto.newBuilder() + .setCmdType(msg.getCmdType()) + .setTraceID(msg.getTraceID()) + .setResult(result) + .setMessage(message); + } + + /** + * We found a command type but no associated payload for the command. Hence + * return malformed Command as response. + * + * @param msg - Protobuf message. + * @return ContainerCommandResponseProto - MALFORMED_REQUEST.
+ */ + public static ContainerProtos.ContainerCommandResponseProto + malformedRequest(ContainerProtos.ContainerCommandRequestProto msg) { + return getContainerResponse(msg, ContainerProtos.Result.MALFORMED_REQUEST, + "Cmd type does not match the payload.").build(); + } + + /** + * We found a command type that is not supported yet. + * + * @param msg - Protobuf message. + * @return ContainerCommandResponseProto - UNSUPPORTED_REQUEST. + */ + public static ContainerProtos.ContainerCommandResponseProto + unsupportedRequest(ContainerProtos.ContainerCommandRequestProto msg) { + return getContainerResponse(msg, ContainerProtos.Result.UNSUPPORTED_REQUEST, + "Server does not support this command yet.").build(); + } + + private ContainerUtils() { + //never constructed. + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/Pipeline.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/Pipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/Pipeline.java new file mode 100644 index 0000000..140341c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/Pipeline.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.helpers; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +/** + * A pipeline represents the group of machines over which a container lives. + */ +public class Pipeline { + private String containerName; + private String leaderID; + private Map<String, DatanodeID> datanodes; + + /** + * Constructs a new pipeline data structure. + * + * @param leaderID - First machine in this pipeline. + */ + public Pipeline(String leaderID) { + this.leaderID = leaderID; + datanodes = new TreeMap<>(); + } + + /** + * Gets pipeline object from protobuf. + * + * @param pipeline - ProtoBuf definition for the pipeline. + * @return Pipeline Object + */ + public static Pipeline getFromProtoBuf(ContainerProtos.Pipeline pipeline) { + Preconditions.checkNotNull(pipeline); + Pipeline newPipeline = new Pipeline(pipeline.getLeaderID()); + for (HdfsProtos.DatanodeIDProto dataID : pipeline.getMembersList()) { + newPipeline.addMember(DatanodeID.getFromProtoBuf(dataID)); + } + if (pipeline.hasContainerName()) { + newPipeline.containerName = newPipeline.getContainerName(); + } + return newPipeline; + } + + /** Adds a member to pipeline */ + + /** + * Adds a member to the pipeline. + * + * @param dataNodeId - Datanode to be added. 
+ */ + public void addMember(DatanodeID dataNodeId) { + datanodes.put(dataNodeId.getDatanodeUuid(), dataNodeId); + } + + /** + * Returns the first machine in the set of datanodes. + * + * @return First Machine. + */ + public DatanodeID getLeader() { + return datanodes.get(leaderID); + } + + /** + * Returns all machines that make up this pipeline. + * + * @return List of Machines. + */ + public List<DatanodeID> getMachines() { + return new ArrayList<>(datanodes.values()); + } + + /** + * Return a Protobuf Pipeline message from pipeline. + * + * @return Protobuf message + */ + public ContainerProtos.Pipeline getProtobufMessage() { + ContainerProtos.Pipeline.Builder builder = + ContainerProtos.Pipeline.newBuilder(); + for (DatanodeID datanode : datanodes.values()) { + builder.addMembers(datanode.getProtoBufMessage()); + } + builder.setLeaderID(leaderID); + if (this.containerName != null) { + builder.setContainerName(this.containerName); + } + return builder.build(); + } + + /** + * Returns containerName if available. + * + * @return String. + */ + public String getContainerName() { + return containerName; + } + + /** + * Sets the container Name. + * + * @param containerName - Name of the container. 
+ */ + public void setContainerName(String containerName) { + this.containerName = containerName; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/package-info.java new file mode 100644 index 0000000..fe7e37a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.helpers; +/** + Contains protocol buffer helper classes. 
+ **/ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java new file mode 100644 index 0000000..7a45557 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.container.common.impl; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type; +import org.apache.hadoop.ozone.container.common.helpers.ContainerData; +import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; +import org.apache.hadoop.ozone.container.common.helpers.Pipeline; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Ozone Container dispatcher takes a call from the netty server and routes it + * to the right handler function. + */ +public class Dispatcher implements ContainerDispatcher { + static final Logger LOG = LoggerFactory.getLogger(Dispatcher.class); + + private final ContainerManager containerManager; + + /** + * Constructs an OzoneContainer that receives calls from + * XceiverServerHandler. + * + * @param containerManager - A class that manages containers. 
+ */ + public Dispatcher(ContainerManager containerManager) { + Preconditions.checkNotNull(containerManager); + this.containerManager = containerManager; + } + + @Override + public ContainerCommandResponseProto dispatch( + ContainerCommandRequestProto msg) throws IOException { + Preconditions.checkNotNull(msg); + Type cmdType = msg.getCmdType(); + if ((cmdType == Type.CreateContainer) || + (cmdType == Type.DeleteContainer) || + (cmdType == Type.ReadContainer) || + (cmdType == Type.ListContainer) || + (cmdType == Type.UpdateContainer)) { + + return containerProcessHandler(msg); + } + + + return ContainerUtils.unsupportedRequest(msg); + } + + /** + * Handles all container-related functionality. + * + * @param msg - command + * @return - response + * @throws IOException + */ + private ContainerCommandResponseProto containerProcessHandler( + ContainerCommandRequestProto msg) throws IOException { + try { + ContainerData cData = ContainerData.getFromProtBuf( + msg.getCreateContainer().getContainerData()); + + Pipeline pipeline = Pipeline.getFromProtoBuf( + msg.getCreateContainer().getPipeline()); + Preconditions.checkNotNull(pipeline); + + switch (msg.getCmdType()) { + case CreateContainer: + return handleCreateContainer(msg, cData, pipeline); + + case DeleteContainer: + return handleDeleteContainer(msg, cData, pipeline); + + case ListContainer: + return ContainerUtils.unsupportedRequest(msg); + + case UpdateContainer: + return ContainerUtils.unsupportedRequest(msg); + + case ReadContainer: + return handleReadContainer(msg, cData); + + default: + return ContainerUtils.unsupportedRequest(msg); + } + } catch (IOException ex) { + LOG.warn("Container operation failed. " + + "Container: {} Operation: {} trace ID: {} Error: {}", + msg.getCreateContainer().getContainerData().getName(), + msg.getCmdType().name(), + msg.getTraceID(), + ex.toString()); + + // TODO : Replace with finer error codes.
+ return ContainerUtils.getContainerResponse(msg, + ContainerProtos.Result.CONTAINER_INTERNAL_ERROR, + ex.toString()).build(); + } + } + + /** + * Calls into container logic and returns appropriate response. + * + * @param msg - Request + * @param cData - Container Data object + * @return ContainerCommandResponseProto + * @throws IOException + */ + private ContainerCommandResponseProto handleReadContainer( + ContainerCommandRequestProto msg, ContainerData cData) + throws IOException { + + if (!msg.hasReadContainer()) { + LOG.debug("Malformed read container request. trace ID: {}", + msg.getTraceID()); + return ContainerUtils.malformedRequest(msg); + } + ContainerData container = this.containerManager.readContainer( + cData.getContainerName()); + return ContainerUtils.getReadContainerResponse(msg, container); + } + + /** + * Calls into container logic and returns appropriate response. + * + * @param msg - Request + * @param cData - ContainerData + * @param pipeline - Pipeline is the machines where this container lives. + * @return Response. + * @throws IOException + */ + private ContainerCommandResponseProto handleDeleteContainer( + ContainerCommandRequestProto msg, ContainerData cData, + Pipeline pipeline) throws IOException { + if (!msg.hasDeleteContainer()) { + LOG.debug("Malformed delete container request. trace ID: {}", + msg.getTraceID()); + return ContainerUtils.malformedRequest(msg); + } + this.containerManager.deleteContainer(pipeline, + cData.getContainerName()); + return ContainerUtils.getContainerResponse(msg); + } + + /** + * Calls into container logic and returns appropriate response. + * + * @param msg - Request + * @param cData - ContainerData + * @param pipeline - Pipeline is the machines where this container lives. + * @return Response. 
+ * @throws IOException + */ + private ContainerCommandResponseProto handleCreateContainer( + ContainerCommandRequestProto msg, ContainerData cData, + Pipeline pipeline) throws IOException { + if (!msg.hasCreateContainer()) { + LOG.debug("Malformed create container request. trace ID: {}", + msg.getTraceID()); + return ContainerUtils.malformedRequest(msg); + } + this.containerManager.createContainer(pipeline, cData); + return ContainerUtils.getContainerResponse(msg); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/package-info.java new file mode 100644 index 0000000..16da5d9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.ozone.container.common.impl; + +/** + This package contains the Ozone container implementation. +**/ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java new file mode 100644 index 0000000..6ad8377 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.ozone.container.common.interfaces; + +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; + +import java.io.IOException; + +/** + * Dispatcher acts as the bridge between the transport layer and + * the actual container layer. This layer is capable of transforming + * protobuf objects into corresponding class and issue the function call + * into the lower layers. + * + * The reply from the request is dispatched to the client. + */ +public interface ContainerDispatcher { + /** + * Dispatches commands to container layer. + * @param msg - Command Request + * @return Command Response + * @throws IOException + */ + ContainerCommandResponseProto dispatch(ContainerCommandRequestProto msg) + throws IOException; + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java new file mode 100644 index 0000000..780d932 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.interfaces; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.ozone.container.common.helpers.ContainerData; +import org.apache.hadoop.ozone.container.common.helpers.Pipeline; + +import java.io.IOException; +import java.util.List; + +/** + * Interface for container operations. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public interface ContainerManager { + + /** + * Creates a container with the given name. + * + * @param pipeline -- Nodes which make up this container. + * @param containerData - Container Name and metadata. + * @throws IOException + */ + void createContainer(Pipeline pipeline, ContainerData containerData) + throws IOException; + + /** + * Deletes an existing container. + * + * @param pipeline - nodes that make this container. + * @param containerName - name of the container. + * @throws IOException + */ + void deleteContainer(Pipeline pipeline, String containerName) + throws IOException; + + /** + * A simple interface for container iterations. + * + * @param start - Starting index + * @param count - how many to return + * @param data - Actual containerData + * @throws IOException + */ + void listContainer(long start, long count, List<ContainerData> data) + throws IOException; + + /** + * Get metadata about a specific container.
+ * + * @param containerName - Name of the container + * @return ContainerData + * @throws IOException + */ + ContainerData readContainer(String containerName) throws IOException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/package-info.java new file mode 100644 index 0000000..1638a36 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/package-info.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common; +/** + Common Container Layer. At this layer the abstractions are: + + 1. Containers - Both data and metadata containers. + 2. Keys - Key/Value pairs that live inside a container. + 3. Chunks - Keys can be composed of many chunks. + + Ozone uses these abstractions to build Volumes, Buckets and Keys. 
+ + **/ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClient.java new file mode 100644 index 0000000..05cd44a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClient.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.container.common.transport.client; + +import com.google.common.base.Preconditions; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConfiguration; +import org.apache.hadoop.ozone.container.common.helpers.Pipeline; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * A Client for the storageContainer protocol. + */ +public class XceiverClient { + static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class); + private final Pipeline pipeline; + private final OzoneConfiguration config; + private ChannelFuture channelFuture; + private Bootstrap b; + private EventLoopGroup group; + + /** + * Constructs a client that can communicate with the Container framework on + * data nodes. + * @param pipeline - Pipeline that defines the machines. + * @param config -- Ozone Config + */ + public XceiverClient(Pipeline pipeline, OzoneConfiguration config) { + Preconditions.checkNotNull(pipeline); + Preconditions.checkNotNull(config); + this.pipeline = pipeline; + this.config = config; + } + + /** + * Connects to the leader in the pipeline. 
+ */ + public void connect() throws Exception { + if (channelFuture != null + && channelFuture.channel() != null + && channelFuture.channel().isActive()) { + throw new IOException("This client is already connected to a host."); + } + + group = new NioEventLoopGroup(); + b = new Bootstrap(); + b.group(group) + .channel(NioSocketChannel.class) + .handler(new LoggingHandler(LogLevel.INFO)) + .handler(new XceiverClientInitializer(this.pipeline)); + DatanodeID leader = this.pipeline.getLeader(); + + // read port from the data node, on failure use default configured + // port. + int port = leader.getContainerPort(); + if (port == 0) { + port = config.getInt(OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT, + OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT_DEFAULT); + } + LOG.debug("Connecting to server Port : " + port); + channelFuture = b.connect(leader.getHostName(), port).sync(); + } + + /** + * Close the client. + */ + public void close() { + if(group != null) { + group.shutdownGracefully(); + } + + if (channelFuture != null) { + channelFuture.channel().close(); + } + } + + /** + * Sends a given command to server and gets the reply back. 
+ * @param request Request + * @return Response to the command + * @throws IOException + */ + public ContainerProtos.ContainerCommandResponseProto sendCommand( + ContainerProtos.ContainerCommandRequestProto request) + throws IOException { + if((channelFuture == null) || (!channelFuture.channel().isActive())) { + throw new IOException("This channel is not connected."); + } + XceiverClientHandler handler = + channelFuture.channel().pipeline().get(XceiverClientHandler.class); + + return handler.sendCommand(request); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientHandler.java new file mode 100644 index 0000000..a219e4e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientHandler.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.transport.client; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; +import org.apache.hadoop.ozone.container.common.helpers.Pipeline; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * Netty client handler. + */ +public class XceiverClientHandler extends + SimpleChannelInboundHandler<ContainerProtos.ContainerCommandResponseProto> { + + static final Logger LOG = LoggerFactory.getLogger(XceiverClientHandler.class); + private final BlockingQueue<ContainerProtos.ContainerCommandResponseProto> + responses = new LinkedBlockingQueue<>(); + private final Pipeline pipeline; + private volatile Channel channel; + + /** + * Constructs a client that can communicate to a container server. + */ + public XceiverClientHandler(Pipeline pipeline) { + super(false); + this.pipeline = pipeline; + } + + /** + * <strong>Please keep in mind that this method will be renamed to {@code + * messageReceived(ChannelHandlerContext, I)} in 5.0.</strong> + * <p> + * Is called for each message of type {@link ContainerProtos + * .ContainerCommandResponseProto}. 
+ * + * @param ctx the {@link ChannelHandlerContext} which this {@link + * SimpleChannelInboundHandler} belongs to + * @param msg the message to handle + * @throws Exception is thrown if an error occurred + */ + @Override + public void channelRead0(ChannelHandlerContext ctx, + ContainerProtos.ContainerCommandResponseProto msg) + throws Exception { + responses.add(msg); + } + + @Override + public void channelRegistered(ChannelHandlerContext ctx) { + LOG.debug("channelRegistered: Connected to ctx"); + channel = ctx.channel(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + LOG.info("Exception in client " + cause.toString()); + ctx.close(); + } + + /** + * Since netty is async, we send a work request and then wait until a response + * appears in the reply queue. This is a simple sync interface for clients. We + * should consider building async interfaces for clients if this turns out to + * be a performance bottleneck. + * + * @param request - request. 
+ * @return -- response + */ + public ContainerProtos.ContainerCommandResponseProto + sendCommand(ContainerProtos.ContainerCommandRequestProto request) { + + ContainerProtos.ContainerCommandResponseProto response; + channel.writeAndFlush(request); + boolean interrupted = false; + for (; ; ) { + try { + response = responses.take(); + break; + } catch (InterruptedException ignore) { + interrupted = true; + } + } + + if (interrupted) { + Thread.currentThread().interrupt(); + } + return response; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientInitializer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientInitializer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientInitializer.java new file mode 100644 index 0000000..cbf8ee9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientInitializer.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.transport.client; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.protobuf.ProtobufDecoder; +import io.netty.handler.codec.protobuf.ProtobufEncoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; +import org.apache.hadoop.ozone.container.common.helpers.Pipeline; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; + +/** + * Setup the netty pipeline. + */ +public class XceiverClientInitializer extends + ChannelInitializer<SocketChannel> { + private final Pipeline pipeline; + + /** + * Constructs an Initializer for the client pipeline. + * @param pipeline - Pipeline. + */ + public XceiverClientInitializer(Pipeline pipeline) { + this.pipeline = pipeline; + } + + /** + * This method will be called once when the Channel is registered. After + * the method returns this instance will be removed from the + * ChannelPipeline of the Channel. + * + * @param ch Channel which was registered. + * @throws Exception is thrown if an error occurs. In that case the + * Channel will be closed. 
+ */ + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + + p.addLast(new ProtobufVarint32FrameDecoder()); + p.addLast(new ProtobufDecoder(ContainerProtos + .ContainerCommandResponseProto.getDefaultInstance())); + + p.addLast(new ProtobufVarint32LengthFieldPrepender()); + p.addLast(new ProtobufEncoder()); + + p.addLast(new XceiverClientHandler(this.pipeline)); + + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java new file mode 100644 index 0000000..77e4af1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.container.common.transport.server; + +import com.google.common.base.Preconditions; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConfiguration; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; + +/** + * Creates a netty server endpoint that acts as the communication layer for + * Ozone containers. + */ +public final class XceiverServer { + private final int port; + private final ContainerDispatcher storageContainer; + + private EventLoopGroup bossGroup; + private EventLoopGroup workerGroup; + private Channel channel; + + /** + * Constructs a netty server class. + * + * @param conf - Configuration + */ + public XceiverServer(OzoneConfiguration conf, + ContainerDispatcher dispatcher) { + Preconditions.checkNotNull(conf); + this.port = conf.getInt(OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT, + OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT_DEFAULT); + this.storageContainer = dispatcher; + } + + /** + * Starts running the server. + * + * @throws Exception + */ + public void start() throws Exception { + bossGroup = new NioEventLoopGroup(); + workerGroup = new NioEventLoopGroup(); + channel = new ServerBootstrap() + .group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .handler(new LoggingHandler(LogLevel.INFO)) + .childHandler(new XceiverServerInitializer(storageContainer)) + .bind(port) + .syncUninterruptibly() + .channel(); + } + + /** + * Stops a running server. 
+ * + * @throws Exception + */ + public void stop() throws Exception { + if (bossGroup != null) { + bossGroup.shutdownGracefully(); + } + if (workerGroup != null) { + workerGroup.shutdownGracefully(); + } + if (channel != null) { + channel.close().awaitUninterruptibly(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerHandler.java new file mode 100644 index 0000000..c4a8f53 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerHandler.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.container.common.transport.server; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.slf4j.LoggerFactory; +import org.slf4j.Logger; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; + +/** + * Netty server handlers that respond to Network events. + */ +public class XceiverServerHandler extends + SimpleChannelInboundHandler<ContainerCommandRequestProto> { + + static final Logger LOG = LoggerFactory.getLogger(XceiverServerHandler.class); + private final ContainerDispatcher dispatcher; + + /** + * Constructor for server handler. + * @param dispatcher - Dispatcher interface + */ + public XceiverServerHandler(ContainerDispatcher dispatcher) { + this.dispatcher = dispatcher; + } + + /** + * <strong>Please keep in mind that this method will be renamed to {@code + * messageReceived(ChannelHandlerContext, I)} in 5.0.</strong> + * <p> + * Is called for each message of type {@link ContainerCommandRequestProto}. + * + * @param ctx the {@link ChannelHandlerContext} which this {@link + * SimpleChannelInboundHandler} belongs to + * @param msg the message to handle + * @throws Exception is thrown if an error occurred + */ + @Override + public void channelRead0(ChannelHandlerContext ctx, + ContainerCommandRequestProto msg) throws + Exception { + ContainerCommandResponseProto response = this.dispatcher.dispatch(msg); + LOG.debug("Writing the reponse back to client."); + ctx.writeAndFlush(response); + + } + + /** + * Calls {@link ChannelHandlerContext#fireExceptionCaught(Throwable)} + * Sub-classes may override this method to change behavior. 
+ * + * @param ctx - Channel Handler Context + * @param cause - Exception + */ + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) + throws Exception { + LOG.error("An exception caught in the pipeline : " + cause.toString()); + super.exceptionCaught(ctx, cause); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerInitializer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerInitializer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerInitializer.java new file mode 100644 index 0000000..4d32d86 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerInitializer.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.container.common.transport.server; + +import com.google.common.base.Preconditions; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.protobuf.ProtobufDecoder; +import io.netty.handler.codec.protobuf.ProtobufEncoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; + +/** + * Creates a channel for the XceiverServer. + */ +public class XceiverServerInitializer extends ChannelInitializer<SocketChannel>{ + private final ContainerDispatcher dispatcher; + public XceiverServerInitializer(ContainerDispatcher dispatcher) { + Preconditions.checkNotNull(dispatcher); + this.dispatcher = dispatcher; + } + + /** + * This method will be called once the Channel is registered. After + * the method returns this instance will be removed from the {@link + * ChannelPipeline} + * + * @param ch the channel which was registered. + * @throws Exception is thrown if an error occurs. In that case the channel + * will be closed. 
+ */ + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(new ProtobufVarint32FrameDecoder()); + pipeline.addLast(new ProtobufDecoder(ContainerCommandRequestProto + .getDefaultInstance())); + pipeline.addLast(new ProtobufVarint32LengthFieldPrepender()); + pipeline.addLast(new ProtobufEncoder()); + pipeline.addLast(new XceiverServerHandler(dispatcher)); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/ContainerData.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/ContainerData.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/ContainerData.java deleted file mode 100644 index dd2d173..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/ContainerData.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.ozone.container.helpers; - -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; - -import java.io.IOException; -import java.util.Collections; -import java.util.Map; -import java.util.TreeMap; - -/** - * This class maintains the information about a container in the ozone world. - * <p> - * A container is a name, along with metadata- which is a set of key value - * pair. - */ -public class ContainerData { - - private final String containerName; - private final Map<String, String> metadata; - - private String path; - - /** - * Constructs a ContainerData Object. - * - * @param containerName - Name - */ - public ContainerData(String containerName) { - this.metadata = new TreeMap<>(); - this.containerName = containerName; - } - - /** - * Constructs a ContainerData object from ProtoBuf classes. - * - * @param protoData - ProtoBuf Message - * @throws IOException - */ - public static ContainerData getFromProtBuf( - ContainerProtos.ContainerData protoData) throws IOException { - ContainerData data = new ContainerData(protoData.getName()); - for (int x = 0; x < protoData.getMetadataCount(); x++) { - data.addMetadata(protoData.getMetadata(x).getKey(), - protoData.getMetadata(x).getValue()); - } - - if (protoData.hasContainerPath()) { - data.setPath(protoData.getContainerPath()); - } - return data; - } - - /** - * Returns a ProtoBuf Message from ContainerData. 
- * - * @return Protocol Buffer Message - */ - public ContainerProtos.ContainerData getProtoBufMessage() { - ContainerProtos.ContainerData.Builder builder = ContainerProtos - .ContainerData.newBuilder(); - builder.setName(this.getContainerName()); - if (this.getPath() != null) { - builder.setContainerPath(this.getPath()); - } - for (Map.Entry<String, String> entry : metadata.entrySet()) { - ContainerProtos.KeyValue.Builder keyValBuilder = - ContainerProtos.KeyValue.newBuilder(); - builder.addMetadata(keyValBuilder.setKey(entry.getKey()) - .setValue(entry.getValue()).build()); - } - return builder.build(); - } - - /** - * Returns the name of the container. - * - * @return - name - */ - public String getContainerName() { - return containerName; - } - - /** - * Adds metadata. - */ - public void addMetadata(String key, String value) throws IOException { - synchronized (this.metadata) { - if (this.metadata.containsKey(key)) { - throw new IOException("This key already exists. Key " + key); - } - metadata.put(key, value); - } - } - - /** - * Returns all metadata. - */ - public Map<String, String> getAllMetadata() { - synchronized (this.metadata) { - return Collections.unmodifiableMap(this.metadata); - } - } - - /** - * Returns value of a key. - */ - public String getValue(String key) { - synchronized (this.metadata) { - return metadata.get(key); - } - } - - /** - * Deletes a metadata entry from the map. - * - * @param key - Key - */ - public void deleteKey(String key) { - synchronized (this.metadata) { - metadata.remove(key); - } - } - - /** - * Returns path. - * - * @return - path - */ - public String getPath() { - return path; - } - - /** - * Sets path. - * - * @param path - String. - */ - public void setPath(String path) { - this.path = path; - } - - /** - * This function serves as the generic key for OzoneCache class. Both - * ContainerData and ContainerKeyData overrides this function to appropriately - * return the right name that can be used in OzoneCache. 
- * - * @return String Name. - */ - public String getName() { - return getContainerName(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/ContainerUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/ContainerUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/ContainerUtils.java deleted file mode 100644 index 6aef443..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/ContainerUtils.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.container.helpers; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; - -/** - * A set of helper functions to create proper responses. - */ -public final class ContainerUtils { - - /** - * Returns a CreateContainer Response. This call is used by create and delete - * containers which have null success responses. 
- * - * @param msg Request - * @return Response. - */ - public static ContainerProtos.ContainerCommandResponseProto - getContainerResponse(ContainerProtos.ContainerCommandRequestProto msg) { - ContainerProtos.ContainerCommandResponseProto.Builder builder = - getContainerResponse(msg, ContainerProtos.Result.SUCCESS, ""); - return builder.build(); - } - - /** - * Returns a ReadContainer Response. - * - * @param msg Request - * @return Response. - */ - public static ContainerProtos.ContainerCommandResponseProto - getReadContainerResponse(ContainerProtos.ContainerCommandRequestProto msg, - ContainerData containerData) { - Preconditions.checkNotNull(containerData); - - ContainerProtos.ReadContainerResponseProto.Builder response = - ContainerProtos.ReadContainerResponseProto.newBuilder(); - response.setContainerData(containerData.getProtoBufMessage()); - - ContainerProtos.ContainerCommandResponseProto.Builder builder = - getContainerResponse(msg, ContainerProtos.Result.SUCCESS, ""); - builder.setReadContainer(response); - return builder.build(); - } - - /** - * We found a command type but no associated payload for the command. Hence - * return malformed Command as response. - * - * @param msg - Protobuf message. - * @return ContainerCommandResponseProto - MALFORMED_REQUEST. - */ - public static ContainerProtos.ContainerCommandResponseProto.Builder - getContainerResponse(ContainerProtos.ContainerCommandRequestProto msg, - ContainerProtos.Result result, String message) { - return - ContainerProtos.ContainerCommandResponseProto.newBuilder() - .setCmdType(msg.getCmdType()) - .setTraceID(msg.getTraceID()) - .setResult(result) - .setMessage(message); - } - - /** - * We found a command type but no associated payload for the command. Hence - * return malformed Command as response. - * - * @param msg - Protobuf message. - * @return ContainerCommandResponseProto - MALFORMED_REQUEST. 
- */ - public static ContainerProtos.ContainerCommandResponseProto - malformedRequest(ContainerProtos.ContainerCommandRequestProto msg) { - return getContainerResponse(msg, ContainerProtos.Result.MALFORMED_REQUEST, - "Cmd type does not match the payload.").build(); - } - - /** - * We found a command type that is not supported yet. - * - * @param msg - Protobuf message. - * @return ContainerCommandResponseProto - MALFORMED_REQUEST. - */ - public static ContainerProtos.ContainerCommandResponseProto - unsupportedRequest(ContainerProtos.ContainerCommandRequestProto msg) { - return getContainerResponse(msg, ContainerProtos.Result.UNSUPPORTED_REQUEST, - "Server does not support this command yet.").build(); - } - - private ContainerUtils() { - //never constructed. - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/Pipeline.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/Pipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/Pipeline.java deleted file mode 100644 index d1bcc8d..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/Pipeline.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.container.helpers; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; - -/** - * A pipeline represents the group of machines over which a container lives. - */ -public class Pipeline { - private String containerName; - private String leaderID; - private Map<String, DatanodeID> datanodes; - - /** - * Constructs a new pipeline data structure. - * - * @param leaderID - First machine in this pipeline. - */ - public Pipeline(String leaderID) { - this.leaderID = leaderID; - datanodes = new TreeMap<>(); - } - - /** - * Gets pipeline object from protobuf. - * - * @param pipeline - ProtoBuf definition for the pipeline. - * @return Pipeline Object - */ - public static Pipeline getFromProtoBuf(ContainerProtos.Pipeline pipeline) { - Preconditions.checkNotNull(pipeline); - Pipeline newPipeline = new Pipeline(pipeline.getLeaderID()); - for (HdfsProtos.DatanodeIDProto dataID : pipeline.getMembersList()) { - newPipeline.addMember(DatanodeID.getFromProtoBuf(dataID)); - } - if (pipeline.hasContainerName()) { - newPipeline.containerName = newPipeline.getContainerName(); - } - return newPipeline; - } - - /** Adds a member to pipeline */ - - /** - * Adds a member to the pipeline. - * - * @param dataNodeId - Datanode to be added. 
- */ - public void addMember(DatanodeID dataNodeId) { - datanodes.put(dataNodeId.getDatanodeUuid(), dataNodeId); - } - - /** - * Returns the first machine in the set of datanodes. - * - * @return First Machine. - */ - public DatanodeID getLeader() { - return datanodes.get(leaderID); - } - - /** - * Returns all machines that make up this pipeline. - * - * @return List of Machines. - */ - public List<DatanodeID> getMachines() { - return new ArrayList<>(datanodes.values()); - } - - /** - * Return a Protobuf Pipeline message from pipeline. - * - * @return Protobuf message - */ - public ContainerProtos.Pipeline getProtobufMessage() { - ContainerProtos.Pipeline.Builder builder = - ContainerProtos.Pipeline.newBuilder(); - for (DatanodeID datanode : datanodes.values()) { - builder.addMembers(datanode.getProtoBufMessage()); - } - builder.setLeaderID(leaderID); - if (this.containerName != null) { - builder.setContainerName(this.containerName); - } - return builder.build(); - } - - /** - * Returns containerName if available. - * - * @return String. - */ - public String getContainerName() { - return containerName; - } - - /** - * Sets the container Name. - * - * @param containerName - Name of the container. 
- */ - public void setContainerName(String containerName) { - this.containerName = containerName; - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/package-info.java deleted file mode 100644 index 15a4a28..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.helpers; -/** - Contains protocol buffer helper classes. 
- **/ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/interfaces/ContainerDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/interfaces/ContainerDispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/interfaces/ContainerDispatcher.java deleted file mode 100644 index f587b2a..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/interfaces/ContainerDispatcher.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.container.interfaces; - -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; - -import java.io.IOException; - -/** - * Dispatcher acts as the bridge between the transport layer and - * the actual container layer. 
This layer is capable of transforming - * protobuf objects into corresponding class and issue the function call - * into the lower layers. - * - * The reply from the request is dispatched to the client. - */ -public interface ContainerDispatcher { - /** - * Dispatches commands to container layer. - * @param msg - Command Request - * @return Command Response - * @throws IOException - */ - ContainerCommandResponseProto dispatch(ContainerCommandRequestProto msg) - throws IOException; - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/interfaces/ContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/interfaces/ContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/interfaces/ContainerManager.java deleted file mode 100644 index f98544d..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/interfaces/ContainerManager.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.ozone.container.interfaces; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.ozone.container.helpers.ContainerData; -import org.apache.hadoop.ozone.container.helpers.Pipeline; - -import java.io.IOException; -import java.util.List; - -/** - * Interface for container operations. - */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -public interface ContainerManager { - - /** - * Creates a container with the given name. - * - * @param pipeline -- Nodes which make up this container. - * @param containerData - Container Name and metadata. - * @throws IOException - */ - void createContainer(Pipeline pipeline, ContainerData containerData) - throws IOException; - - /** - * Deletes an existing container. - * - * @param pipeline - nodes that make this container. - * @param containerName - name of the container. - * @throws IOException - */ - void deleteContainer(Pipeline pipeline, String containerName) - throws IOException; - - /** - * As simple interface for container Iterations. - * - * @param start - Starting index - * @param count - how many to return - * @param data - Actual containerData - * @throws IOException - */ - void listContainer(long start, long count, List<ContainerData> data) - throws IOException; - - /** - * Get metadata about a specific container. 
- * - * @param containerName - Name of the container - * @return ContainerData - * @throws IOException - */ - ContainerData readContainer(String containerName) throws IOException; -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/Dispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/Dispatcher.java deleted file mode 100644 index 92aa241..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/Dispatcher.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.ozone.container.ozoneimpl; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type; -import org.apache.hadoop.ozone.container.helpers.ContainerData; -import org.apache.hadoop.ozone.container.helpers.ContainerUtils; -import org.apache.hadoop.ozone.container.helpers.Pipeline; -import org.apache.hadoop.ozone.container.interfaces.ContainerDispatcher; -import org.apache.hadoop.ozone.container.interfaces.ContainerManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -/** - * Ozone Container dispatcher takes a call from the netty server and routes it - * to the right handler function. - */ -public class Dispatcher implements ContainerDispatcher { - static final Logger LOG = LoggerFactory.getLogger(Dispatcher.class); - - private final ContainerManager containerManager; - - /** - * Constructs an OzoneContainer that receives calls from - * XceiverServerHandler. - * - * @param containerManager - A class that manages containers. 
- */ - public Dispatcher(ContainerManager containerManager) { - Preconditions.checkNotNull(containerManager); - this.containerManager = containerManager; - } - - @Override - public ContainerCommandResponseProto dispatch( - ContainerCommandRequestProto msg) throws IOException { - Preconditions.checkNotNull(msg); - Type cmdType = msg.getCmdType(); - if ((cmdType == Type.CreateContainer) || - (cmdType == Type.DeleteContainer) || - (cmdType == Type.ReadContainer) || - (cmdType == Type.ListContainer)) { - - return containerProcessHandler(msg); - } - - - return ContainerUtils.unsupportedRequest(msg); - } - - /** - * Handles the all Container related functionality. - * - * @param msg - command - * @return - response - * @throws IOException - */ - private ContainerCommandResponseProto containerProcessHandler( - ContainerCommandRequestProto msg) throws IOException { - try { - ContainerData cData = ContainerData.getFromProtBuf( - msg.getCreateContainer().getContainerData()); - - Pipeline pipeline = Pipeline.getFromProtoBuf( - msg.getCreateContainer().getPipeline()); - Preconditions.checkNotNull(pipeline); - - switch (msg.getCmdType()) { - case CreateContainer: - return handleCreateContainer(msg, cData, pipeline); - - case DeleteContainer: - return handleDeleteContainer(msg, cData, pipeline); - - case ListContainer: - return ContainerUtils.unsupportedRequest(msg); - - case ReadContainer: - return handleReadContainer(msg, cData); - - default: - return ContainerUtils.unsupportedRequest(msg); - } - } catch (IOException ex) { - LOG.warn("Container operation failed. " + - "Container: {} Operation: {} trace ID: {} Error: {}", - msg.getCreateContainer().getContainerData().getName(), - msg.getCmdType().name(), - msg.getTraceID(), - ex.toString()); - - // TODO : Replace with finer error codes. 
- return ContainerUtils.getContainerResponse(msg, - ContainerProtos.Result.CONTAINER_INTERNAL_ERROR, - ex.toString()).build(); - } - } - - /** - * Calls into container logic and returns appropriate response. - * - * @param msg - Request - * @param cData - Container Data object - * @return ContainerCommandResponseProto - * @throws IOException - */ - private ContainerCommandResponseProto handleReadContainer( - ContainerCommandRequestProto msg, ContainerData cData) - throws IOException { - - if (!msg.hasReadContainer()) { - LOG.debug("Malformed read container request. trace ID: {}", - msg.getTraceID()); - return ContainerUtils.malformedRequest(msg); - } - ContainerData container = this.containerManager.readContainer( - cData.getContainerName()); - return ContainerUtils.getReadContainerResponse(msg, container); - } - - /** - * Calls into container logic and returns appropriate response. - * - * @param msg - Request - * @param cData - ContainerData - * @param pipeline - Pipeline is the machines where this container lives. - * @return Response. - * @throws IOException - */ - private ContainerCommandResponseProto handleDeleteContainer( - ContainerCommandRequestProto msg, ContainerData cData, - Pipeline pipeline) throws IOException { - if (!msg.hasDeleteContainer()) { - LOG.debug("Malformed delete container request. trace ID: {}", - msg.getTraceID()); - return ContainerUtils.malformedRequest(msg); - } - this.containerManager.deleteContainer(pipeline, - cData.getContainerName()); - return ContainerUtils.getContainerResponse(msg); - } - - /** - * Calls into container logic and returns appropriate response. - * - * @param msg - Request - * @param cData - ContainerData - * @param pipeline - Pipeline is the machines where this container lives. - * @return Response. 
- * @throws IOException - */ - private ContainerCommandResponseProto handleCreateContainer( - ContainerCommandRequestProto msg, ContainerData cData, - Pipeline pipeline) throws IOException { - if (!msg.hasCreateContainer()) { - LOG.debug("Malformed create container request. trace ID: {}", - msg.getTraceID()); - return ContainerUtils.malformedRequest(msg); - } - this.containerManager.createContainer(pipeline, cData); - return ContainerUtils.getContainerResponse(msg); - } -}