http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockIStorageImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockIStorageImpl.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockIStorageImpl.java deleted file mode 100644 index 4744968..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockIStorageImpl.java +++ /dev/null @@ -1,440 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.jscsiHelper; - -import org.apache.commons.codec.digest.DigestUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.cblock.jscsiHelper.cache.CacheModule; -import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock; -import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.scm.XceiverClientManager; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.jscsi.target.storage.IStorageModule; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; - -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_TRACE_IO; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_TRACE_IO_DEFAULT; - -/** - * The SCSI Target class for CBlockSCSIServer. - */ -final public class CBlockIStorageImpl implements IStorageModule { - private static final Logger LOGGER = - LoggerFactory.getLogger(CBlockIStorageImpl.class); - private static final Logger TRACER = - LoggerFactory.getLogger("TraceIO"); - - private CacheModule cache; - private final long volumeSize; - private final int blockSize; - private final String userName; - private final String volumeName; - private final boolean traceEnabled; - private final Configuration conf; - private final ContainerCacheFlusher flusher; - private List<Pipeline> fullContainerList; - - /** - * private: constructs a SCSI Target. - * - * @param config - config - * @param userName - Username - * @param volumeName - Name of the volume - * @param volumeSize - Size of the volume - * @param blockSize - Size of the block - * @param fullContainerList - Ordered list of containers that make up this - * volume. - * @param flusher - flusher which is used to flush data from - * level db cache to containers - * @throws IOException - Throws IOException. 
- */ - private CBlockIStorageImpl(Configuration config, String userName, - String volumeName, long volumeSize, int blockSize, - List<Pipeline> fullContainerList, ContainerCacheFlusher flusher) { - this.conf = config; - this.userName = userName; - this.volumeName = volumeName; - this.volumeSize = volumeSize; - this.blockSize = blockSize; - this.fullContainerList = new ArrayList<>(fullContainerList); - this.flusher = flusher; - this.traceEnabled = conf.getBoolean(DFS_CBLOCK_TRACE_IO, - DFS_CBLOCK_TRACE_IO_DEFAULT); - } - - /** - * private: initialize the cache. - * - * @param xceiverClientManager - client manager that is used for creating new - * connections to containers. - * @param metrics - target metrics to maintain metrics for target server - * @throws IOException - Throws IOException. - */ - private void initCache(XceiverClientManager xceiverClientManager, - CBlockTargetMetrics metrics) throws IOException { - this.cache = CBlockLocalCache.newBuilder() - .setConfiguration(conf) - .setVolumeName(this.volumeName) - .setUserName(this.userName) - .setPipelines(this.fullContainerList) - .setClientManager(xceiverClientManager) - .setBlockSize(blockSize) - .setVolumeSize(volumeSize) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - this.cache.start(); - } - - /** - * Gets a new builder for CBlockStorageImpl. - * - * @return builder - */ - public static Builder newBuilder() { - return new Builder(); - } - - /** - * Get Cache. - * - * @return - Cache - */ - public CacheModule getCache() { - return cache; - } - - /** - * Returns block size of this volume. - * - * @return int size of block for this volume. - */ - @Override - public int getBlockSize() { - return blockSize; - } - - /** - * Checks the index boundary of a block address. - * - * @param logicalBlockAddress the index of the first block of data to be read - * or written - * @param transferLengthInBlocks the total number of consecutive blocks about - * to be read or written - * @return 0 == Success, 1 indicates the LBA address is out of bounds and 2 - * indicates that LBA + transfer size is out of bounds. - */ - @Override - public int checkBounds(long logicalBlockAddress, int transferLengthInBlocks) { - long sizeInBlocks = volumeSize / blockSize; - int res = 0; - if (logicalBlockAddress < 0 || logicalBlockAddress >= sizeInBlocks) { - res = 1; - } - - if (transferLengthInBlocks < 0 || - logicalBlockAddress + transferLengthInBlocks > sizeInBlocks) { - if (res == 0) { - res = 2; - } - } - return res; - } - - /** - * Number of blocks that make up this volume. - * - * @return long - count of blocks. - */ - @Override - public long getSizeInBlocks() { - return volumeSize / blockSize; - } - - /** - * Reads the number of bytes that can be read into the bytes buffer from the - * location indicated. 
- * - * @param bytes the array into which the data will be copied will be filled - * with data from storage - * @param storageIndex the position of the first byte to be copied - * @throws IOException - */ - @Override - public void read(byte[] bytes, long storageIndex) throws IOException { - int startingIdxInBlock = (int) storageIndex % blockSize; - int idxInBytes = 0; - if (this.traceEnabled) { - TRACER.info("Task=ReadStart,length={},location={}", - bytes.length, storageIndex); - } - while (idxInBytes < bytes.length - 1) { - long blockId = (storageIndex + idxInBytes) / blockSize; - byte[] dataBytes; - - try { - LogicalBlock block = this.cache.get(blockId); - dataBytes = block.getData().array(); - - if (this.traceEnabled) { - TRACER.info("Task=ReadBlock,BlockID={},length={},SHA={}", - blockId, - dataBytes.length, - dataBytes.length > 0 ? DigestUtils.sha256Hex(dataBytes) : null); - } - } catch (IOException e) { - // For an non-existing block cache.get will return a block with zero - // bytes filled. So any error here is a real error. - LOGGER.error("getting errors when reading data:" + e); - throw e; - } - - int length = blockSize - startingIdxInBlock; - if (length > bytes.length - idxInBytes) { - length = bytes.length - idxInBytes; - } - if (dataBytes.length >= length) { - System.arraycopy(dataBytes, startingIdxInBlock, bytes, idxInBytes, - length); - } - startingIdxInBlock = 0; - idxInBytes += length; - } - if (this.traceEnabled) { - TRACER.info("Task=ReadEnd,length={},location={},SHA={}", - bytes.length, storageIndex, DigestUtils.sha256Hex(bytes)); - } - } - - @Override - public void write(byte[] bytes, long storageIndex) throws IOException { - int startingIdxInBlock = (int) storageIndex % blockSize; - int idxInBytes = 0; - if (this.traceEnabled) { - TRACER.info("Task=WriteStart,length={},location={},SHA={}", - bytes.length, storageIndex, - bytes.length > 0 ? DigestUtils.sha256Hex(bytes) : null); - } - - ByteBuffer dataByte = ByteBuffer.allocate(blockSize); - while (idxInBytes < bytes.length - 1) { - long blockId = (storageIndex + idxInBytes) / blockSize; - int length = blockSize - startingIdxInBlock; - if (length > bytes.length - idxInBytes) { - length = bytes.length - idxInBytes; - } - System.arraycopy(bytes, idxInBytes, dataByte.array(), startingIdxInBlock, - length); - this.cache.put(blockId, dataByte.array()); - - if (this.traceEnabled) { - TRACER.info("Task=WriteBlock,BlockID={},length={},SHA={}", - blockId, dataByte.array().length, - dataByte.array().length > 0 ? - DigestUtils.sha256Hex(dataByte.array()) : null); - } - dataByte.clear(); - startingIdxInBlock = 0; - idxInBytes += length; - } - - if (this.traceEnabled) { - TRACER.info("Task=WriteEnd,length={},location={} ", - bytes.length, storageIndex); - } - } - - @Override - public void close() throws IOException { - try { - cache.close(); - } catch (IllegalStateException ise) { - LOGGER.error("Can not close the storage {}", ise); - throw ise; - } - } - - /** - * Builder class for CBlocklocalCache. - */ - public static class Builder { - private String userName; - private String volumeName; - private long volumeSize; - private int blockSize; - private List<Pipeline> containerList; - private Configuration conf; - private XceiverClientManager clientManager; - private ContainerCacheFlusher flusher; - private CBlockTargetMetrics metrics; - - /** - * Constructs a builder. - */ - Builder() { - - } - - public Builder setFlusher(ContainerCacheFlusher cacheFlusher) { - this.flusher = cacheFlusher; - return this; - } - - /** - * set config. 
- * - * @param config - config - * @return Builder - */ - public Builder setConf(Configuration config) { - this.conf = config; - return this; - } - - /** - * set user name. - * - * @param cblockUserName - user name - * @return Builder - */ - public Builder setUserName(String cblockUserName) { - this.userName = cblockUserName; - return this; - } - - /** - * set volume name. - * - * @param cblockVolumeName -- volume name - * @return Builder - */ - public Builder setVolumeName(String cblockVolumeName) { - this.volumeName = cblockVolumeName; - return this; - } - - /** - * set volume size. - * - * @param cblockVolumeSize -- set volume size. - * @return Builder - */ - public Builder setVolumeSize(long cblockVolumeSize) { - this.volumeSize = cblockVolumeSize; - return this; - } - - /** - * set block size. - * - * @param cblockBlockSize -- block size - * @return Builder - */ - public Builder setBlockSize(int cblockBlockSize) { - this.blockSize = cblockBlockSize; - return this; - } - - /** - * Set contianer list. - * - * @param cblockContainerList - set the pipeline list - * @return Builder - */ - public Builder setContainerList(List<Pipeline> cblockContainerList) { - this.containerList = cblockContainerList; - return this; - } - - /** - * Set client manager. - * - * @param xceiverClientManager -- sets the client manager. - * @return Builder - */ - public Builder setClientManager(XceiverClientManager xceiverClientManager) { - this.clientManager = xceiverClientManager; - return this; - } - - /** - * Set Cblock Target Metrics. - * - * @param targetMetrics -- sets the cblock target metrics - * @return Builder - */ - public Builder setCBlockTargetMetrics(CBlockTargetMetrics targetMetrics) { - this.metrics = targetMetrics; - return this; - } - - /** - * Builds the CBlockStorageImpl. - * - * @return builds the CBlock Scsi Target. - */ - public CBlockIStorageImpl build() throws IOException { - if (StringUtils.isBlank(userName)) { - throw new IllegalArgumentException("User name cannot be null or empty" + - "."); - } - if (StringUtils.isBlank(volumeName)) { - throw new IllegalArgumentException("Volume name cannot be null or " + - "empty"); - } - - if (volumeSize < 1) { - throw new IllegalArgumentException("Volume size cannot be negative or" + - " zero."); - } - - if (blockSize < 1) { - throw new IllegalArgumentException("Block size cannot be negative or " + - "zero."); - } - - if (containerList == null || containerList.size() == 0) { - throw new IllegalArgumentException("Container list cannot be null or " + - "empty"); - } - if (clientManager == null) { - throw new IllegalArgumentException("Client manager cannot be null"); - } - if (conf == null) { - throw new IllegalArgumentException("Configuration cannot be null"); - } - - if (flusher == null) { - throw new IllegalArgumentException("Flusher Cannot be null."); - } - CBlockIStorageImpl impl = new CBlockIStorageImpl(this.conf, this.userName, - this.volumeName, this.volumeSize, this.blockSize, this.containerList, - this.flusher); - impl.initCache(this.clientManager, this.metrics); - return impl; - } - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockManagerHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockManagerHandler.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockManagerHandler.java deleted file mode 100644 index 6367c61..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockManagerHandler.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.jscsiHelper; - -import org.apache.hadoop.cblock.meta.VolumeInfo; -import org.apache.hadoop.cblock.proto.MountVolumeResponse; - -import java.io.IOException; -import java.util.List; - -/** - * This class is the handler of CBlockManager used by target server - * to communicate with CBlockManager. - * - * More specifically, this class will expose local methods to target - * server, and make RPC calls to CBlockManager accordingly - */ -public class CBlockManagerHandler { - - private final CBlockClientProtocolClientSideTranslatorPB handler; - - public CBlockManagerHandler( - CBlockClientProtocolClientSideTranslatorPB handler) { - this.handler = handler; - } - - public MountVolumeResponse mountVolume( - String userName, String volumeName) throws IOException { - return handler.mountVolume(userName, volumeName); - } - - public List<VolumeInfo> listVolumes() throws IOException { - return handler.listVolumes(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java deleted file mode 100644 index e7df0cf..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java +++ /dev/null @@ -1,334 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.cblock.jscsiHelper; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.metrics2.MetricsSystem; -import org.apache.hadoop.metrics2.annotation.Metric; -import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; -import org.apache.hadoop.metrics2.lib.MutableCounterLong; -import org.apache.hadoop.metrics2.lib.MutableRate; - -/** - * This class is for maintaining the various Cblock Target statistics - * and publishing them through the metrics interfaces. - * This also registers the JMX MBean for RPC. - * - * This class maintains stats like cache hit and miss ratio - * as well as the latency time of read and write ops. - */ -public class CBlockTargetMetrics { - // IOPS based Metrics - @Metric private MutableCounterLong numReadOps; - @Metric private MutableCounterLong numWriteOps; - @Metric private MutableCounterLong numReadCacheHits; - @Metric private MutableCounterLong numReadCacheMiss; - @Metric private MutableCounterLong numDirectBlockWrites; - - // Cblock internal Metrics - @Metric private MutableCounterLong numDirtyLogBlockRead; - @Metric private MutableCounterLong numBytesDirtyLogRead; - @Metric private MutableCounterLong numBytesDirtyLogWritten; - @Metric private MutableCounterLong numBlockBufferFlushCompleted; - @Metric private MutableCounterLong numBlockBufferFlushTriggered; - @Metric private MutableCounterLong numBlockBufferUpdates; - @Metric private MutableCounterLong numRetryLogBlockRead; - @Metric private MutableCounterLong numBytesRetryLogRead; - - // Failure Metrics - @Metric private MutableCounterLong numReadLostBlocks; - @Metric private MutableCounterLong numFailedReadBlocks; - @Metric private MutableCounterLong numWriteIOExceptionRetryBlocks; - @Metric private MutableCounterLong numWriteGenericExceptionRetryBlocks; - @Metric private MutableCounterLong numFailedDirectBlockWrites; - @Metric private MutableCounterLong numIllegalDirtyLogFiles; - @Metric private MutableCounterLong numFailedDirtyLogFileDeletes; - @Metric private MutableCounterLong numFailedBlockBufferFlushes; - @Metric private MutableCounterLong numInterruptedBufferWaits; - @Metric private MutableCounterLong numFailedRetryLogFileWrites; - @Metric private MutableCounterLong numWriteMaxRetryBlocks; - @Metric private MutableCounterLong numFailedReleaseLevelDB; - - // Latency based Metrics - @Metric private MutableRate dbReadLatency; - @Metric private MutableRate containerReadLatency; - @Metric private MutableRate dbWriteLatency; - @Metric private MutableRate containerWriteLatency; - @Metric private MutableRate blockBufferFlushLatency; - @Metric private MutableRate directBlockWriteLatency; - - public CBlockTargetMetrics() { - } - - public static CBlockTargetMetrics create() { - MetricsSystem ms = DefaultMetricsSystem.instance(); - return ms.register("CBlockTargetMetrics", - "CBlock Target Metrics", - new CBlockTargetMetrics()); - } - - public void incNumReadOps() { - numReadOps.incr(); - } - - public void incNumWriteOps() { - numWriteOps.incr(); - } - - public void incNumReadCacheHits() { - numReadCacheHits.incr(); - } - - public void 
incNumReadCacheMiss() { - numReadCacheMiss.incr(); - } - - public void incNumReadLostBlocks() { - numReadLostBlocks.incr(); - } - - public void incNumDirectBlockWrites() { - numDirectBlockWrites.incr(); - } - - public void incNumWriteIOExceptionRetryBlocks() { - numWriteIOExceptionRetryBlocks.incr(); - } - - public void incNumWriteGenericExceptionRetryBlocks() { - numWriteGenericExceptionRetryBlocks.incr(); - } - - public void incNumFailedDirectBlockWrites() { - numFailedDirectBlockWrites.incr(); - } - - public void incNumFailedReadBlocks() { - numFailedReadBlocks.incr(); - } - - public void incNumBlockBufferFlushCompleted() { - numBlockBufferFlushCompleted.incr(); - } - - public void incNumBlockBufferFlushTriggered() { - numBlockBufferFlushTriggered.incr(); - } - - public void incNumDirtyLogBlockRead() { - numDirtyLogBlockRead.incr(); - } - - public void incNumBytesDirtyLogRead(int bytes) { - numBytesDirtyLogRead.incr(bytes); - } - - public void incNumBlockBufferUpdates() { - numBlockBufferUpdates.incr(); - } - - public void incNumRetryLogBlockRead() { - numRetryLogBlockRead.incr(); - } - - public void incNumBytesRetryLogRead(int bytes) { - numBytesRetryLogRead.incr(bytes); - } - - public void incNumBytesDirtyLogWritten(int bytes) { - numBytesDirtyLogWritten.incr(bytes); - } - - public void incNumFailedBlockBufferFlushes() { - numFailedBlockBufferFlushes.incr(); - } - - public void incNumInterruptedBufferWaits() { - numInterruptedBufferWaits.incr(); - } - - public void incNumIllegalDirtyLogFiles() { - numIllegalDirtyLogFiles.incr(); - } - - public void incNumFailedDirtyLogFileDeletes() { - numFailedDirtyLogFileDeletes.incr(); - } - - public void incNumFailedRetryLogFileWrites() { - numFailedRetryLogFileWrites.incr(); - } - - public void incNumWriteMaxRetryBlocks() { - numWriteMaxRetryBlocks.incr(); - } - - public void incNumFailedReleaseLevelDB() { - numFailedReleaseLevelDB.incr(); - } - - public void updateDBReadLatency(long latency) { - dbReadLatency.add(latency); - } - - public void updateContainerReadLatency(long latency) { - containerReadLatency.add(latency); - } - - public void updateDBWriteLatency(long latency) { - dbWriteLatency.add(latency); - } - - public void updateContainerWriteLatency(long latency) { - containerWriteLatency.add(latency); - } - - public void updateDirectBlockWriteLatency(long latency) { - directBlockWriteLatency.add(latency); - } - - public void updateBlockBufferFlushLatency(long latency) { - blockBufferFlushLatency.add(latency); - } - - @VisibleForTesting - public long getNumReadOps() { - return numReadOps.value(); - } - - @VisibleForTesting - public long getNumWriteOps() { - return numWriteOps.value(); - } - - @VisibleForTesting - public long getNumReadCacheHits() { - return numReadCacheHits.value(); - } - - @VisibleForTesting - public long getNumReadCacheMiss() { - return numReadCacheMiss.value(); - } - - @VisibleForTesting - public long getNumReadLostBlocks() { - return numReadLostBlocks.value(); - } - - @VisibleForTesting - public long getNumDirectBlockWrites() { - return numDirectBlockWrites.value(); - } - - @VisibleForTesting - public long getNumFailedDirectBlockWrites() { - return numFailedDirectBlockWrites.value(); - } - - @VisibleForTesting - public long getNumFailedReadBlocks() { - return numFailedReadBlocks.value(); - } - - @VisibleForTesting - public long getNumWriteIOExceptionRetryBlocks() { - return numWriteIOExceptionRetryBlocks.value(); - } - - @VisibleForTesting - public long getNumWriteGenericExceptionRetryBlocks() { - return 
numWriteGenericExceptionRetryBlocks.value(); - } - - @VisibleForTesting - public long getNumBlockBufferFlushCompleted() { - return numBlockBufferFlushCompleted.value(); - } - - @VisibleForTesting - public long getNumBlockBufferFlushTriggered() { - return numBlockBufferFlushTriggered.value(); - } - - @VisibleForTesting - public long getNumDirtyLogBlockRead() { - return numDirtyLogBlockRead.value(); - } - - @VisibleForTesting - public long getNumBytesDirtyLogReads() { - return numBytesDirtyLogRead.value(); - } - - @VisibleForTesting - public long getNumBlockBufferUpdates() { - return numBlockBufferUpdates.value(); - } - - @VisibleForTesting - public long getNumRetryLogBlockRead() { - return numRetryLogBlockRead.value(); - } - - @VisibleForTesting - public long getNumBytesRetryLogReads() { - return numBytesRetryLogRead.value(); - } - - @VisibleForTesting - public long getNumBytesDirtyLogWritten() { - return numBytesDirtyLogWritten.value(); - } - - @VisibleForTesting - public long getNumFailedBlockBufferFlushes() { - return numFailedBlockBufferFlushes.value(); - } - - @VisibleForTesting - public long getNumInterruptedBufferWaits() { - return numInterruptedBufferWaits.value(); - } - - @VisibleForTesting - public long getNumIllegalDirtyLogFiles() { - return numIllegalDirtyLogFiles.value(); - } - - @VisibleForTesting - public long getNumFailedDirtyLogFileDeletes() { - return numFailedDirtyLogFileDeletes.value(); - } - - @VisibleForTesting - public long getNumFailedRetryLogFileWrites() { - return numFailedRetryLogFileWrites.value(); - } - - @VisibleForTesting - public long getNumWriteMaxRetryBlocks() { - return numWriteMaxRetryBlocks.value(); - } - - @VisibleForTesting - public long getNumFailedReleaseLevelDB() { - return numFailedReleaseLevelDB.value(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetServer.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetServer.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetServer.java deleted file mode 100644 index afbd260..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetServer.java +++ /dev/null @@ -1,128 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.cblock.jscsiHelper; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.cblock.proto.MountVolumeResponse; -import org.apache.hadoop.cblock.util.KeyUtil; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.scm.XceiverClientManager; -import org.jscsi.target.Configuration; -import org.jscsi.target.Target; -import org.jscsi.target.TargetServer; - -import java.io.IOException; -import java.util.HashMap; - -/** - * This class extends JSCSI target server, which is a ISCSI target that can be - * recognized by a remote machine with ISCSI installed. - */ -public final class CBlockTargetServer extends TargetServer { - private final OzoneConfiguration conf; - private final CBlockManagerHandler cBlockManagerHandler; - private final XceiverClientManager xceiverClientManager; - private final ContainerCacheFlusher containerCacheFlusher; - private final CBlockTargetMetrics metrics; - - public CBlockTargetServer(OzoneConfiguration ozoneConfig, - Configuration jscsiConf, - CBlockManagerHandler cBlockManagerHandler, - CBlockTargetMetrics metrics) - throws IOException { - super(jscsiConf); - this.cBlockManagerHandler = cBlockManagerHandler; - this.xceiverClientManager = new XceiverClientManager(ozoneConfig); - this.conf = ozoneConfig; - this.containerCacheFlusher = new ContainerCacheFlusher(this.conf, - xceiverClientManager, metrics); - this.metrics = metrics; - LOGGER.info("Starting flusher thread."); - Thread flushListenerThread = new Thread(containerCacheFlusher); - flushListenerThread.setDaemon(true); - flushListenerThread.start(); - } - - public static void main(String[] args) throws Exception { - } - - @Override - public boolean isValidTargetName(String checkTargetName) { - if (!KeyUtil.isValidVolumeKey(checkTargetName)) { - return false; - } - String userName = KeyUtil.getUserNameFromVolumeKey(checkTargetName); - String volumeName = KeyUtil.getVolumeFromVolumeKey(checkTargetName); - if (userName == null || volumeName == null) { - return false; - } - try { - MountVolumeResponse result = - cBlockManagerHandler.mountVolume(userName, volumeName); - if (!result.getIsValid()) { - LOGGER.error("Not a valid volume:" + checkTargetName); - return false; - } - String volumeKey = KeyUtil.getVolumeKey(result.getUserName(), - result.getVolumeName()); - if (!targets.containsKey(volumeKey)) { - LOGGER.info("Mounting Volume. username: {} volume:{}", - userName, volumeName); - CBlockIStorageImpl ozoneStore = CBlockIStorageImpl.newBuilder() - .setUserName(userName) - .setVolumeName(volumeName) - .setVolumeSize(result.getVolumeSize()) - .setBlockSize(result.getBlockSize()) - .setContainerList(result.getContainerList()) - .setClientManager(xceiverClientManager) - .setConf(this.conf) - .setFlusher(containerCacheFlusher) - .setCBlockTargetMetrics(metrics) - .build(); - Target target = new Target(volumeKey, volumeKey, ozoneStore); - targets.put(volumeKey, target); - } - } catch (IOException e) { - LOGGER.error("Can not connect to server when validating target!" - + e.getMessage()); - } - return targets.containsKey(checkTargetName); - } - - @Override - public String[] getTargetNames() { - try { - if (cBlockManagerHandler != null) { - return cBlockManagerHandler.listVolumes(). 
- stream().map( - volumeInfo -> volumeInfo.getUserName() + ":" + volumeInfo - .getVolumeName()).toArray(String[]::new); - } else { - return new String[0]; - } - } catch (IOException e) { - LOGGER.error("Can't list existing volumes", e); - return new String[0]; - } - } - - @VisibleForTesting - public HashMap<String, Target> getTargets() { - return targets; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java deleted file mode 100644 index 171f3e2..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java +++ /dev/null @@ -1,599 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.cblock.jscsiHelper; - -import com.google.common.base.Preconditions; -import com.google.common.primitives.Longs; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.hadoop.cblock.CBlockConfigKeys; -import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock; -import org.apache.hadoop.cblock.jscsiHelper.cache.impl.AsyncBlockWriter; -import org.apache.hadoop.cblock.jscsiHelper.cache.impl.DiskBlock; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.hdds.scm.XceiverClientManager; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.util.Time; -import org.apache.hadoop.utils.LevelDBStore; -import org.iq80.leveldb.Options; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.nio.ByteBuffer; -import java.nio.channels.ReadableByteChannel; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_CACHE_CORE_MIN_POOL_SIZE; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_CACHE_CORE_MIN_POOL_SIZE_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_CACHE_KEEP_ALIVE; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_CACHE_KEEP_ALIVE_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_CACHE_MAX_POOL_SIZE; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_CACHE_MAX_POOL_SIZE_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_CACHE_QUEUE_SIZE_KB; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_CACHE_QUEUE_SIZE_KB_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_CACHE_THREAD_PRIORITY; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_DEFAULT; - -/** - * Class that writes to remote containers. 
- */ -public class ContainerCacheFlusher implements Runnable { - private static final Logger LOG = - LoggerFactory.getLogger(ContainerCacheFlusher.class); - private final LinkedBlockingQueue<Message> messageQueue; - private final ThreadPoolExecutor threadPoolExecutor; - private final ArrayBlockingQueue<Runnable> workQueue; - private final ConcurrentMap<String, RefCountedDB> dbMap; - private final ByteBuffer blockIDBuffer; - private final ConcurrentMap<String, Pipeline[]> pipelineMap; - private final AtomicLong remoteIO; - private final XceiverClientManager xceiverClientManager; - private final CBlockTargetMetrics metrics; - private AtomicBoolean shutdown; - private final long levelDBCacheSize; - private final int maxRetryCount; - private final String tracePrefix; - - private final ConcurrentMap<String, FinishCounter> finishCountMap; - - /** - * Constructs the writers to remote queue. - */ - public ContainerCacheFlusher(Configuration config, - XceiverClientManager xceiverClientManager, - CBlockTargetMetrics metrics) { - int queueSize = config.getInt(DFS_CBLOCK_CACHE_QUEUE_SIZE_KB, - DFS_CBLOCK_CACHE_QUEUE_SIZE_KB_DEFAULT) * 1024; - int corePoolSize = config.getInt(DFS_CBLOCK_CACHE_CORE_MIN_POOL_SIZE, - DFS_CBLOCK_CACHE_CORE_MIN_POOL_SIZE_DEFAULT); - int maxPoolSize = config.getInt(DFS_CBLOCK_CACHE_MAX_POOL_SIZE, - DFS_CBLOCK_CACHE_MAX_POOL_SIZE_DEFAULT); - long keepAlive = config.getTimeDuration(DFS_CBLOCK_CACHE_KEEP_ALIVE, - DFS_CBLOCK_CACHE_KEEP_ALIVE_DEFAULT, TimeUnit.SECONDS); - int threadPri = config.getInt(DFS_CBLOCK_CACHE_THREAD_PRIORITY, - DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT); - int blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE, - DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT) * (Long.SIZE / Byte.SIZE); - levelDBCacheSize = config.getInt(DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_KEY, - DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_DEFAULT) * OzoneConsts.MB; - - LOG.info("Cache: Core Pool Size: {}", corePoolSize); - LOG.info("Cache: Keep Alive: {}", keepAlive); - LOG.info("Cache: Max Pool Size: {}", maxPoolSize); - LOG.info("Cache: Thread Pri: {}", threadPri); - LOG.info("Cache: BlockBuffer Size: {}", blockBufferSize); - - shutdown = new AtomicBoolean(false); - messageQueue = new LinkedBlockingQueue<>(); - workQueue = new ArrayBlockingQueue<>(queueSize, true); - - ThreadFactory workerThreadFactory = new ThreadFactoryBuilder() - .setNameFormat("Cache Block Writer Thread #%d") - .setDaemon(true) - .setPriority(threadPri) - .build(); - threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, - keepAlive, TimeUnit.SECONDS, workQueue, workerThreadFactory, - new ThreadPoolExecutor.AbortPolicy()); - threadPoolExecutor.prestartAllCoreThreads(); - - dbMap = new ConcurrentHashMap<>(); - pipelineMap = new ConcurrentHashMap<>(); - blockIDBuffer = ByteBuffer.allocateDirect(blockBufferSize); - this.xceiverClientManager = xceiverClientManager; - this.metrics = metrics; - this.remoteIO = new AtomicLong(); - - this.finishCountMap = new ConcurrentHashMap<>(); - this.maxRetryCount = - config.getInt(CBlockConfigKeys.DFS_CBLOCK_CACHE_MAX_RETRY_KEY, - CBlockConfigKeys.DFS_CBLOCK_CACHE_MAX_RETRY_DEFAULT); - this.tracePrefix = getTracePrefix(); - } - - private void checkExistingLog(String prefixFileName, File dbPath) { - if (!dbPath.exists()) { - LOG.debug("No existing dirty log found at {}", dbPath); - return; - } - LOG.debug("Need to check and requeue existing dirty log {}", dbPath); - HashMap<String, ArrayList<String>> allFiles = new HashMap<>(); - traverse(prefixFileName, 
dbPath, allFiles); - for (Map.Entry<String, ArrayList<String>> entry : allFiles.entrySet()) { - String parentPath = entry.getKey(); - for (String fileName : entry.getValue()) { - LOG.info("found {} {} with prefix {}", - parentPath, fileName, prefixFileName); - processDirtyBlocks(parentPath, fileName); - } - } - } - - private void traverse(String prefixFileName, File path, - HashMap<String, ArrayList<String>> files) { - if (path.isFile()) { - if (path.getName().startsWith(prefixFileName)) { - LOG.debug("found this {} with {}", path.getParent(), path.getName()); - if (!files.containsKey(path.getParent())) { - files.put(path.getParent(), new ArrayList<>()); - } - files.get(path.getParent()).add(path.getName()); - } - } else { - File[] listFiles = path.listFiles(); - if (listFiles != null) { - for (File subPath : listFiles) { - traverse(prefixFileName, subPath, files); - } - } - } - } - - /** - * Gets the CBlockTargetMetrics. - * - * @return CBlockTargetMetrics - */ - public CBlockTargetMetrics getTargetMetrics() { - return metrics; - } - - /** - * Gets the getXceiverClientManager. - * - * @return XceiverClientManager - */ - public XceiverClientManager getXceiverClientManager() { - return xceiverClientManager; - } - - /** - * Shutdown this instance. - */ - public void shutdown() { - this.shutdown.set(true); - threadPoolExecutor.shutdown(); - } - - public long incrementRemoteIO() { - return remoteIO.incrementAndGet(); - } - - /** - * Processes a block cache file and queues those blocks for the remote I/O. - * - * @param dbPath - Location where the DB can be found. - * @param fileName - Block Cache File Name - */ - public void processDirtyBlocks(String dbPath, String fileName) { - LOG.info("Adding {}/{} to queue. Queue Length: {}", dbPath, fileName, - messageQueue.size()); - this.messageQueue.add(new Message(dbPath, fileName)); - } - - public Logger getLOG() { - return LOG; - } - - /** - * Opens a DB if needed or returns a handle to an already open DB. - * - * @param dbPath -- dbPath - * @return the levelDB on the given path. - * @throws IOException - */ - public synchronized LevelDBStore openDB(String dbPath) - throws IOException { - if (dbMap.containsKey(dbPath)) { - RefCountedDB refDB = dbMap.get(dbPath); - refDB.open(); - return refDB.db; - } else { - Options options = new Options(); - options.cacheSize(levelDBCacheSize); - options.createIfMissing(true); - LevelDBStore cacheDB = new LevelDBStore( - new File(getDBFileName(dbPath)), options); - RefCountedDB refDB = new RefCountedDB(dbPath, cacheDB); - dbMap.put(dbPath, refDB); - return cacheDB; - } - } - - /** - * Updates the container map. This data never changes so we will update this - * during restarts and it should not hurt us. - * - * Once a CBlockLocalCache cache is registered, requeue dirty/retry log files - * for the volume - * - * @param dbPath - DbPath - * @param containerList - Container List. 
- */ - public void register(String dbPath, Pipeline[] containerList) { - File dbFile = Paths.get(dbPath).toFile(); - pipelineMap.put(dbPath, containerList); - checkExistingLog(AsyncBlockWriter.DIRTY_LOG_PREFIX, dbFile); - checkExistingLog(AsyncBlockWriter.RETRY_LOG_PREFIX, dbFile); - } - - private String getDBFileName(String dbPath) { - return dbPath + ".db"; - } - - public LevelDBStore getCacheDB(String dbPath) throws IOException { - return openDB(dbPath); - } - - public void releaseCacheDB(String dbPath) { - try { - closeDB(dbPath); - } catch (Exception e) { - metrics.incNumFailedReleaseLevelDB(); - LOG.error("LevelDB close failed, dbPath:" + dbPath, e); - } - } - /** - * Close the DB if we don't have any outstanding references. - * - * @param dbPath - dbPath - * @throws IOException - */ - public synchronized void closeDB(String dbPath) throws IOException { - if (dbMap.containsKey(dbPath)) { - RefCountedDB refDB = dbMap.get(dbPath); - int count = refDB.close(); - if (count == 0) { - dbMap.remove(dbPath); - } - } - } - - Pipeline getPipeline(String dbPath, long blockId) { - Pipeline[] containerList = pipelineMap.get(dbPath); - Preconditions.checkNotNull(containerList); - int containerIdx = (int) blockId % containerList.length; - long cBlockIndex = - Longs.fromByteArray(containerList[containerIdx].getData()); - if (cBlockIndex > 0) { - // This catches the case when we get a wrong container in the ordering - // of the containers. - Preconditions.checkState(containerIdx % cBlockIndex == 0, - "The container ID computed should match with the container index " + - "returned from cBlock Server."); - } - return containerList[containerIdx]; - } - - public void incFinishCount(String fileName) { - if (!finishCountMap.containsKey(fileName)) { - LOG.error("No record for such file:" + fileName); - return; - } - finishCountMap.get(fileName).incCount(); - if (finishCountMap.get(fileName).isFileDeleted()) { - finishCountMap.remove(fileName); - } - } - - /** - * When an object implementing interface <code>Runnable</code> is used - * to create a thread, starting the thread causes the object's - * <code>run</code> method to be called in that separately executing - * thread. - * <p> - * The general contract of the method <code>run</code> is that it may - * take any action whatsoever. - * - * @see Thread#run() - */ - @Override - public void run() { - while (!this.shutdown.get()) { - try { - Message message = messageQueue.take(); - LOG.debug("Got message to process -- DB Path : {} , FileName; {}", - message.getDbPath(), message.getFileName()); - String fullPath = Paths.get(message.getDbPath(), - message.getFileName()).toString(); - String[] fileNameParts = message.getFileName().split("\\."); - Preconditions.checkState(fileNameParts.length > 1); - String fileType = fileNameParts[0]; - boolean isDirtyLogFile = - fileType.equalsIgnoreCase(AsyncBlockWriter.DIRTY_LOG_PREFIX); - ReadableByteChannel fileChannel = new FileInputStream(fullPath) - .getChannel(); - // TODO: We can batch and unique the IOs here. First getting the code - // to work, we will add those later. 
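// Editorial aside (not part of the deleted source): the dirty/retry log consumed here is a
// packed sequence of 8-byte block IDs. blockIDBuffer is filled from the file, its position in
// bytes divided by (Long.SIZE / Byte.SIZE) gives the number of queued block IDs (an assumed
// 8 KB log file would carry 1024 of them), and after flip() the getLong() loop below drains
// exactly those IDs into BlockWriterTask submissions.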
- int bytesRead = fileChannel.read(blockIDBuffer); - fileChannel.close(); - LOG.debug("Read blockID log of size: {} position {} remaining {}", - bytesRead, blockIDBuffer.position(), blockIDBuffer.remaining()); - // current position of in the buffer in bytes, divided by number of - // bytes per long (which is calculated by number of bits per long - // divided by number of bits per byte) gives the number of blocks - int blockCount = blockIDBuffer.position()/(Long.SIZE / Byte.SIZE); - if (isDirtyLogFile) { - getTargetMetrics().incNumBytesDirtyLogRead(bytesRead); - } else { - getTargetMetrics().incNumBytesRetryLogRead(bytesRead); - } - if (finishCountMap.containsKey(message.getFileName())) { - // In theory this should never happen. But if it happened, - // we need to know it... - getTargetMetrics().incNumIllegalDirtyLogFiles(); - LOG.error("Adding DirtyLog file again {} current count {} new {}", - message.getFileName(), - finishCountMap.get(message.getFileName()).expectedCount, - blockCount); - } - finishCountMap.put(message.getFileName(), - new FinishCounter(blockCount, message.getDbPath(), - message.getFileName(), this)); - // should be flip instead of rewind, because we also need to make sure - // the end position is correct. - blockIDBuffer.flip(); - LOG.debug("Remaining blocks count {} and {}", blockIDBuffer.remaining(), - blockCount); - while (blockIDBuffer.remaining() >= (Long.SIZE / Byte.SIZE)) { - long blockID = blockIDBuffer.getLong(); - int retryCount = 0; - if (isDirtyLogFile) { - getTargetMetrics().incNumDirtyLogBlockRead(); - } else { - getTargetMetrics().incNumRetryLogBlockRead(); - Preconditions.checkState(fileNameParts.length == 4); - retryCount = Integer.parseInt(fileNameParts[3]); - } - LogicalBlock block = new DiskBlock(blockID, null, false); - BlockWriterTask blockWriterTask = new BlockWriterTask(block, this, - message.getDbPath(), retryCount, message.getFileName(), - maxRetryCount); - threadPoolExecutor.submit(blockWriterTask); - } - blockIDBuffer.clear(); - } catch (InterruptedException e) { - LOG.info("ContainerCacheFlusher is interrupted.", e); - } catch (FileNotFoundException e) { - LOG.error("Unable to find the dirty blocks file. This will cause " + - "data errors. Please stop using this volume.", e); - } catch (IOException e) { - LOG.error("Unable to read the dirty blocks file. This will cause " + - "data errors. Please stop using this volume.", e); - } catch (Exception e) { - LOG.error("Generic exception.", e); - } - } - LOG.info("Exiting flusher"); - } - - /** - * Tries to get the local host IP Address as trace prefix - * for creating trace IDs, otherwise uses a random UUID for it. - */ - private static String getTracePrefix() { - String tmp; - try { - tmp = InetAddress.getLocalHost().getHostAddress(); - } catch (UnknownHostException ex) { - tmp = UUID.randomUUID().toString(); - LOG.error("Unable to read the host address. Using a GUID for " + - "hostname:{} ", tmp, ex); - } - return tmp; - } - - /** - * We create a trace ID to make it easy to debug issues. - * A trace ID is in IPAddress:UserName:VolumeName:blockID:second format. - * - * This will get written down on the data node if we get any failures, so - * with this trace ID we can correlate cBlock failures across machines. - * - * @param blockID - Block ID - * @return trace ID - */ - public String getTraceID(File dbPath, long blockID) { - String volumeName = dbPath.getName(); - String userName = dbPath.getParentFile().getName(); - // mapping to seconds to make the string smaller. 
- return tracePrefix + ":" + userName + ":" + volumeName - + ":" + blockID + ":" + Time.monotonicNow() / 1000; - } - - /** - * Keeps a Reference counted DB that we close only when the total Reference - * has gone to zero. - */ - private static class RefCountedDB { - private LevelDBStore db; - private AtomicInteger refcount; - private String dbPath; - - /** - * RefCountedDB DB ctor. - * - * @param dbPath - DB path. - * @param db - LevelDBStore db - */ - RefCountedDB(String dbPath, LevelDBStore db) { - this.db = db; - this.refcount = new AtomicInteger(1); - this.dbPath = dbPath; - } - - /** - * close the DB if possible. - */ - public int close() throws IOException { - int count = this.refcount.decrementAndGet(); - if (count == 0) { - LOG.info("Closing the LevelDB. {} ", this.dbPath); - db.close(); - } - return count; - } - - public void open() { - this.refcount.incrementAndGet(); - } - } - - /** - * The message held in processing queue. - */ - private static class Message { - private String dbPath; - private String fileName; - - /** - * A message that holds the info about which path dirty blocks log and - * which path contains db. - * - * @param dbPath - * @param fileName - */ - Message(String dbPath, String fileName) { - this.dbPath = dbPath; - this.fileName = fileName; - } - - public String getDbPath() { - return dbPath; - } - - public void setDbPath(String dbPath) { - this.dbPath = dbPath; - } - - public String getFileName() { - return fileName; - } - - public void setFileName(String fileName) { - this.fileName = fileName; - } - } - - private static class FinishCounter { - private final long expectedCount; - private final String dbPath; - private final String dirtyLogPath; - private final AtomicLong currentCount; - private AtomicBoolean fileDeleted; - private final ContainerCacheFlusher flusher; - - FinishCounter(long expectedCount, String dbPath, - String dirtyLogPath, ContainerCacheFlusher flusher) throws IOException { - this.expectedCount = expectedCount; - this.dbPath = dbPath; - this.dirtyLogPath = dirtyLogPath; - this.currentCount = new AtomicLong(0); - this.fileDeleted = new AtomicBoolean(false); - this.flusher = flusher; - } - - public boolean isFileDeleted() { - return fileDeleted.get(); - } - - public void incCount() { - long count = this.currentCount.incrementAndGet(); - if (count >= expectedCount) { - String filePath = String.format("%s/%s", dbPath, dirtyLogPath); - LOG.debug( - "Deleting {} with count {} {}", filePath, count, expectedCount); - try { - Path path = Paths.get(filePath); - Files.delete(path); - // the following part tries to remove the directory if it is empty - // but not sufficient, because the .db directory still exists.... - // TODO how to handle the .db directory? 
- /*Path parent = path.getParent(); - if (parent.toFile().listFiles().length == 0) { - Files.delete(parent); - }*/ - fileDeleted.set(true); - } catch (Exception e) { - flusher.getTargetMetrics().incNumFailedDirtyLogFileDeletes(); - LOG.error("Error deleting dirty log file:" + filePath, e); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/SCSITargetDaemon.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/SCSITargetDaemon.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/SCSITargetDaemon.java deleted file mode 100644 index 3806d8b..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/SCSITargetDaemon.java +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.jscsiHelper; - -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_ISCSI_ADVERTISED_IP; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_ISCSI_ADVERTISED_PORT; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_ISCSI_ADVERTISED_PORT_DEFAULT; - -import org.apache.hadoop.cblock.CblockUtils; -import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolPB; -import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.hdds.scm.client.ContainerOperationClient; -import org.apache.hadoop.security.UserGroupInformation; -import org.jscsi.target.Configuration; - -import java.net.InetSocketAddress; - -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_CONTAINER_SIZE_GB_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSI_CBLOCK_SERVER_ADDRESS_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSI_CBLOCK_SERVER_ADDRESS_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSI_PORT_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSI_PORT_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSI_SERVER_ADDRESS_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSI_SERVER_ADDRESS_KEY; -import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_BIND_HOST_DEFAULT; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_BIND_HOST_KEY; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_PORT_KEY; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_KEY; - -/** - * This class runs the target server process. - */ -public final class SCSITargetDaemon { - public static void main(String[] args) throws Exception { - CblockUtils.activateConfigs(); - OzoneConfiguration ozoneConf = new OzoneConfiguration(); - - RPC.setProtocolEngine(ozoneConf, CBlockClientServerProtocolPB.class, - ProtobufRpcEngine.class); - long containerSizeGB = ozoneConf.getInt(DFS_CBLOCK_CONTAINER_SIZE_GB_KEY, - DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT); - ContainerOperationClient.setContainerSizeB( - containerSizeGB * OzoneConsts.GB); - String jscsiServerAddress = ozoneConf.get( - DFS_CBLOCK_JSCSI_SERVER_ADDRESS_KEY, - DFS_CBLOCK_JSCSI_SERVER_ADDRESS_DEFAULT); - String cbmIPAddress = ozoneConf.get( - DFS_CBLOCK_JSCSI_CBLOCK_SERVER_ADDRESS_KEY, - DFS_CBLOCK_JSCSI_CBLOCK_SERVER_ADDRESS_DEFAULT - ); - int cbmPort = ozoneConf.getInt( - DFS_CBLOCK_JSCSI_PORT_KEY, - DFS_CBLOCK_JSCSI_PORT_DEFAULT - ); - - String scmAddress = ozoneConf.get(OZONE_SCM_CLIENT_BIND_HOST_KEY, - OZONE_SCM_CLIENT_BIND_HOST_DEFAULT); - int scmClientPort = ozoneConf.getInt(OZONE_SCM_CLIENT_PORT_KEY, - OZONE_SCM_CLIENT_PORT_DEFAULT); - int scmDatanodePort = ozoneConf.getInt(OZONE_SCM_DATANODE_PORT_KEY, - OZONE_SCM_DATANODE_PORT_DEFAULT); - - String scmClientAddress = scmAddress + ":" + scmClientPort; - String scmDataodeAddress = scmAddress + ":" + scmDatanodePort; - - ozoneConf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, scmClientAddress); - ozoneConf.set(OZONE_SCM_DATANODE_ADDRESS_KEY, scmDataodeAddress); - - InetSocketAddress cbmAddress = new InetSocketAddress( - cbmIPAddress, cbmPort); - long version = RPC.getProtocolVersion( - CBlockServiceProtocolPB.class); - CBlockClientProtocolClientSideTranslatorPB cbmClient = - new CBlockClientProtocolClientSideTranslatorPB( - RPC.getProxy(CBlockClientServerProtocolPB.class, version, - cbmAddress, UserGroupInformation.getCurrentUser(), ozoneConf, - NetUtils.getDefaultSocketFactory(ozoneConf), 5000) - ); - CBlockManagerHandler cbmHandler = new CBlockManagerHandler(cbmClient); - - String advertisedAddress = ozoneConf. - getTrimmed(DFS_CBLOCK_ISCSI_ADVERTISED_IP, jscsiServerAddress); - - int advertisedPort = ozoneConf. 
- getInt(DFS_CBLOCK_ISCSI_ADVERTISED_PORT, - DFS_CBLOCK_ISCSI_ADVERTISED_PORT_DEFAULT); - - Configuration jscsiConfig = - new Configuration(jscsiServerAddress, - advertisedAddress, - advertisedPort); - DefaultMetricsSystem.initialize("CBlockMetrics"); - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - CBlockTargetServer targetServer = new CBlockTargetServer( - ozoneConf, jscsiConfig, cbmHandler, metrics); - - targetServer.call(); - } - - private SCSITargetDaemon() { - - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/CacheModule.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/CacheModule.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/CacheModule.java deleted file mode 100644 index 300b2ae..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/CacheModule.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.jscsiHelper.cache; - -import java.io.IOException; - -/** - * Defines the interface for cache implementations. The cache will be called - * by cblock storage module when it performs IO operations. - */ -public interface CacheModule { - /** - * check if the key is cached, if yes, returned the cached object. - * otherwise, load from data source. Then put it into cache. - * - * @param blockID - * @return the target block. - */ - LogicalBlock get(long blockID) throws IOException; - - /** - * put the value of the key into cache. - * @param blockID - * @param value - */ - void put(long blockID, byte[] value) throws IOException; - - void flush() throws IOException; - - void start() throws IOException; - - void stop() throws IOException; - - void close() throws IOException; - - boolean isDirtyCache(); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/LogicalBlock.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/LogicalBlock.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/LogicalBlock.java deleted file mode 100644 index 470826f..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/LogicalBlock.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.jscsiHelper.cache; - -import java.nio.ByteBuffer; - -/** - * Logical Block is the data structure that we write to the cache, - * the key and data gets written to remote contianers. Rest is used for - * book keeping for the cache. - */ -public interface LogicalBlock { - /** - * Returns the data stream of this block. - * @return - ByteBuffer - */ - ByteBuffer getData(); - - /** - * Frees the byte buffer since we don't need it any more. - */ - void clearData(); - - /** - * Returns the Block ID for this Block. - * @return long - BlockID - */ - long getBlockID(); - - /** - * Flag that tells us if this block has been persisted to container. - * @return whether this block is now persistent - */ - boolean isPersisted(); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java deleted file mode 100644 index 0192c38..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java +++ /dev/null @@ -1,221 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.cblock.jscsiHelper.cache.impl; - -import com.google.common.base.Preconditions; -import com.google.common.primitives.Longs; -import org.apache.commons.codec.digest.DigestUtils; -import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.scm.XceiverClientManager; -import org.apache.hadoop.hdds.scm.XceiverClientSpi; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; -import org.apache.hadoop.util.Time; -import org.apache.hadoop.utils.LevelDBStore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -/** - * A Queue that is used to write blocks asynchronously to the container. - */ -public class AsyncBlockWriter { - private static final Logger LOG = - LoggerFactory.getLogger(AsyncBlockWriter.class); - - /** - * XceiverClientManager is used to get client connections to a set of - * machines. - */ - private final XceiverClientManager xceiverClientManager; - - /** - * This lock is used as a signal to re-queuing thread. The requeue thread - * wakes up as soon as it is signaled some blocks are in the retry queue. - * We try really aggressively since this new block will automatically move - * to the end of the queue. - * <p> - * In the event a container is unavailable for a long time, we can either - * fail all writes or remap and let the writes succeed. The easier - * semantics is to fail the volume until the container is recovered by SCM. - */ - private final Lock lock; - private final Condition notEmpty; - /** - * The cache this writer is operating against. - */ - private final CBlockLocalCache parentCache; - private final BlockBufferManager blockBufferManager; - public final static String DIRTY_LOG_PREFIX = "DirtyLog"; - public static final String RETRY_LOG_PREFIX = "RetryLog"; - private AtomicLong localIoCount; - - /** - * Constructs an Async Block Writer. - * - * @param config - Config - * @param cache - Parent Cache for this writer - */ - public AsyncBlockWriter(Configuration config, CBlockLocalCache cache) { - - Preconditions.checkNotNull(cache, "Cache cannot be null."); - Preconditions.checkNotNull(cache.getCacheDB(), "DB cannot be null."); - localIoCount = new AtomicLong(); - lock = new ReentrantLock(); - notEmpty = lock.newCondition(); - parentCache = cache; - xceiverClientManager = cache.getClientManager(); - blockBufferManager = new BlockBufferManager(config, parentCache); - } - - public void start() throws IOException { - File logDir = new File(parentCache.getDbPath().toString()); - if (!logDir.exists() && !logDir.mkdirs()) { - LOG.error("Unable to create the log directory, Critical error cannot " + - "continue. Log Dir : {}", logDir); - throw new IllegalStateException("Cache Directory create failed, Cannot " + - "continue. Log Dir: {}" + logDir); - } - blockBufferManager.start(); - } - - /** - * Return the log to write to. - * - * @return Logger. - */ - public static Logger getLOG() { - return LOG; - } - - /** - * Get the CacheDB. - * - * @return LevelDB Handle - */ - LevelDBStore getCacheDB() { - return parentCache.getCacheDB(); - } - - /** - * Returns the client manager. 
- * - * @return XceiverClientManager - */ - XceiverClientManager getXceiverClientManager() { - return xceiverClientManager; - } - - /** - * Incs the localIoPacket Count that has gone into this device. - */ - public long incrementLocalIO() { - return localIoCount.incrementAndGet(); - } - - /** - * Return the local io counts to this device. - * @return the count of io - */ - public long getLocalIOCount() { - return localIoCount.get(); - } - - /** - * Writes a block to LevelDB store and queues a work item for the system to - * sync the block to containers. - * - * @param block - Logical Block - */ - public void writeBlock(LogicalBlock block) throws IOException { - byte[] keybuf = Longs.toByteArray(block.getBlockID()); - String traceID = parentCache.getTraceID(block.getBlockID()); - if (parentCache.isShortCircuitIOEnabled()) { - long startTime = Time.monotonicNow(); - getCacheDB().put(keybuf, block.getData().array()); - incrementLocalIO(); - long endTime = Time.monotonicNow(); - parentCache.getTargetMetrics().updateDBWriteLatency( - endTime - startTime); - if (parentCache.isTraceEnabled()) { - String datahash = DigestUtils.sha256Hex(block.getData().array()); - parentCache.getTracer().info( - "Task=WriterTaskDBPut,BlockID={},Time={},SHA={}", - block.getBlockID(), endTime - startTime, datahash); - } - block.clearData(); - blockBufferManager.addToBlockBuffer(block.getBlockID()); - } else { - Pipeline pipeline = parentCache.getPipeline(block.getBlockID()); - String containerName = pipeline.getContainerName(); - XceiverClientSpi client = null; - try { - long startTime = Time.monotonicNow(); - client = parentCache.getClientManager() - .acquireClient(parentCache.getPipeline(block.getBlockID())); - ContainerProtocolCalls.writeSmallFile(client, containerName, - Long.toString(block.getBlockID()), block.getData().array(), - traceID); - long endTime = Time.monotonicNow(); - if (parentCache.isTraceEnabled()) { - String datahash = DigestUtils.sha256Hex(block.getData().array()); - parentCache.getTracer().info( - "Task=DirectWriterPut,BlockID={},Time={},SHA={}", - block.getBlockID(), endTime - startTime, datahash); - } - parentCache.getTargetMetrics(). - updateDirectBlockWriteLatency(endTime - startTime); - parentCache.getTargetMetrics().incNumDirectBlockWrites(); - } catch (Exception ex) { - parentCache.getTargetMetrics().incNumFailedDirectBlockWrites(); - LOG.error("Direct I/O writing of block:{} traceID:{} to " - + "container {} failed", block.getBlockID(), traceID, - containerName, ex); - throw ex; - } finally { - if (client != null) { - parentCache.getClientManager().releaseClient(client); - } - block.clearData(); - } - } - } - - /** - * Shutdown by writing any pending I/O to dirtylog buffer. - */ - public void shutdown() { - blockBufferManager.shutdown(); - } - /** - * Returns tracer. 
- * - * @return Tracer - */ - Logger getTracer() { - return parentCache.getTracer(); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferFlushTask.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferFlushTask.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferFlushTask.java deleted file mode 100644 index c61a7a4..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferFlushTask.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.cblock.jscsiHelper.cache.impl; - -import org.apache.hadoop.util.Time; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.FileOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.nio.file.Paths; - -/** - * This task is responsible for flushing the BlockIDBuffer - * to Dirty Log File. This Dirty Log file is used later by - * ContainerCacheFlusher when the data is written to container - */ -public class BlockBufferFlushTask implements Runnable { - private static final Logger LOG = - LoggerFactory.getLogger(BlockBufferFlushTask.class); - private final CBlockLocalCache parentCache; - private final BlockBufferManager bufferManager; - private final ByteBuffer blockIDBuffer; - - BlockBufferFlushTask(ByteBuffer blockIDBuffer, CBlockLocalCache parentCache, - BlockBufferManager manager) { - this.parentCache = parentCache; - this.bufferManager = manager; - this.blockIDBuffer = blockIDBuffer; - } - - /** - * When an object implementing interface <code>Runnable</code> is used - * to create a thread, starting the thread causes the object's - * <code>run</code> method to be called in that separately executing - * thread. - * <p> - * The general contract of the method <code>run</code> is that it may - * take any action whatsoever. - * - * @see Thread#run() - */ - @Override - public void run() { - try { - writeBlockBufferToFile(blockIDBuffer); - } catch (Exception e) { - parentCache.getTargetMetrics().incNumFailedBlockBufferFlushes(); - LOG.error("Unable to sync the Block map to disk with " - + (blockIDBuffer.position() / Long.SIZE) + "entries " - + "-- NOTE: This might cause a data loss or corruption", e); - } finally { - bufferManager.releaseBuffer(blockIDBuffer); - } - } - - /** - * Write Block Buffer to file. 
- * - * @param buffer - ByteBuffer - * @throws IOException - */ - private void writeBlockBufferToFile(ByteBuffer buffer) - throws IOException { - long startTime = Time.monotonicNow(); - boolean append = false; - - // If there is nothing written to blockId buffer, - // then skip flushing of blockId buffer - if (buffer.position() == 0) { - return; - } - - buffer.flip(); - String fileName = - String.format("%s.%s", AsyncBlockWriter.DIRTY_LOG_PREFIX, - Time.monotonicNow()); - String log = Paths.get(parentCache.getDbPath().toString(), fileName) - .toString(); - - FileChannel channel = new FileOutputStream(log, append).getChannel(); - int bytesWritten = channel.write(buffer); - channel.close(); - buffer.clear(); - parentCache.processDirtyMessage(fileName); - long endTime = Time.monotonicNow(); - if (parentCache.isTraceEnabled()) { - parentCache.getTracer().info( - "Task=DirtyBlockLogWrite,Time={} bytesWritten={}", - endTime - startTime, bytesWritten); - } - - parentCache.getTargetMetrics().incNumBlockBufferFlushCompleted(); - parentCache.getTargetMetrics().incNumBytesDirtyLogWritten(bytesWritten); - parentCache.getTargetMetrics(). - updateBlockBufferFlushLatency(endTime - startTime); - LOG.debug("Block buffer writer bytesWritten:{} Time:{}", - bytesWritten, endTime - startTime); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferManager.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferManager.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferManager.java deleted file mode 100644 index 5d3209c..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferManager.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.cblock.jscsiHelper.cache.impl; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.hadoop.conf.Configuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.nio.ByteBuffer; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; - -import static org.apache.hadoop.cblock.CBlockConfigKeys. - DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL; -import static org.apache.hadoop.cblock.CBlockConfigKeys. 
- DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys. - DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE; -import static org.apache.hadoop.cblock.CBlockConfigKeys. - DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys. - DFS_CBLOCK_CACHE_KEEP_ALIVE; -import static org.apache.hadoop.cblock.CBlockConfigKeys. - DFS_CBLOCK_CACHE_KEEP_ALIVE_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys. - DFS_CBLOCK_CACHE_THREAD_PRIORITY; -import static org.apache.hadoop.cblock.CBlockConfigKeys. - DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT; - -/** - * This class manages the block ID buffer. - * Block ID Buffer keeps a list of blocks which are in leveldb cache - * This buffer is used later when the blocks are flushed to container - * - * Two blockIDBuffers are maintained so that write are not blocked when - * DirtyLog is being written. Once a blockIDBuffer is full, it will be - * enqueued for DirtyLog write while the other buffer accepts new write. - * Once the DirtyLog write is done, the buffer is returned back to the pool. - * - * There are three triggers for blockIDBuffer flush - * 1) BlockIDBuffer is full, - * 2) Time period defined for blockIDBuffer flush has elapsed. - * 3) Shutdown - */ -public class BlockBufferManager { - private static final Logger LOG = - LoggerFactory.getLogger(BlockBufferManager.class); - - private enum FlushReason { - BUFFER_FULL, - SHUTDOWN, - TIMER - }; - - private final int blockBufferSize; - private final CBlockLocalCache parentCache; - private final ScheduledThreadPoolExecutor scheduledExecutor; - private final ThreadPoolExecutor threadPoolExecutor; - private final long intervalSeconds; - private final ArrayBlockingQueue<ByteBuffer> acquireQueue; - private final ArrayBlockingQueue<Runnable> workQueue; - private ByteBuffer currentBuffer; - - BlockBufferManager(Configuration config, CBlockLocalCache parentCache) { - this.parentCache = parentCache; - this.scheduledExecutor = new ScheduledThreadPoolExecutor(1); - - this.intervalSeconds = - config.getTimeDuration(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL, - DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_DEFAULT, - TimeUnit.SECONDS); - - long keepAlive = config.getTimeDuration(DFS_CBLOCK_CACHE_KEEP_ALIVE, - DFS_CBLOCK_CACHE_KEEP_ALIVE_DEFAULT, - TimeUnit.SECONDS); - this.workQueue = new ArrayBlockingQueue<>(2, true); - int threadPri = config.getInt(DFS_CBLOCK_CACHE_THREAD_PRIORITY, - DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT); - ThreadFactory workerThreadFactory = new ThreadFactoryBuilder() - .setNameFormat("Cache Block Buffer Manager Thread #%d") - .setDaemon(true) - .setPriority(threadPri) - .build(); - /* - * starting a thread pool with core pool size of 1 and maximum of 2 threads - * as there are maximum of 2 buffers which can be flushed at the same time. 
- */ - this.threadPoolExecutor = new ThreadPoolExecutor(1, 2, - keepAlive, TimeUnit.SECONDS, workQueue, workerThreadFactory, - new ThreadPoolExecutor.AbortPolicy()); - - this.blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE, - DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT) * (Long.SIZE / Byte.SIZE); - this.acquireQueue = new ArrayBlockingQueue<>(2, true); - - for (int i = 0; i < 2; i++) { - acquireQueue.add(ByteBuffer.allocate(blockBufferSize)); - } - // get the first buffer to be used - this.currentBuffer = acquireQueue.remove(); - - LOG.info("BufferManager: Buffer Size:{} FlushIntervalSeconds:{}", - blockBufferSize, intervalSeconds); - } - - // triggerBlockBufferFlush enqueues current ByteBuffer for flush and returns. - // This enqueue is asynchronous and hence triggerBlockBufferFlush will - // only block when there are no available buffers in acquireQueue - // Once the DirtyLog write is done, buffer is returned back to - // BlockBufferManager using releaseBuffer - private synchronized void triggerBlockBufferFlush(FlushReason reason) { - LOG.debug("Flush triggered because: " + reason.toString() + - " Num entries in buffer: " + - currentBuffer.position() / (Long.SIZE / Byte.SIZE) + - " Acquire Queue Size: " + acquireQueue.size()); - - parentCache.getTargetMetrics().incNumBlockBufferFlushTriggered(); - BlockBufferFlushTask flushTask = - new BlockBufferFlushTask(currentBuffer, parentCache, this); - threadPoolExecutor.submit(flushTask); - try { - currentBuffer = acquireQueue.take(); - } catch (InterruptedException ex) { - currentBuffer = null; - parentCache.getTargetMetrics().incNumInterruptedBufferWaits(); - LOG.error("wait on take operation on acquire queue interrupted", ex); - Thread.currentThread().interrupt(); - } - } - - public synchronized void addToBlockBuffer(long blockId) { - parentCache.getTargetMetrics().incNumBlockBufferUpdates(); - currentBuffer.putLong(blockId); - // if no space left, flush this buffer - if (currentBuffer.remaining() == 0) { - triggerBlockBufferFlush(FlushReason.BUFFER_FULL); - } - } - - public void releaseBuffer(ByteBuffer buffer) { - if (buffer.position() != 0) { - LOG.error("requeuing a non empty buffer with:{}", - "elements enqueued in the acquire queue", - buffer.position() / (Long.SIZE / Byte.SIZE)); - buffer.reset(); - } - // There should always be space in the queue to add an element - acquireQueue.add(buffer); - } - - // Start a scheduled task to flush blockIDBuffer - public void start() { - Runnable scheduledTask = () -> triggerBlockBufferFlush(FlushReason.TIMER); - scheduledExecutor.scheduleWithFixedDelay(scheduledTask, intervalSeconds, - intervalSeconds, TimeUnit.SECONDS); - threadPoolExecutor.prestartAllCoreThreads(); - } - - public void shutdown() { - triggerBlockBufferFlush(FlushReason.SHUTDOWN); - scheduledExecutor.shutdown(); - threadPoolExecutor.shutdown(); - } -} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org
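For context on the flush machinery this commit deletes: BlockBufferManager and BlockBufferFlushTask implement the double-buffered dirty-log pattern their javadoc describes, rotating two block-ID buffers so writers never wait on a flush and flushing on buffer-full, timer expiry, or shutdown. The sketch below is a minimal, self-contained rendering of that pattern only; the class name, buffer size, executor wiring, and the /tmp path are assumptions made for the example, and the file naming merely mirrors the deleted DIRTY_LOG_PREFIX convention. It is not the removed CBlock implementation and does not use its configuration keys.

import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Illustrative only: a stripped-down double-buffered block-ID log.
 * Two buffers rotate so callers of add() are not blocked while a full
 * buffer is being flushed to a dirty-log file in the background.
 */
public class DirtyLogBufferSketch {
  private static final int ENTRIES_PER_BUFFER = 1024;      // assumed size, not the CBlock default
  private final ArrayBlockingQueue<ByteBuffer> pool = new ArrayBlockingQueue<>(2, true);
  private final ExecutorService flushExecutor = Executors.newFixedThreadPool(2);
  private final Path logDir;
  private ByteBuffer current;

  public DirtyLogBufferSketch(Path logDir) {
    this.logDir = logDir;
    for (int i = 0; i < 2; i++) {
      pool.add(ByteBuffer.allocate(ENTRIES_PER_BUFFER * Long.BYTES));
    }
    current = pool.remove();                                // first buffer accepts writes immediately
  }

  /** Record a block ID; a full buffer is handed off for an asynchronous flush. */
  public synchronized void add(long blockId) throws InterruptedException {
    current.putLong(blockId);
    if (!current.hasRemaining()) {
      rotateAndFlush();
    }
  }

  /** Flush whatever is pending; a timer tick or shutdown would call this. */
  public synchronized void flushPending() throws InterruptedException {
    if (current.position() > 0) {
      rotateAndFlush();
    }
  }

  private void rotateAndFlush() throws InterruptedException {
    final ByteBuffer full = current;
    current = pool.take();                                  // blocks only if both buffers are in flight
    flushExecutor.submit(() -> {
      try {
        writeDirtyLog(full);
      } catch (IOException e) {
        e.printStackTrace();                                // real code would increment a failure metric
      } finally {
        full.clear();
        pool.add(full);                                     // return the buffer to the pool
      }
    });
  }

  private void writeDirtyLog(ByteBuffer buffer) throws IOException {
    buffer.flip();
    String name = "DirtyLog." + System.currentTimeMillis();
    try (FileChannel ch = new FileOutputStream(logDir.resolve(name).toFile()).getChannel()) {
      ch.write(buffer);
    }
  }

  public void shutdown() throws InterruptedException {
    flushPending();
    flushExecutor.shutdown();
    flushExecutor.awaitTermination(10, TimeUnit.SECONDS);
  }

  public static void main(String[] args) throws Exception {
    DirtyLogBufferSketch sketch = new DirtyLogBufferSketch(Paths.get("/tmp"));
    for (long id = 0; id < 5000; id++) {
      sketch.add(id);
    }
    sketch.shutdown();
  }
}

Two pooled buffers are sufficient because at most two flushes can be in flight at any moment, which is also why the deleted constructor sized both its work queue and the maximum thread pool size to 2.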