http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/protocolPB/CBlockServiceProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/protocolPB/CBlockServiceProtocolServerSideTranslatorPB.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/protocolPB/CBlockServiceProtocolServerSideTranslatorPB.java deleted file mode 100644 index 8924a0c..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/protocolPB/CBlockServiceProtocolServerSideTranslatorPB.java +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.cblock.protocolPB; - -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; -import org.apache.hadoop.cblock.meta.VolumeInfo; -import org.apache.hadoop.cblock.proto.CBlockServiceProtocol; -import org.apache.hadoop.cblock.protocol.proto.CBlockServiceProtocolProtos; -import org.apache.hadoop.classification.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.List; - -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICE_BLOCK_SIZE_DEFAULT; - -/** - * Server side implementation of the protobuf service. - */ -@InterfaceAudience.Private -public class CBlockServiceProtocolServerSideTranslatorPB - implements CBlockServiceProtocolPB { - - private final CBlockServiceProtocol impl; - private static final Logger LOG = - LoggerFactory.getLogger( - CBlockServiceProtocolServerSideTranslatorPB.class); - - @Override - public CBlockServiceProtocolProtos.CreateVolumeResponseProto createVolume( - RpcController controller, - CBlockServiceProtocolProtos.CreateVolumeRequestProto request) - throws ServiceException { - if (LOG.isDebugEnabled()) { - LOG.debug("createVolume called! 
volume size: " + request.getVolumeSize() - + " block size: " + request.getBlockSize()); - } - try { - if (request.hasBlockSize()) { - impl.createVolume(request.getUserName(), request.getVolumeName(), - request.getVolumeSize(), request.getBlockSize()); - } else{ - impl.createVolume(request.getUserName(), request.getVolumeName(), - request.getVolumeSize(), DFS_CBLOCK_SERVICE_BLOCK_SIZE_DEFAULT); - } - } catch (IOException e) { - throw new ServiceException(e); - } - return CBlockServiceProtocolProtos.CreateVolumeResponseProto - .newBuilder().build(); - } - - @Override - public CBlockServiceProtocolProtos.DeleteVolumeResponseProto deleteVolume( - RpcController controller, - CBlockServiceProtocolProtos.DeleteVolumeRequestProto request) - throws ServiceException { - if (LOG.isDebugEnabled()) { - LOG.debug("deleteVolume called! volume name: " + request.getVolumeName() - + " force:" + request.getForce()); - } - try { - if (request.hasForce()) { - impl.deleteVolume(request.getUserName(), request.getVolumeName(), - request.getForce()); - } else { - impl.deleteVolume(request.getUserName(), request.getVolumeName(), - false); - } - } catch (IOException e) { - throw new ServiceException(e); - } - return CBlockServiceProtocolProtos.DeleteVolumeResponseProto - .newBuilder().build(); - } - - @Override - public CBlockServiceProtocolProtos.InfoVolumeResponseProto infoVolume( - RpcController controller, - CBlockServiceProtocolProtos.InfoVolumeRequestProto request - ) throws ServiceException { - if (LOG.isDebugEnabled()) { - LOG.debug("infoVolume called! 
volume name: " + request.getVolumeName()); - } - CBlockServiceProtocolProtos.InfoVolumeResponseProto.Builder resp = - CBlockServiceProtocolProtos.InfoVolumeResponseProto.newBuilder(); - CBlockServiceProtocolProtos.VolumeInfoProto.Builder volumeInfoProto = - CBlockServiceProtocolProtos.VolumeInfoProto.newBuilder(); - VolumeInfo volumeInfo; - try { - volumeInfo = impl.infoVolume(request.getUserName(), - request.getVolumeName()); - } catch (IOException e) { - throw new ServiceException(e); - } - - volumeInfoProto.setVolumeSize(volumeInfo.getVolumeSize()); - volumeInfoProto.setBlockSize(volumeInfo.getBlockSize()); - volumeInfoProto.setUsage(volumeInfo.getUsage()); - volumeInfoProto.setUserName(volumeInfo.getUserName()); - volumeInfoProto.setVolumeName(volumeInfo.getVolumeName()); - resp.setVolumeInfo(volumeInfoProto); - return resp.build(); - } - - @Override - public CBlockServiceProtocolProtos.ListVolumeResponseProto listVolume( - RpcController controller, - CBlockServiceProtocolProtos.ListVolumeRequestProto request - ) throws ServiceException { - CBlockServiceProtocolProtos.ListVolumeResponseProto.Builder resp = - CBlockServiceProtocolProtos.ListVolumeResponseProto.newBuilder(); - String userName = null; - if (request.hasUserName()) { - userName = request.getUserName(); - } - if (LOG.isDebugEnabled()) { - LOG.debug("list volume received for :" + userName); - } - List<VolumeInfo> volumes; - try { - volumes = impl.listVolume(userName); - } catch (IOException e) { - throw new ServiceException(e); - } - for (VolumeInfo volume : volumes) { - CBlockServiceProtocolProtos.VolumeInfoProto.Builder volumeEntryProto - = CBlockServiceProtocolProtos.VolumeInfoProto.newBuilder(); - volumeEntryProto.setUserName(volume.getUserName()); - volumeEntryProto.setVolumeName(volume.getVolumeName()); - volumeEntryProto.setVolumeSize(volume.getVolumeSize()); - volumeEntryProto.setBlockSize(volume.getBlockSize()); - resp.addVolumeEntry(volumeEntryProto.build()); - } - return resp.build(); - } - 
- public CBlockServiceProtocolServerSideTranslatorPB( - CBlockServiceProtocol impl) { - this.impl = impl; - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/protocolPB/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/protocolPB/package-info.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/protocolPB/package-info.java deleted file mode 100644 index 5e03a92..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/protocolPB/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.cblock.protocolPB; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java deleted file mode 100644 index c6c6a78..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java +++ /dev/null @@ -1,427 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.cblock.storage; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.cblock.CBlockConfigKeys; -import org.apache.hadoop.cblock.exception.CBlockException; -import org.apache.hadoop.cblock.meta.ContainerDescriptor; -import org.apache.hadoop.cblock.meta.VolumeDescriptor; -import org.apache.hadoop.cblock.meta.VolumeInfo; -import org.apache.hadoop.cblock.proto.MountVolumeResponse; -import org.apache.hadoop.cblock.util.KeyUtil; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.scm.client.ScmClient; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * This class maintains the key space of CBlock, more specifically, the - * volume to container mapping. The core data structure - * is a map from users to their volumes info, where volume info is a handler - * to a volume, containing information for IO on that volume and a storage - * client responsible for talking to the SCM. 
- */ -public class StorageManager { - private static final Logger LOGGER = - LoggerFactory.getLogger(StorageManager.class); - private final ScmClient storageClient; - private final int numThreads; - private static final int MAX_THREADS = - Runtime.getRuntime().availableProcessors() * 2; - private static final int MAX_QUEUE_CAPACITY = 1024; - private final String cblockId; - - /** - * We will NOT have the situation where same kv pair getting - * processed, but it is possible to have multiple kv pair being - * processed at same time. - * - * So using just ConcurrentHashMap should be sufficient - * - * Again since currently same user accessing from multiple places - * is not allowed, no need to consider concurrency of volume map - * within one user - */ - private ConcurrentHashMap<String, HashMap<String, VolumeDescriptor>> - user2VolumeMap; - // size of an underlying container. - // TODO : assuming all containers are of the same size - private long containerSizeB; - - public StorageManager(ScmClient storageClient, - OzoneConfiguration ozoneConfig, String cblockId) throws IOException { - this.storageClient = storageClient; - this.user2VolumeMap = new ConcurrentHashMap<>(); - this.containerSizeB = storageClient.getContainerSize(null); - this.numThreads = - ozoneConfig.getInt(CBlockConfigKeys.DFS_CBLOCK_MANAGER_POOL_SIZE, - CBlockConfigKeys.DFS_CBLOCK_MANAGER_POOL_SIZE_DEFAULT); - this.cblockId = cblockId; - } - - /** - * This call will put the volume into in-memory map. - * - * more specifically, make the volume discoverable on jSCSI server - * and keep it's reference in-memory for look up. - * @param userName the user name of the volume. - * @param volumeName the name of the volume, - * @param volume a {@link VolumeDescriptor} object encapsulating the - * information about the volume. 
- */ - private void makeVolumeReady(String userName, String volumeName, - VolumeDescriptor volume) { - HashMap<String, VolumeDescriptor> userVolumes; - if (user2VolumeMap.containsKey(userName)) { - userVolumes = user2VolumeMap.get(userName); - } else { - userVolumes = new HashMap<>(); - user2VolumeMap.put(userName, userVolumes); - } - userVolumes.put(volumeName, volume); - } - - /** - * Called by CBlockManager to add volumes read from persistent store into - * memory, need to contact SCM to setup the reference to the containers given - * their id. - * - * Only for failover process where container meta info is read from - * persistent store, and containers themselves are alive. - * - * TODO : Currently, this method is not being called as failover process - * is not implemented yet. - * - * @param volumeDescriptor a {@link VolumeDescriptor} object encapsulating - * the information about a volume. - * @throws IOException when adding the volume failed. e.g. volume already - * exist, or no more container available. - */ - public synchronized void addVolume(VolumeDescriptor volumeDescriptor) - throws IOException{ - String userName = volumeDescriptor.getUserName(); - String volumeName = volumeDescriptor.getVolumeName(); - LOGGER.info("addVolume:" + userName + ":" + volumeName); - if (user2VolumeMap.containsKey(userName) - && user2VolumeMap.get(userName).containsKey(volumeName)) { - throw new CBlockException("Volume already exist for " - + userName + ":" + volumeName); - } - // the container ids are read from levelDB, setting up the - // container handlers here. 
- String[] containerIds = volumeDescriptor.getContainerIDs(); - - for (String containerId : containerIds) { - try { - Pipeline pipeline = storageClient.getContainer(containerId); - ContainerDescriptor containerDescriptor = - new ContainerDescriptor(containerId); - containerDescriptor.setPipeline(pipeline); - volumeDescriptor.addContainer(containerDescriptor); - } catch (IOException e) { - LOGGER.error("Getting container failed! Container:{} error:{}", - containerId, e); - throw e; - } - } - // now ready to put into in-memory map. - makeVolumeReady(userName, volumeName, volumeDescriptor); - } - - private class CreateContainerTask implements Runnable { - private final VolumeDescriptor volume; - private final int containerIdx; - private final ArrayList<String> containerIds; - private final AtomicInteger numFailed; - - CreateContainerTask(VolumeDescriptor volume, int containerIdx, - ArrayList<String> containerIds, - AtomicInteger numFailed) { - this.volume = volume; - this.containerIdx = containerIdx; - this.containerIds = containerIds; - this.numFailed = numFailed; - } - - /** - * When an object implementing interface <code>Runnable</code> is used - * to create a thread, starting the thread causes the object's - * <code>run</code> method to be called in that separately executing - * thread. - * <p> - * The general contract of the method <code>run</code> is that it may - * take any action whatsoever. 
- * - * @see Thread#run() - */ - public void run() { - ContainerDescriptor container = null; - try { - Pipeline pipeline = storageClient.createContainer( - HddsProtos.ReplicationType.STAND_ALONE, - HddsProtos.ReplicationFactor.ONE, - KeyUtil.getContainerName(volume.getUserName(), - volume.getVolumeName(), containerIdx), cblockId); - - container = new ContainerDescriptor(pipeline.getContainerName()); - - container.setPipeline(pipeline); - container.setContainerIndex(containerIdx); - volume.addContainer(container); - containerIds.set(containerIdx, container.getContainerID()); - } catch (Exception e) { - numFailed.incrementAndGet(); - if (container != null) { - LOGGER.error("Error creating container Container:{}:" + - " index:{} error:{}", container.getContainerID(), - containerIdx, e); - } else { - LOGGER.error("Error creating container.", e); - } - } - } - } - - private boolean createVolumeContainers(VolumeDescriptor volume) { - ArrayList<String> containerIds = new ArrayList<>(); - ThreadPoolExecutor executor = new ThreadPoolExecutor( - Math.min(numThreads, MAX_THREADS), - MAX_THREADS, 1, TimeUnit.SECONDS, - new ArrayBlockingQueue<>(MAX_QUEUE_CAPACITY), - new ThreadPoolExecutor.CallerRunsPolicy()); - - AtomicInteger numFailedCreates = new AtomicInteger(0); - long allocatedSize = 0; - int containerIdx = 0; - while (allocatedSize < volume.getVolumeSize()) { - // adding null to allocate space in ArrayList - containerIds.add(containerIdx, null); - Runnable task = new CreateContainerTask(volume, containerIdx, - containerIds, numFailedCreates); - executor.submit(task); - allocatedSize += containerSizeB; - containerIdx += 1; - } - - // issue the command and then wait for it to finish - executor.shutdown(); - try { - executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES); - } catch (InterruptedException e) { - LOGGER.error("Error creating volume:{} error:{}", - volume.getVolumeName(), e); - executor.shutdownNow(); - Thread.currentThread().interrupt(); - } - - 
volume.setContainerIDs(containerIds); - return numFailedCreates.get() == 0; - } - - private void deleteContainer(String containerID, boolean force) { - try { - Pipeline pipeline = storageClient.getContainer(containerID); - storageClient.deleteContainer(pipeline, force); - } catch (Exception e) { - LOGGER.error("Error deleting container Container:{} error:{}", - containerID, e); - } - } - - private void deleteVolumeContainers(List<String> containers, boolean force) - throws CBlockException { - ThreadPoolExecutor executor = new ThreadPoolExecutor( - Math.min(numThreads, MAX_THREADS), - MAX_THREADS, 1, TimeUnit.SECONDS, - new ArrayBlockingQueue<>(MAX_QUEUE_CAPACITY), - new ThreadPoolExecutor.CallerRunsPolicy()); - - for (String deleteContainer : containers) { - if (deleteContainer != null) { - Runnable task = () -> deleteContainer(deleteContainer, force); - executor.submit(task); - } - } - - // issue the command and then wait for it to finish - executor.shutdown(); - try { - executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES); - } catch (InterruptedException e) { - LOGGER.error("Error deleting containers error:{}", e); - executor.shutdownNow(); - Thread.currentThread().interrupt(); - } - } - - /** - * Called by CBlock server when creating a fresh volume. The core - * logic is adding needed information into in-memory meta data. - * - * @param userName the user name of the volume. - * @param volumeName the name of the volume. - * @param volumeSize the size of the volume. - * @param blockSize the block size of the volume. - * @throws CBlockException when the volume can not be created. 
- */ - public synchronized void createVolume(String userName, String volumeName, - long volumeSize, int blockSize) throws CBlockException { - LOGGER.debug("createVolume:" + userName + ":" + volumeName); - if (user2VolumeMap.containsKey(userName) - && user2VolumeMap.get(userName).containsKey(volumeName)) { - throw new CBlockException("Volume already exist for " - + userName + ":" + volumeName); - } - if (volumeSize < blockSize) { - throw new CBlockException("Volume size smaller than block size? " + - "volume size:" + volumeSize + " block size:" + blockSize); - } - VolumeDescriptor volume - = new VolumeDescriptor(userName, volumeName, volumeSize, blockSize); - boolean success = createVolumeContainers(volume); - if (!success) { - // cleanup the containers and throw the exception - deleteVolumeContainers(volume.getContainerIDsList(), true); - throw new CBlockException("Error when creating volume:" + volumeName); - } - makeVolumeReady(userName, volumeName, volume); - } - - /** - * Called by CBlock server to delete a specific volume. Mainly - * to check whether it can be deleted, and remove it from in-memory meta - * data. - * - * @param userName the user name of the volume. - * @param volumeName the name of the volume. - * @param force if set to false, only delete volume it is empty, otherwise - * throw exception. if set to true, delete regardless. - * @throws CBlockException when the volume can not be deleted. 
- */ - public synchronized void deleteVolume(String userName, String volumeName, - boolean force) throws CBlockException { - if (!user2VolumeMap.containsKey(userName) - || !user2VolumeMap.get(userName).containsKey(volumeName)) { - throw new CBlockException("Deleting non-exist volume " - + userName + ":" + volumeName); - } - if (!force && !user2VolumeMap.get(userName).get(volumeName).isEmpty()) { - throw new CBlockException("Deleting a non-empty volume without force!"); - } - VolumeDescriptor volume = user2VolumeMap.get(userName).remove(volumeName); - deleteVolumeContainers(volume.getContainerIDsList(), force); - if (user2VolumeMap.get(userName).size() == 0) { - user2VolumeMap.remove(userName); - } - } - - /** - * Called by CBlock server to get information of a specific volume. - * - * @param userName the user name of the volume. - * @param volumeName the name of the volume. - * @return a {@link VolumeInfo} object encapsulating the information of the - * volume. - * @throws CBlockException when the information can not be retrieved. - */ - public synchronized VolumeInfo infoVolume(String userName, String volumeName) - throws CBlockException { - if (!user2VolumeMap.containsKey(userName) - || !user2VolumeMap.get(userName).containsKey(volumeName)) { - throw new CBlockException("Getting info for non-exist volume " - + userName + ":" + volumeName); - } - return user2VolumeMap.get(userName).get(volumeName).getInfo(); - } - - /** - * Called by CBlock server to check whether the given volume can be - * mounted, i.e. whether it can be found in the meta data. - * - * return a {@link MountVolumeResponse} with isValid flag to indicate - * whether the volume can be mounted or not. - * - * @param userName the user name of the volume. - * @param volumeName the name of the volume - * @return a {@link MountVolumeResponse} object encapsulating whether the - * volume is valid, and if yes, the requried information for client to - * read/write the volume. 
- */ - public synchronized MountVolumeResponse isVolumeValid( - String userName, String volumeName) { - if (!user2VolumeMap.containsKey(userName) - || !user2VolumeMap.get(userName).containsKey(volumeName)) { - // in the case of invalid volume, no need to set any value other than - // isValid flag. - return new MountVolumeResponse(false, null, null, 0, 0, null, null); - } - VolumeDescriptor volume = user2VolumeMap.get(userName).get(volumeName); - return new MountVolumeResponse(true, userName, - volumeName, volume.getVolumeSize(), volume.getBlockSize(), - volume.getContainerPipelines(), volume.getPipelines()); - } - - /** - * Called by CBlock manager to list all volumes. - * - * @param userName the userName whose volume to be listed, if set to null, - * all volumes will be listed. - * @return a list of {@link VolumeDescriptor} representing all volumes - * requested. - */ - public synchronized List<VolumeDescriptor> getAllVolume(String userName) { - ArrayList<VolumeDescriptor> allVolumes = new ArrayList<>(); - if (userName == null) { - for (Map.Entry<String, HashMap<String, VolumeDescriptor>> entry - : user2VolumeMap.entrySet()) { - allVolumes.addAll(entry.getValue().values()); - } - } else { - if (user2VolumeMap.containsKey(userName)) { - allVolumes.addAll(user2VolumeMap.get(userName).values()); - } - } - return allVolumes; - } - - /** - * Only for testing the behavior of create/delete volumes. 
- */ - @VisibleForTesting - public VolumeDescriptor getVolume(String userName, String volumeName) { - if (!user2VolumeMap.containsKey(userName) - || !user2VolumeMap.get(userName).containsKey(volumeName)) { - return null; - } - return user2VolumeMap.get(userName).get(volumeName); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/storage/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/storage/package-info.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/storage/package-info.java deleted file mode 100644 index 4426e6d..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/storage/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.cblock.storage; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/util/KeyUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/util/KeyUtil.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/util/KeyUtil.java deleted file mode 100644 index beb9e32..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/util/KeyUtil.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.util; - -/** - * A simply class that generates key mappings. (e.g. from (userName, volumeName) - * pair to a single string volumeKey. 
- */ -public final class KeyUtil { - public static String getVolumeKey(String userName, String volumeName) { - return userName + ":" + volumeName; - } - - public static String getContainerName(String userName, String volumeName, - int containerID) { - return getVolumeKey(userName, volumeName) + "#" + containerID; - } - - public static String getUserNameFromVolumeKey(String key) { - return key.split(":")[0]; - } - - public static String getVolumeFromVolumeKey(String key) { - return key.split(":")[1]; - } - - public static boolean isValidVolumeKey(String key) { - return key.contains(":"); - } - - private KeyUtil() { - - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/util/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/util/package-info.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/util/package-info.java deleted file mode 100644 index 5b9aa0c..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/util/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.util; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/proto/CBlockClientServerProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/proto/CBlockClientServerProtocol.proto b/hadoop-cblock/server/src/main/proto/CBlockClientServerProtocol.proto deleted file mode 100644 index 45d0de9..0000000 --- a/hadoop-cblock/server/src/main/proto/CBlockClientServerProtocol.proto +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * These .proto interfaces are private and unstable. - * Please see http://wiki.apache.org/hadoop/Compatibility - * for what changes are allowed for a *unstable* .proto interface. 
- */ -option java_package = "org.apache.hadoop.cblock.protocol.proto"; -option java_outer_classname = "CBlockClientServerProtocolProtos"; -option java_generic_services = true; -option java_generate_equals_and_hash = true; -package hadoop.cblock; - -import "hdds.proto"; -import "CBlockServiceProtocol.proto"; -/** -* This message is sent from CBlock client side to CBlock server to -* mount a volume specified by owner name and volume name. -* -* Right now, this is the only communication between client and server. -* After the volume is mounted, CBlock client will talk to containers -* by itself, nothing to do with CBlock server. -*/ -message MountVolumeRequestProto { - required string userName = 1; - required string volumeName = 2; -} - -/** -* This message is sent from CBlock server to CBlock client as response -* of mount a volume. It checks the whether the volume is valid to access -* at all.(e.g. volume exist) -* -* And include enough information (volume size, block size, list of -* containers for this volume) for client side to perform read/write on -* the volume. -*/ -message MountVolumeResponseProto { - required bool isValid = 1; - optional string userName = 2; - optional string volumeName = 3; - optional uint64 volumeSize = 4; - optional uint32 blockSize = 5; - repeated ContainerIDProto allContainerIDs = 6; -} - -/** -* This message include ID of container which can be used to locate the -* container. Since the order of containers needs to be maintained, also -* includes a index field to verify the correctness of the order. -*/ -message ContainerIDProto { - required string containerID = 1; - required uint64 index = 2; - // making pipeline optional to be compatible with exisiting tests - optional hadoop.hdds.Pipeline pipeline = 3; -} - - -message ListVolumesRequestProto { - -} - -message ListVolumesResponseProto { - repeated VolumeInfoProto volumeEntry = 1; -} - - -service CBlockClientServerProtocolService { - /** - * mount the volume. 
- */ - rpc mountVolume(MountVolumeRequestProto) returns (MountVolumeResponseProto); - - rpc listVolumes(ListVolumesRequestProto) returns(ListVolumesResponseProto); - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/proto/CBlockServiceProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/proto/CBlockServiceProtocol.proto b/hadoop-cblock/server/src/main/proto/CBlockServiceProtocol.proto deleted file mode 100644 index 36e4b59..0000000 --- a/hadoop-cblock/server/src/main/proto/CBlockServiceProtocol.proto +++ /dev/null @@ -1,133 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * These .proto interfaces are private and unstable. - * Please see http://wiki.apache.org/hadoop/Compatibility - * for what changes are allowed for a *unstable* .proto interface. - */ - -option java_package = "org.apache.hadoop.cblock.protocol.proto"; -option java_outer_classname = "CBlockServiceProtocolProtos"; -option java_generic_services = true; -option java_generate_equals_and_hash = true; -package hadoop.cblock; - -/** -* This message is sent to CBlock server to create a volume. 
Creating -* volume requires four parameters: owner of the volume, name of the volume, -* size of volume and block size of the volume. -*/ -message CreateVolumeRequestProto { - required string userName = 1; - required string volumeName = 2; - required uint64 volumeSize = 3; - optional uint32 blockSize = 4 [default = 4096]; -} - -/** -* Empty response message. -*/ -message CreateVolumeResponseProto { - -} - -/** -* This message is sent to CBlock server to delete a volume. The volume -* is specified by owner name and volume name. If force is set to -* false, volume will be deleted only if it is empty. Otherwise delete it -* regardless. -*/ -message DeleteVolumeRequestProto { - required string userName = 1; - required string volumeName = 2; - optional bool force = 3; -} - -/** -* Empty response message. -*/ -message DeleteVolumeResponseProto { - -} - -/** -* This message is sent to CBlock server to request info of a volume. The -* volume is specified by owner name and volume name. -*/ -message InfoVolumeRequestProto { - required string userName = 1; - required string volumeName = 2; -} - -/** -* This message describes the information of a volume. -* Currently, the info includes the volume creation parameters and a number -* as the usage of the volume, in terms of number of bytes. -*/ -message VolumeInfoProto { - required string userName = 1; - required string volumeName = 2; - required uint64 volumeSize = 3; - required uint64 blockSize = 4; - optional uint64 usage = 5; - // TODO : potentially volume ACL -} - -/** -* This message is sent from CBlock server as response of info volume request. -*/ -message InfoVolumeResponseProto { - optional VolumeInfoProto volumeInfo = 1; -} - -/** -* This message is sent to CBlock server to list all available volumes. -*/ -message ListVolumeRequestProto { - optional string userName = 1; -} - -/** -* This message is sent from CBlock server as response of volume listing.
-*/ -message ListVolumeResponseProto { - repeated VolumeInfoProto volumeEntry = 1; -} - -service CBlockServiceProtocolService { - /** - * Create a volume. - */ - rpc createVolume(CreateVolumeRequestProto) returns(CreateVolumeResponseProto); - - /** - * Delete a volume. - */ - rpc deleteVolume(DeleteVolumeRequestProto) returns(DeleteVolumeResponseProto); - - /** - * Get info of a volume. - */ - rpc infoVolume(InfoVolumeRequestProto) returns(InfoVolumeResponseProto); - - /** - * List all available volumes. - */ - rpc listVolume(ListVolumeRequestProto) returns(ListVolumeResponseProto); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/resources/cblock-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/resources/cblock-default.xml b/hadoop-cblock/server/src/main/resources/cblock-default.xml deleted file mode 100644 index ebf36cd..0000000 --- a/hadoop-cblock/server/src/main/resources/cblock-default.xml +++ /dev/null @@ -1,347 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<!-- Do not modify this file directly. 
Instead, copy entries that you --> -<!-- wish to modify from this file into ozone-site.xml and change them --> -<!-- there. If ozone-site.xml does not already exist, create it. --> - -<!--Tags supported are OZONE, CBLOCK, MANAGEMENT, SECURITY, PERFORMANCE, --> -<!--DEBUG, CLIENT, SERVER, KSM, SCM, CRITICAL, RATIS, CONTAINER, REQUIRED, --> -<!--REST, STORAGE, PIPELINE, STANDALONE --> - -<configuration> - - <!--CBlock Settings--> - <property> - <name>dfs.cblock.block.buffer.flush.interval</name> - <value>60s</value> - <tag>CBLOCK, PERFORMANCE</tag> - <description> - Controls the frequency at which the local cache flushes the - blocks to the remote containers. - </description> - </property> - <property> - <name>dfs.cblock.cache.block.buffer.size</name> - <value>512</value> - <tag>CBLOCK, PERFORMANCE</tag> - <description> - Size of the local cache for blocks. So cache size will be block - size multiplied by this number. - </description> - </property> - <property> - <name>dfs.cblock.cache.core.min.pool.size</name> - <value>16</value> - <tag>CBLOCK, PERFORMANCE</tag> - <description> - A minimum number of threads in the pool that cBlock cache will - use for the background I/O to remote containers. - </description> - </property> - <property> - <name>dfs.cblock.cache.max.pool.size</name> - <value>256</value> - <tag>CBLOCK, PERFORMANCE</tag> - <description> - Maximum number of threads in the pool that cBlock cache will - use for background I/O to remote containers. - </description> - </property> - <property> - <name>dfs.cblock.cache.keep.alive</name> - <value>60s</value> - <tag>CBLOCK, PERFORMANCE</tag> - <description> - If the cblock cache has no I/O, then the threads in the cache - pool are kept idle for this amount of time before shutting down. - </description> - </property> - <property> - <name>dfs.cblock.cache.leveldb.cache.size.mb</name> - <value>256</value> - <tag>CBLOCK, PERFORMANCE</tag> - <description> - The amount of physical memory allocated to the local cache.
The - SCSI driver will allocate this much RAM cache instances. - </description> - </property> - <property> - <name>dfs.cblock.cache.max.retry</name> - <value>65536</value> - <tag>CBLOCK, PERFORMANCE</tag> - <description> - If the local cache is enabled then, CBlock writes to the local - cache when I/O happens. Then the background I/O threads write this - block to the remote containers. This value controls how many times the - background thread should attempt to do I/O to the remote containers - before giving up. - </description> - </property> - <property> - <name>dfs.cblock.cache.queue.size.in.kb</name> - <value>256</value> - <tag>CBLOCK, PERFORMANCE</tag> - <description> - Size of the in memory cache queue, that is flushed to local - disk. - </description> - </property> - <property> - <name>dfs.cblock.cache.thread.priority</name> - <value>5</value> - <tag>CBLOCK, PERFORMANCE</tag> - <description> - Priority of cache flusher thread, affecting the relative performance of - write and read. Supported values are 1, 5, 10. - Use 10 for high priority and 1 for low priority. - </description> - </property> - <property> - <name>dfs.cblock.container.size.gb</name> - <value>5</value> - <tag>CBLOCK, MANAGEMENT</tag> - <description> - The size of ozone container in the number of GBs. Note that - this is not setting container size for ozone. This setting is - instructing CBlock to manage containers at a standard size. - </description> - </property> - <property> - <name>dfs.cblock.disk.cache.path</name> - <value>${hadoop.tmp.dir}/cblockCacheDB</value> - <tag>CBLOCK, REQUIRED</tag> - <description> - The default path for the cblock local cache. If the cblock - local cache is enabled, then it must be set to a valid path. This cache - *should* be mapped to the fastest disk on a given machine, For example, - an SSD drive would be a good idea. Currently, all mounted disk on a - data node is mapped to a single path, so having a large number of IOPS - is essential. 
- </description> - </property> - <property> - <name>dfs.cblock.jscsi-address</name> - <value/> - <tag>CBLOCK, MANAGEMENT</tag> - <description> - The address that cblock will bind to, in host:port - format. This setting is required for cblock server to start. - This address is used by jscsi to mount volumes. - </description> - </property> - <property> - <name>dfs.cblock.jscsi.cblock.server.address</name> - <value>127.0.0.1</value> - <tag>CBLOCK, MANAGEMENT</tag> - <description> - The address local jscsi server will use to talk to cblock manager. - </description> - </property> - <property> - <name>dfs.cblock.jscsi.port</name> - <value>9811</value> - <tag>CBLOCK, MANAGEMENT</tag> - <description> - The port on CBlockManager node for jSCSI to talk to. - </description> - </property> - <property> - <name>dfs.cblock.jscsi.rpc-bind-host</name> - <value>0.0.0.0</value> - <tag>CBLOCK, MANAGEMENT</tag> - <description> - The actual address the cblock jscsi rpc server will bind to. If - this optional address is set, it overrides only the hostname portion of - dfs.cblock.jscsi-address. - </description> - </property> - <property> - <name>dfs.cblock.jscsi.server.address</name> - <value>0.0.0.0</value> - <tag>CBLOCK, MANAGEMENT</tag> - <description> - The address that jscsi server will be running on; it is nice to have one - local jscsi server for each client (Linux JSCSI client) that tries to - mount cblock. - </description> - </property> - <property> - <name>dfs.cblock.manager.pool.size</name> - <value>16</value> - <tag>CBLOCK, PERFORMANCE</tag> - <description> - Number of active threads that cblock manager will use for container - operations. The maximum number of threads is limited to the - processor count * 2. - </description> - </property> - <property> - <name>dfs.cblock.rpc.timeout</name> - <value>300s</value> - <tag>CBLOCK, MANAGEMENT</tag> - <description> - RPC timeout used for cblock CLI operations. When you - create very large disks, like 5TB, etc.
The number of containers - allocated in the system is huge. It will be 5TB/5GB, which is roughly 1000 - containers. The client CLI might time out even though the cblock manager - creates the specified disk. This value allows the user to wait for a - longer period. - </description> - </property> - <property> - <name>dfs.cblock.scm.ipaddress</name> - <value>127.0.0.1</value> - <tag>CBLOCK, MANAGEMENT</tag> - <description> - IP address used by cblock to connect to SCM. - </description> - </property> - <property> - <name>dfs.cblock.scm.port</name> - <value>9860</value> - <tag>CBLOCK, MANAGEMENT</tag> - <description> - Port used by cblock to connect to SCM. - </description> - </property> - <property> - <name>dfs.cblock.service.handler.count</name> - <value>10</value> - <tag>CBLOCK, MANAGEMENT</tag> - <description> - Default number of handlers for CBlock service rpc. - </description> - </property> - <property> - <name>dfs.cblock.service.leveldb.path</name> - <value>${hadoop.tmp.dir}/cblock_server.dat</value> - <tag>CBLOCK, REQUIRED</tag> - <description> - Default path for the cblock meta data store. - </description> - </property> - <property> - <name>dfs.cblock.service.rpc-bind-host</name> - <value>0.0.0.0</value> - <tag>CBLOCK, MANAGEMENT</tag> - <description> - The actual address the cblock service RPC server will bind to. - If the optional address is set, it overrides only the hostname portion of - dfs.cblock.servicerpc-address. - </description> - </property> - <property> - <name>dfs.cblock.servicerpc-address</name> - <value/> - <tag>CBLOCK, MANAGEMENT, REQUIRED</tag> - <description> - The address that cblock will bind to, in host:port - format; this setting is required for cblock server to start.
- This address is used for cblock management operations like create, delete, - info and list volumes. - </description> - </property> - <property> - <name>dfs.cblock.short.circuit.io</name> - <value>false</value> - <tag>CBLOCK, PERFORMANCE</tag> - <description> - Enables use of the local cache in cblock. Enabling this allows - I/O against the local cache and background threads do actual I/O against - the - containers. - </description> - </property> - <property> - <name>dfs.cblock.trace.io</name> - <value>false</value> - <tag>CBLOCK, DEBUG</tag> - <description>Default flag for enabling trace I/O. Trace I/O logs all I/O with - hashes of - data. This is useful for detecting things like data corruption. - </description> - </property> - - <property> - <name>dfs.cblock.iscsi.advertised.ip</name> - <value>0.0.0.0</value> - <tag>CBLOCK</tag> - <description> - IP address returned during the iscsi discovery. - </description> - </property> - - <property> - <name>dfs.cblock.iscsi.advertised.port</name> - <value>3260</value> - <tag>CBLOCK</tag> - <description> - TCP port returned during the iscsi discovery. - </description> - </property> - - <property> - <name>dfs.cblock.kubernetes.dynamic-provisioner.enabled</name> - <value>false</value> - <tag>CBLOCK, KUBERNETES</tag> - <description>Flag to enable automatic creation of cblocks and - kubernetes PersistentVolumes in a kubernetes environment. - </description> - </property> - - <property> - <name>dfs.cblock.kubernetes.cblock-user</name> - <value>iqn.2001-04.org.apache.hadoop</value> - <tag>CBLOCK, KUBERNETES</tag> - <description>CBlock user to use for the dynamic provisioner. - This user will own all of the auto-created cblocks. - </description> - </property> - - <property> - <name>dfs.cblock.kubernetes.configfile</name> - <value></value> - <tag>CBLOCK, KUBERNETES</tag> - <description>Location of the kubernetes configuration file - to access the kubernetes cluster.
Not required inside a pod - as the default service account will be used if this value is - empty. - </description> - </property> - - <property> - <name>dfs.cblock.iscsi.advertised.ip</name> - <value></value> - <tag>CBLOCK, KUBERNETES</tag> - <description>IP where the cblock target server is available - from the kubernetes nodes. Usually it's a cluster ip address - which is defined by a deployed Service. - </description> - </property> - - <property> - <name>dfs.cblock.iscsi.advertised.port</name> - <value>3260</value> - <tag>CBLOCK, KUBERNETES</tag> - <description>Port where the cblock target server is available - from the kubernetes nodes. Could be different from the - listening port if jscsi is behind a Service. - </description> - </property> -</configuration> http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java deleted file mode 100644 index 50c4ba8..0000000 --- a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java +++ /dev/null @@ -1,456 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock; - -import com.google.common.primitives.Longs; -import static java.util.concurrent.TimeUnit.SECONDS; -import org.apache.commons.lang.RandomStringUtils; -import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics; -import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher; -import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.hdds.scm.XceiverClientManager; -import org.apache.hadoop.hdds.scm.XceiverClientSpi; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; -import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.ozone.MiniOzoneClassicCluster; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.*; -import java.util.concurrent.TimeoutException; - - -import static org.apache.hadoop.cblock.CBlockConfigKeys. - DFS_CBLOCK_DISK_CACHE_PATH_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys. - DFS_CBLOCK_TRACE_IO; -import static org.apache.hadoop.cblock.CBlockConfigKeys. 
- DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO; -import static org.apache.hadoop.cblock.CBlockConfigKeys. - DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL; - -/** - * Tests for Local Cache Buffer Manager. - */ -public class TestBufferManager { - private final static long GB = 1024 * 1024 * 1024; - private final static int KB = 1024; - private static MiniOzoneCluster cluster; - private static OzoneConfiguration config; - private static StorageContainerLocationProtocolClientSideTranslatorPB - storageContainerLocationClient; - private static XceiverClientManager xceiverClientManager; - - @BeforeClass - public static void init() throws IOException { - config = new OzoneConfiguration(); - String path = GenericTestUtils.getTempPath( - TestBufferManager.class.getSimpleName()); - config.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); - config.setBoolean(DFS_CBLOCK_TRACE_IO, true); - config.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); - cluster = new MiniOzoneClassicCluster.Builder(config) - .numDataNodes(1).setHandlerType("distributed").build(); - storageContainerLocationClient = cluster - .createStorageContainerLocationClient(); - xceiverClientManager = new XceiverClientManager(config); - } - - @AfterClass - public static void shutdown() throws InterruptedException { - if (cluster != null) { - cluster.shutdown(); - } - IOUtils.cleanupWithLogger(null, storageContainerLocationClient, cluster); - } - - /** - * createContainerAndGetPipeline creates a set of containers and returns the - * Pipelines that define those containers. - * - * @param count - Number of containers to create. - * @return - List of Pipelines. 
- * @throws IOException - */ - private List<Pipeline> createContainerAndGetPipeline(int count) - throws IOException { - List<Pipeline> containerPipelines = new LinkedList<>(); - for (int x = 0; x < count; x++) { - String traceID = "trace" + RandomStringUtils.randomNumeric(4); - String containerName = "container" + RandomStringUtils.randomNumeric(10); - Pipeline pipeline = - storageContainerLocationClient.allocateContainer( - xceiverClientManager.getType(), - xceiverClientManager.getFactor(), containerName, "CBLOCK"); - XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline); - ContainerProtocolCalls.createContainer(client, traceID); - // This step is needed since we set private data on pipelines, when we - // read the list from CBlockServer. So we mimic that action here. - pipeline.setData(Longs.toByteArray(x)); - containerPipelines.add(pipeline); - xceiverClientManager.releaseClient(client); - } - return containerPipelines; - } - - /** - * This test writes some block to the cache and then shuts down the cache. 
- * The cache is then restarted to check that the - * correct number of blocks are read from Dirty Log - * - * @throws IOException - */ - @Test - public void testEmptyBlockBufferHandling() throws IOException, - InterruptedException, TimeoutException { - // Create a new config so that this tests write metafile to new location - OzoneConfiguration flushTestConfig = new OzoneConfiguration(); - String path = GenericTestUtils - .getTempPath(TestBufferManager.class.getSimpleName() - + RandomStringUtils.randomNumeric(4)); - flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); - flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); - flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); - - String volumeName = "volume" + RandomStringUtils.randomNumeric(4); - String userName = "user" + RandomStringUtils.randomNumeric(4); - String data = RandomStringUtils.random(4 * KB); - List<Pipeline> pipelines = createContainerAndGetPipeline(10); - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, - xceiverClientManager, metrics); - CBlockLocalCache cache = CBlockLocalCache.newBuilder() - .setConfiguration(flushTestConfig) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(pipelines) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - cache.start(); - // Write data to the cache - cache.put(1, data.getBytes(StandardCharsets.UTF_8)); - Assert.assertEquals(0, metrics.getNumDirectBlockWrites()); - Assert.assertEquals(1, metrics.getNumWriteOps()); - cache.put(2, data.getBytes(StandardCharsets.UTF_8)); - Assert.assertEquals(0, metrics.getNumDirectBlockWrites()); - Assert.assertEquals(2, metrics.getNumWriteOps()); - - // Store the previous block buffer position - Assert.assertEquals(2, metrics.getNumBlockBufferUpdates()); - // Simulate a shutdown by closing 
the cache - cache.close(); - Thread.sleep(1000); - Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered()); - Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted()); - Assert.assertEquals(2 * (Long.SIZE/ Byte.SIZE), - metrics.getNumBytesDirtyLogWritten()); - Assert.assertEquals(0, metrics.getNumFailedBlockBufferFlushes()); - Assert.assertEquals(0, metrics.getNumInterruptedBufferWaits()); - - // Restart cache and check that right number of entries are read - CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher newFlusher = - new ContainerCacheFlusher(flushTestConfig, - xceiverClientManager, newMetrics); - CBlockLocalCache newCache = CBlockLocalCache.newBuilder() - .setConfiguration(flushTestConfig) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(pipelines) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(newFlusher) - .setCBlockTargetMetrics(newMetrics) - .build(); - newCache.start(); - Thread fllushListenerThread = new Thread(newFlusher); - fllushListenerThread.setDaemon(true); - fllushListenerThread.start(); - - Thread.sleep(5000); - Assert.assertEquals(metrics.getNumBlockBufferUpdates(), - newMetrics.getNumDirtyLogBlockRead()); - Assert.assertEquals(newMetrics.getNumDirtyLogBlockRead() - * (Long.SIZE/ Byte.SIZE), newMetrics.getNumBytesDirtyLogReads()); - // Now shutdown again, nothing should be flushed - newFlusher.shutdown(); - Assert.assertEquals(0, newMetrics.getNumBlockBufferUpdates()); - Assert.assertEquals(0, newMetrics.getNumBytesDirtyLogWritten()); - } - - @Test - public void testPeriodicFlush() throws IOException, - InterruptedException, TimeoutException{ - // Create a new config so that this tests write metafile to new location - OzoneConfiguration flushTestConfig = new OzoneConfiguration(); - String path = GenericTestUtils - .getTempPath(TestBufferManager.class.getSimpleName() - + RandomStringUtils.randomNumeric(4)); - 
flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); - flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); - flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); - flushTestConfig - .setTimeDuration(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL, 5, SECONDS); - - String volumeName = "volume" + RandomStringUtils.randomNumeric(4); - String userName = "user" + RandomStringUtils.randomNumeric(4); - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, - xceiverClientManager, metrics); - CBlockLocalCache cache = CBlockLocalCache.newBuilder() - .setConfiguration(flushTestConfig) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(createContainerAndGetPipeline(10)) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - cache.start(); - Thread.sleep(8000); - // Ticks will be at 5s, 10s and so on, so this count should be 1 - Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered()); - // Nothing pushed to cache, so nothing should be written - Assert.assertEquals(0, metrics.getNumBytesDirtyLogWritten()); - Assert.assertEquals(0, metrics.getNumBlockBufferFlushCompleted()); - cache.close(); - // After close, another trigger should happen but still no data written - Assert.assertEquals(2, metrics.getNumBlockBufferFlushTriggered()); - Assert.assertEquals(0, metrics.getNumBytesDirtyLogWritten()); - Assert.assertEquals(0, metrics.getNumBlockBufferFlushCompleted()); - Assert.assertEquals(0, metrics.getNumFailedBlockBufferFlushes()); - } - - @Test - public void testSingleBufferFlush() throws IOException, - InterruptedException, TimeoutException { - // Create a new config so that this tests write metafile to new location - OzoneConfiguration flushTestConfig = new OzoneConfiguration(); - String path = GenericTestUtils - 
.getTempPath(TestBufferManager.class.getSimpleName() - + RandomStringUtils.randomNumeric(4)); - flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); - flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); - flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); - - String volumeName = "volume" + RandomStringUtils.randomNumeric(4); - String userName = "user" + RandomStringUtils.randomNumeric(4); - String data = RandomStringUtils.random(4 * KB); - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, - xceiverClientManager, metrics); - CBlockLocalCache cache = CBlockLocalCache.newBuilder() - .setConfiguration(flushTestConfig) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(createContainerAndGetPipeline(10)) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - cache.start(); - - for (int i = 0; i < 511; i++) { - cache.put(i, data.getBytes(StandardCharsets.UTF_8)); - } - // After writing 511 block no flush should happen - Assert.assertEquals(0, metrics.getNumBlockBufferFlushTriggered()); - Assert.assertEquals(0, metrics.getNumBlockBufferFlushCompleted()); - - - // After one more block it should - cache.put(512, data.getBytes(StandardCharsets.UTF_8)); - Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered()); - Thread.sleep(1000); - Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted()); - cache.close(); - Assert.assertEquals(512 * (Long.SIZE / Byte.SIZE), - metrics.getNumBytesDirtyLogWritten()); - } - - @Test - public void testMultipleBuffersFlush() throws IOException, - InterruptedException, TimeoutException { - // Create a new config so that this tests write metafile to new location - OzoneConfiguration flushTestConfig = new OzoneConfiguration(); - String path = GenericTestUtils - 
.getTempPath(TestBufferManager.class.getSimpleName() - + RandomStringUtils.randomNumeric(4)); - flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); - flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); - flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); - flushTestConfig - .setTimeDuration(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL, 120, SECONDS); - - String volumeName = "volume" + RandomStringUtils.randomNumeric(4); - String userName = "user" + RandomStringUtils.randomNumeric(4); - String data = RandomStringUtils.random(4 * KB); - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, - xceiverClientManager, metrics); - CBlockLocalCache cache = CBlockLocalCache.newBuilder() - .setConfiguration(flushTestConfig) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(createContainerAndGetPipeline(10)) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - cache.start(); - - for (int i = 0; i < 4; i++) { - for (int j = 0; j < 512; j++) { - cache.put(i * 512 + j, data.getBytes(StandardCharsets.UTF_8)); - } - // Flush should be triggered after every 512 block write - Assert.assertEquals(i + 1, metrics.getNumBlockBufferFlushTriggered()); - } - Assert.assertEquals(0, metrics.getNumIllegalDirtyLogFiles()); - Assert.assertEquals(0, metrics.getNumFailedDirtyLogFileDeletes()); - cache.close(); - Assert.assertEquals(4 * 512 * (Long.SIZE / Byte.SIZE), - metrics.getNumBytesDirtyLogWritten()); - Assert.assertEquals(5, metrics.getNumBlockBufferFlushTriggered()); - Assert.assertEquals(4, metrics.getNumBlockBufferFlushCompleted()); - } - - @Test - public void testSingleBlockFlush() throws IOException, - InterruptedException, TimeoutException{ - // Create a new config so that this tests write metafile to new location - OzoneConfiguration flushTestConfig = 
new OzoneConfiguration(); - String path = GenericTestUtils - .getTempPath(TestBufferManager.class.getSimpleName() - + RandomStringUtils.randomNumeric(4)); - flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); - flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); - flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); - flushTestConfig - .setTimeDuration(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL, - 5, SECONDS); - - String volumeName = "volume" + RandomStringUtils.randomNumeric(4); - String userName = "user" + RandomStringUtils.randomNumeric(4); - String data = RandomStringUtils.random(4 * KB); - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, - xceiverClientManager, metrics); - CBlockLocalCache cache = CBlockLocalCache.newBuilder() - .setConfiguration(flushTestConfig) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(createContainerAndGetPipeline(10)) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - cache.start(); - cache.put(0, data.getBytes(StandardCharsets.UTF_8)); - Thread.sleep(8000); - // Ticks will be at 5s, 10s and so on, so this count should be 1 - Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered()); - // 1 block written to cache, which should be flushed - Assert.assertEquals(8, metrics.getNumBytesDirtyLogWritten()); - Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted()); - cache.close(); - // After close, another trigger should happen but no data should be written - Assert.assertEquals(2, metrics.getNumBlockBufferFlushTriggered()); - Assert.assertEquals(8, metrics.getNumBytesDirtyLogWritten()); - Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted()); - Assert.assertEquals(0, metrics.getNumFailedBlockBufferFlushes()); - } - - @Test - public void testRepeatedBlockWrites() 
throws IOException, - InterruptedException, TimeoutException{ - // Create a new config so that this tests write metafile to new location - OzoneConfiguration flushTestConfig = new OzoneConfiguration(); - String path = GenericTestUtils - .getTempPath(TestBufferManager.class.getSimpleName() - + RandomStringUtils.randomNumeric(4)); - flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); - flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); - flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); - - String volumeName = "volume" + RandomStringUtils.randomNumeric(4); - String userName = "user" + RandomStringUtils.randomNumeric(4); - String data = RandomStringUtils.random(4 * KB); - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, - xceiverClientManager, metrics); - CBlockLocalCache cache = CBlockLocalCache.newBuilder() - .setConfiguration(flushTestConfig) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(createContainerAndGetPipeline(10)) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - Thread fllushListenerThread = new Thread(flusher); - fllushListenerThread.setDaemon(true); - fllushListenerThread.start(); - cache.start(); - for (int i = 0; i < 512; i++) { - cache.put(i, data.getBytes(StandardCharsets.UTF_8)); - } - Assert.assertEquals(512, metrics.getNumWriteOps()); - Assert.assertEquals(512, metrics.getNumBlockBufferUpdates()); - Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered()); - Thread.sleep(5000); - Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted()); - - - for (int i = 0; i < 512; i++) { - cache.put(i, data.getBytes(StandardCharsets.UTF_8)); - } - Assert.assertEquals(1024, metrics.getNumWriteOps()); - Assert.assertEquals(1024, metrics.getNumBlockBufferUpdates()); - Assert.assertEquals(2, 
metrics.getNumBlockBufferFlushTriggered()); - - Thread.sleep(5000); - Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks()); - Assert.assertEquals(0, metrics.getNumWriteGenericExceptionRetryBlocks()); - Assert.assertEquals(2, metrics.getNumBlockBufferFlushCompleted()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestCBlockConfigurationFields.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestCBlockConfigurationFields.java b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestCBlockConfigurationFields.java deleted file mode 100644 index 4139ac6..0000000 --- a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestCBlockConfigurationFields.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock; - -import org.apache.hadoop.conf.TestConfigurationFieldsBase; - -/** - * Tests if configuration constants documented in ozone-defaults.xml. 
- */ -public class TestCBlockConfigurationFields extends TestConfigurationFieldsBase { - - @Override - public void initializeMemberVariables() { - xmlFilename = new String("cblock-default.xml"); - configurationClasses = - new Class[] {CBlockConfigKeys.class}; - errorIfMissingConfigProps = true; - errorIfMissingXmlProps = true; - } -} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org