http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockIStorageImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockIStorageImpl.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockIStorageImpl.java
deleted file mode 100644
index 4744968..0000000
--- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockIStorageImpl.java
+++ /dev/null
@@ -1,440 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.cblock.jscsiHelper;
-
-import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.cblock.jscsiHelper.cache.CacheModule;
-import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
-import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.jscsi.target.storage.IStorageModule;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_TRACE_IO;
-import static org.apache.hadoop.cblock.CBlockConfigKeys
-    .DFS_CBLOCK_TRACE_IO_DEFAULT;
-
-/**
- * The SCSI Target class for CBlockSCSIServer.
- */
-final public class CBlockIStorageImpl implements IStorageModule {
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(CBlockIStorageImpl.class);
-  private static final Logger TRACER =
-      LoggerFactory.getLogger("TraceIO");
-
-  private CacheModule cache;
-  private final long volumeSize;
-  private final int blockSize;
-  private final String userName;
-  private final String volumeName;
-  private final boolean traceEnabled;
-  private final Configuration conf;
-  private final ContainerCacheFlusher flusher;
-  private List<Pipeline> fullContainerList;
-
-  /**
-   * private: constructs a SCSI Target.
-   *
-   * @param config - config
-   * @param userName - Username
-   * @param volumeName - Name of the volume
-   * @param volumeSize - Size of the volume
-   * @param blockSize - Size of the block
-   * @param fullContainerList - Ordered list of containers that make up this
-   * volume.
-   * @param flusher - flusher which is used to flush data from
-   *                  level db cache to containers
-   * @throws IOException - Throws IOException.
-   */
-  private CBlockIStorageImpl(Configuration config, String userName,
-      String volumeName, long volumeSize, int blockSize,
-      List<Pipeline> fullContainerList, ContainerCacheFlusher flusher) {
-    this.conf = config;
-    this.userName = userName;
-    this.volumeName = volumeName;
-    this.volumeSize = volumeSize;
-    this.blockSize = blockSize;
-    this.fullContainerList = new ArrayList<>(fullContainerList);
-    this.flusher = flusher;
-    this.traceEnabled = conf.getBoolean(DFS_CBLOCK_TRACE_IO,
-        DFS_CBLOCK_TRACE_IO_DEFAULT);
-  }
-
-  /**
-   * private: initialize the cache.
-   *
-   * @param xceiverClientManager - client manager that is used for creating new
-   * connections to containers.
-   * @param metrics  - target metrics to maintain metrics for target server
-   * @throws IOException - Throws IOException.
-   */
-  private void initCache(XceiverClientManager xceiverClientManager,
-      CBlockTargetMetrics metrics) throws IOException {
-    this.cache = CBlockLocalCache.newBuilder()
-        .setConfiguration(conf)
-        .setVolumeName(this.volumeName)
-        .setUserName(this.userName)
-        .setPipelines(this.fullContainerList)
-        .setClientManager(xceiverClientManager)
-        .setBlockSize(blockSize)
-        .setVolumeSize(volumeSize)
-        .setFlusher(flusher)
-        .setCBlockTargetMetrics(metrics)
-        .build();
-    this.cache.start();
-  }
-
-  /**
-   * Gets a new builder for CBlockStorageImpl.
-   *
-   * @return builder
-   */
-  public static Builder newBuilder() {
-    return new Builder();
-  }
-
-  /**
-   * Get Cache.
-   *
-   * @return - Cache
-   */
-  public CacheModule getCache() {
-    return cache;
-  }
-
-  /**
-   * Returns block size of this volume.
-   *
-   * @return int size of block for this volume.
-   */
-  @Override
-  public int getBlockSize() {
-    return blockSize;
-  }
-
-  /**
-   * Checks the index boundary of a block address.
-   *
-   * @param logicalBlockAddress the index of the first block of data to be read
-   * or written
-   * @param transferLengthInBlocks the total number of consecutive blocks about
-   * to be read or written
-   * @return 0 == Success, 1 indicates the LBA address is out of bounds and 2
-   * indicates that LBA + transfer size is out of bounds.
-   */
-  @Override
-  public int checkBounds(long logicalBlockAddress, int transferLengthInBlocks) {
-    long sizeInBlocks = volumeSize / blockSize;
-    int res = 0;
-    if (logicalBlockAddress < 0 || logicalBlockAddress >= sizeInBlocks) {
-      res = 1;
-    }
-
-    if (transferLengthInBlocks < 0 ||
-        logicalBlockAddress + transferLengthInBlocks > sizeInBlocks) {
-      if (res == 0) {
-        res = 2;
-      }
-    }
-    return res;
-  }
-
-  /**
-   * Number of blocks that make up this volume.
-   *
-   * @return long - count of blocks.
-   */
-  @Override
-  public long getSizeInBlocks() {
-    return volumeSize / blockSize;
-  }
-
-  /**
-   * Reads the number of bytes that can be read into the bytes buffer from the
-   * location indicated.
-   *
-   * @param bytes the array into which the data will be copied will be filled
-   * with data from storage
-   * @param storageIndex the position of the first byte to be copied
-   * @throws IOException
-   */
-  @Override
-  public void read(byte[] bytes, long storageIndex) throws IOException {
-    int startingIdxInBlock = (int) storageIndex % blockSize;
-    int idxInBytes = 0;
-    if (this.traceEnabled) {
-      TRACER.info("Task=ReadStart,length={},location={}",
-          bytes.length, storageIndex);
-    }
-    while (idxInBytes < bytes.length - 1) {
-      long blockId = (storageIndex + idxInBytes) / blockSize;
-      byte[] dataBytes;
-
-      try {
-        LogicalBlock block = this.cache.get(blockId);
-        dataBytes = block.getData().array();
-
-        if (this.traceEnabled) {
-          TRACER.info("Task=ReadBlock,BlockID={},length={},SHA={}",
-              blockId,
-              dataBytes.length,
-              dataBytes.length > 0 ? DigestUtils.sha256Hex(dataBytes) : null);
-        }
-      } catch (IOException e) {
-        // For an non-existing block cache.get will return a block with zero
-        // bytes filled. So any error here is a real error.
-        LOGGER.error("getting errors when reading data:" + e);
-        throw e;
-      }
-
-      int length = blockSize - startingIdxInBlock;
-      if (length > bytes.length - idxInBytes) {
-        length = bytes.length - idxInBytes;
-      }
-      if (dataBytes.length >= length) {
-        System.arraycopy(dataBytes, startingIdxInBlock, bytes, idxInBytes,
-            length);
-      }
-      startingIdxInBlock = 0;
-      idxInBytes += length;
-    }
-    if (this.traceEnabled) {
-      TRACER.info("Task=ReadEnd,length={},location={},SHA={}",
-          bytes.length, storageIndex, DigestUtils.sha256Hex(bytes));
-    }
-  }
-
-  @Override
-  public void write(byte[] bytes, long storageIndex) throws IOException {
-    int startingIdxInBlock = (int) storageIndex % blockSize;
-    int idxInBytes = 0;
-    if (this.traceEnabled) {
-      TRACER.info("Task=WriteStart,length={},location={},SHA={}",
-          bytes.length, storageIndex,
-          bytes.length > 0 ? DigestUtils.sha256Hex(bytes) : null);
-    }
-
-    ByteBuffer dataByte = ByteBuffer.allocate(blockSize);
-    while (idxInBytes < bytes.length - 1) {
-      long blockId = (storageIndex + idxInBytes) / blockSize;
-      int length = blockSize - startingIdxInBlock;
-      if (length > bytes.length - idxInBytes) {
-        length = bytes.length - idxInBytes;
-      }
-      System.arraycopy(bytes, idxInBytes, dataByte.array(), startingIdxInBlock,
-          length);
-      this.cache.put(blockId, dataByte.array());
-
-      if (this.traceEnabled) {
-        TRACER.info("Task=WriteBlock,BlockID={},length={},SHA={}",
-            blockId, dataByte.array().length,
-            dataByte.array().length > 0 ?
-                DigestUtils.sha256Hex(dataByte.array()) : null);
-      }
-      dataByte.clear();
-      startingIdxInBlock = 0;
-      idxInBytes += length;
-    }
-
-    if (this.traceEnabled) {
-      TRACER.info("Task=WriteEnd,length={},location={} ",
-          bytes.length, storageIndex);
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    try {
-      cache.close();
-    } catch (IllegalStateException ise) {
-      LOGGER.error("Can not close the storage {}", ise);
-      throw ise;
-    }
-  }
-
-  /**
-   * Builder class for CBlocklocalCache.
-   */
-  public static class Builder {
-    private String userName;
-    private String volumeName;
-    private long volumeSize;
-    private int blockSize;
-    private List<Pipeline> containerList;
-    private Configuration conf;
-    private XceiverClientManager clientManager;
-    private ContainerCacheFlusher flusher;
-    private CBlockTargetMetrics metrics;
-
-    /**
-     * Constructs a builder.
-     */
-    Builder() {
-
-    }
-
-    public Builder setFlusher(ContainerCacheFlusher cacheFlusher) {
-      this.flusher = cacheFlusher;
-      return this;
-    }
-
-    /**
-     * set config.
-     *
-     * @param config - config
-     * @return Builder
-     */
-    public Builder setConf(Configuration config) {
-      this.conf = config;
-      return this;
-    }
-
-    /**
-     * set user name.
-     *
-     * @param cblockUserName - user name
-     * @return Builder
-     */
-    public Builder setUserName(String cblockUserName) {
-      this.userName = cblockUserName;
-      return this;
-    }
-
-    /**
-     * set volume name.
-     *
-     * @param cblockVolumeName -- volume name
-     * @return Builder
-     */
-    public Builder setVolumeName(String cblockVolumeName) {
-      this.volumeName = cblockVolumeName;
-      return this;
-    }
-
-    /**
-     * set volume size.
-     *
-     * @param cblockVolumeSize -- set volume size.
-     * @return Builder
-     */
-    public Builder setVolumeSize(long cblockVolumeSize) {
-      this.volumeSize = cblockVolumeSize;
-      return this;
-    }
-
-    /**
-     * set block size.
-     *
-     * @param cblockBlockSize -- block size
-     * @return Builder
-     */
-    public Builder setBlockSize(int cblockBlockSize) {
-      this.blockSize = cblockBlockSize;
-      return this;
-    }
-
-    /**
-     * Set contianer list.
-     *
-     * @param cblockContainerList - set the pipeline list
-     * @return Builder
-     */
-    public Builder setContainerList(List<Pipeline> cblockContainerList) {
-      this.containerList = cblockContainerList;
-      return this;
-    }
-
-    /**
-     * Set client manager.
-     *
-     * @param xceiverClientManager -- sets the client manager.
-     * @return Builder
-     */
-    public Builder setClientManager(XceiverClientManager xceiverClientManager) {
-      this.clientManager = xceiverClientManager;
-      return this;
-    }
-
-    /**
-     * Set Cblock Target Metrics.
-     *
-     * @param targetMetrics -- sets the cblock target metrics
-     * @return Builder
-     */
-    public Builder setCBlockTargetMetrics(CBlockTargetMetrics targetMetrics) {
-      this.metrics = targetMetrics;
-      return this;
-    }
-
-    /**
-     * Builds the CBlockStorageImpl.
-     *
-     * @return builds the CBlock Scsi Target.
-     */
-    public CBlockIStorageImpl build() throws IOException {
-      if (StringUtils.isBlank(userName)) {
-        throw new IllegalArgumentException("User name cannot be null or empty" +
-            ".");
-      }
-      if (StringUtils.isBlank(volumeName)) {
-        throw new IllegalArgumentException("Volume name cannot be null or " +
-            "empty");
-      }
-
-      if (volumeSize < 1) {
-        throw new IllegalArgumentException("Volume size cannot be negative or" +
-            " zero.");
-      }
-
-      if (blockSize < 1) {
-        throw new IllegalArgumentException("Block size cannot be negative or " +
-            "zero.");
-      }
-
-      if (containerList == null || containerList.size() == 0) {
-        throw new IllegalArgumentException("Container list cannot be null or " +
-            "empty");
-      }
-      if (clientManager == null) {
-        throw new IllegalArgumentException("Client manager cannot be null");
-      }
-      if (conf == null) {
-        throw new IllegalArgumentException("Configuration cannot be null");
-      }
-
-      if (flusher == null) {
-        throw new IllegalArgumentException("Flusher Cannot be null.");
-      }
-      CBlockIStorageImpl impl = new CBlockIStorageImpl(this.conf, this.userName,
-          this.volumeName, this.volumeSize, this.blockSize, this.containerList,
-          this.flusher);
-      impl.initCache(this.clientManager, this.metrics);
-      return impl;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockManagerHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockManagerHandler.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockManagerHandler.java
deleted file mode 100644
index 6367c61..0000000
--- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockManagerHandler.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.cblock.jscsiHelper;
-
-import org.apache.hadoop.cblock.meta.VolumeInfo;
-import org.apache.hadoop.cblock.proto.MountVolumeResponse;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * This class is the handler of CBlockManager used by target server
- * to communicate with CBlockManager.
- *
- * More specifically, this class will expose local methods to target
- * server, and make RPC calls to CBlockManager accordingly
- */
-public class CBlockManagerHandler {
-
-  private final CBlockClientProtocolClientSideTranslatorPB handler;
-
-  public CBlockManagerHandler(
-      CBlockClientProtocolClientSideTranslatorPB handler) {
-    this.handler = handler;
-  }
-
-  public MountVolumeResponse mountVolume(
-      String userName, String volumeName) throws IOException {
-    return handler.mountVolume(userName, volumeName);
-  }
-
-  public List<VolumeInfo> listVolumes() throws IOException {
-    return handler.listVolumes();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java
deleted file mode 100644
index e7df0cf..0000000
--- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java
+++ /dev/null
@@ -1,334 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.cblock.jscsiHelper;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.metrics2.MetricsSystem;
-import org.apache.hadoop.metrics2.annotation.Metric;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.lib.MutableCounterLong;
-import org.apache.hadoop.metrics2.lib.MutableRate;
-
-/**
- * This class is for maintaining  the various Cblock Target statistics
- * and publishing them through the metrics interfaces.
- * This also registers the JMX MBean for RPC.
- *
- * This class maintains stats like cache hit and miss ratio
- * as well as the latency time of read and write ops.
- */
-public class CBlockTargetMetrics {
-  // IOPS based Metrics
-  @Metric private MutableCounterLong numReadOps;
-  @Metric private MutableCounterLong numWriteOps;
-  @Metric private MutableCounterLong numReadCacheHits;
-  @Metric private MutableCounterLong numReadCacheMiss;
-  @Metric private MutableCounterLong numDirectBlockWrites;
-
-  // Cblock internal Metrics
-  @Metric private MutableCounterLong numDirtyLogBlockRead;
-  @Metric private MutableCounterLong numBytesDirtyLogRead;
-  @Metric private MutableCounterLong numBytesDirtyLogWritten;
-  @Metric private MutableCounterLong numBlockBufferFlushCompleted;
-  @Metric private MutableCounterLong numBlockBufferFlushTriggered;
-  @Metric private MutableCounterLong numBlockBufferUpdates;
-  @Metric private MutableCounterLong numRetryLogBlockRead;
-  @Metric private MutableCounterLong numBytesRetryLogRead;
-
-  // Failure Metrics
-  @Metric private MutableCounterLong numReadLostBlocks;
-  @Metric private MutableCounterLong numFailedReadBlocks;
-  @Metric private MutableCounterLong numWriteIOExceptionRetryBlocks;
-  @Metric private MutableCounterLong numWriteGenericExceptionRetryBlocks;
-  @Metric private MutableCounterLong numFailedDirectBlockWrites;
-  @Metric private MutableCounterLong numIllegalDirtyLogFiles;
-  @Metric private MutableCounterLong numFailedDirtyLogFileDeletes;
-  @Metric private MutableCounterLong numFailedBlockBufferFlushes;
-  @Metric private MutableCounterLong numInterruptedBufferWaits;
-  @Metric private MutableCounterLong numFailedRetryLogFileWrites;
-  @Metric private MutableCounterLong numWriteMaxRetryBlocks;
-  @Metric private MutableCounterLong numFailedReleaseLevelDB;
-
-  // Latency based Metrics
-  @Metric private MutableRate dbReadLatency;
-  @Metric private MutableRate containerReadLatency;
-  @Metric private MutableRate dbWriteLatency;
-  @Metric private MutableRate containerWriteLatency;
-  @Metric private MutableRate blockBufferFlushLatency;
-  @Metric private MutableRate directBlockWriteLatency;
-
-  public CBlockTargetMetrics() {
-  }
-
-  public static CBlockTargetMetrics create() {
-    MetricsSystem ms = DefaultMetricsSystem.instance();
-    return ms.register("CBlockTargetMetrics",
-        "CBlock Target Metrics",
-        new CBlockTargetMetrics());
-  }
-
-  public void incNumReadOps() {
-    numReadOps.incr();
-  }
-
-  public void incNumWriteOps() {
-    numWriteOps.incr();
-  }
-
-  public void incNumReadCacheHits() {
-    numReadCacheHits.incr();
-  }
-
-  public void incNumReadCacheMiss() {
-    numReadCacheMiss.incr();
-  }
-
-  public void incNumReadLostBlocks() {
-    numReadLostBlocks.incr();
-  }
-
-  public void incNumDirectBlockWrites() {
-    numDirectBlockWrites.incr();
-  }
-
-  public void incNumWriteIOExceptionRetryBlocks() {
-    numWriteIOExceptionRetryBlocks.incr();
-  }
-
-  public void incNumWriteGenericExceptionRetryBlocks() {
-    numWriteGenericExceptionRetryBlocks.incr();
-  }
-
-  public void incNumFailedDirectBlockWrites() {
-    numFailedDirectBlockWrites.incr();
-  }
-
-  public void incNumFailedReadBlocks() {
-    numFailedReadBlocks.incr();
-  }
-
-  public void incNumBlockBufferFlushCompleted() {
-    numBlockBufferFlushCompleted.incr();
-  }
-
-  public void incNumBlockBufferFlushTriggered() {
-    numBlockBufferFlushTriggered.incr();
-  }
-
-  public void incNumDirtyLogBlockRead() {
-    numDirtyLogBlockRead.incr();
-  }
-
-  public void incNumBytesDirtyLogRead(int bytes) {
-    numBytesDirtyLogRead.incr(bytes);
-  }
-
-  public void incNumBlockBufferUpdates() {
-    numBlockBufferUpdates.incr();
-  }
-
-  public void incNumRetryLogBlockRead() {
-    numRetryLogBlockRead.incr();
-  }
-
-  public void incNumBytesRetryLogRead(int bytes) {
-    numBytesRetryLogRead.incr(bytes);
-  }
-
-  public void incNumBytesDirtyLogWritten(int bytes) {
-    numBytesDirtyLogWritten.incr(bytes);
-  }
-
-  public void incNumFailedBlockBufferFlushes() {
-    numFailedBlockBufferFlushes.incr();
-  }
-
-  public void incNumInterruptedBufferWaits() {
-    numInterruptedBufferWaits.incr();
-  }
-
-  public void incNumIllegalDirtyLogFiles() {
-    numIllegalDirtyLogFiles.incr();
-  }
-
-  public void incNumFailedDirtyLogFileDeletes() {
-    numFailedDirtyLogFileDeletes.incr();
-  }
-
-  public void incNumFailedRetryLogFileWrites() {
-    numFailedRetryLogFileWrites.incr();
-  }
-
-  public void incNumWriteMaxRetryBlocks() {
-    numWriteMaxRetryBlocks.incr();
-  }
-
-  public void incNumFailedReleaseLevelDB() {
-    numFailedReleaseLevelDB.incr();
-  }
-
-  public void updateDBReadLatency(long latency) {
-    dbReadLatency.add(latency);
-  }
-
-  public void updateContainerReadLatency(long latency) {
-    containerReadLatency.add(latency);
-  }
-
-  public void updateDBWriteLatency(long latency) {
-    dbWriteLatency.add(latency);
-  }
-
-  public void updateContainerWriteLatency(long latency) {
-    containerWriteLatency.add(latency);
-  }
-
-  public void updateDirectBlockWriteLatency(long latency) {
-    directBlockWriteLatency.add(latency);
-  }
-
-  public void updateBlockBufferFlushLatency(long latency) {
-    blockBufferFlushLatency.add(latency);
-  }
-
-  @VisibleForTesting
-  public long getNumReadOps() {
-    return numReadOps.value();
-  }
-
-  @VisibleForTesting
-  public long getNumWriteOps() {
-    return numWriteOps.value();
-  }
-
-  @VisibleForTesting
-  public long getNumReadCacheHits() {
-    return numReadCacheHits.value();
-  }
-
-  @VisibleForTesting
-  public long getNumReadCacheMiss() {
-    return numReadCacheMiss.value();
-  }
-
-  @VisibleForTesting
-  public long getNumReadLostBlocks() {
-    return numReadLostBlocks.value();
-  }
-
-  @VisibleForTesting
-  public long getNumDirectBlockWrites() {
-    return numDirectBlockWrites.value();
-  }
-
-  @VisibleForTesting
-  public long getNumFailedDirectBlockWrites() {
-    return numFailedDirectBlockWrites.value();
-  }
-
-  @VisibleForTesting
-  public long getNumFailedReadBlocks() {
-    return numFailedReadBlocks.value();
-  }
-
-  @VisibleForTesting
-  public long getNumWriteIOExceptionRetryBlocks() {
-    return numWriteIOExceptionRetryBlocks.value();
-  }
-
-  @VisibleForTesting
-  public long getNumWriteGenericExceptionRetryBlocks() {
-    return numWriteGenericExceptionRetryBlocks.value();
-  }
-
-  @VisibleForTesting
-  public long getNumBlockBufferFlushCompleted() {
-    return numBlockBufferFlushCompleted.value();
-  }
-
-  @VisibleForTesting
-  public long getNumBlockBufferFlushTriggered() {
-    return numBlockBufferFlushTriggered.value();
-  }
-
-  @VisibleForTesting
-  public long getNumDirtyLogBlockRead() {
-    return numDirtyLogBlockRead.value();
-  }
-
-  @VisibleForTesting
-  public long getNumBytesDirtyLogReads() {
-    return numBytesDirtyLogRead.value();
-  }
-
-  @VisibleForTesting
-  public long getNumBlockBufferUpdates() {
-    return numBlockBufferUpdates.value();
-  }
-
-  @VisibleForTesting
-  public long getNumRetryLogBlockRead() {
-    return numRetryLogBlockRead.value();
-  }
-
-  @VisibleForTesting
-  public long getNumBytesRetryLogReads() {
-    return numBytesRetryLogRead.value();
-  }
-
-  @VisibleForTesting
-  public long getNumBytesDirtyLogWritten() {
-    return numBytesDirtyLogWritten.value();
-  }
-
-  @VisibleForTesting
-  public long getNumFailedBlockBufferFlushes() {
-    return numFailedBlockBufferFlushes.value();
-  }
-
-  @VisibleForTesting
-  public long getNumInterruptedBufferWaits() {
-    return numInterruptedBufferWaits.value();
-  }
-
-  @VisibleForTesting
-  public long getNumIllegalDirtyLogFiles() {
-    return numIllegalDirtyLogFiles.value();
-  }
-
-  @VisibleForTesting
-  public long getNumFailedDirtyLogFileDeletes() {
-    return numFailedDirtyLogFileDeletes.value();
-  }
-
-  @VisibleForTesting
-  public long getNumFailedRetryLogFileWrites() {
-    return numFailedRetryLogFileWrites.value();
-  }
-
-  @VisibleForTesting
-  public long getNumWriteMaxRetryBlocks() {
-    return numWriteMaxRetryBlocks.value();
-  }
-
-  @VisibleForTesting
-  public long getNumFailedReleaseLevelDB() {
-    return numFailedReleaseLevelDB.value();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetServer.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetServer.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetServer.java
deleted file mode 100644
index afbd260..0000000
--- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetServer.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.cblock.jscsiHelper;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.cblock.proto.MountVolumeResponse;
-import org.apache.hadoop.cblock.util.KeyUtil;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.jscsi.target.Configuration;
-import org.jscsi.target.Target;
-import org.jscsi.target.TargetServer;
-
-import java.io.IOException;
-import java.util.HashMap;
-
-/**
- * This class extends JSCSI target server, which is a ISCSI target that can be
- * recognized by a remote machine with ISCSI installed.
- */
-public final class CBlockTargetServer extends TargetServer {
-  private final OzoneConfiguration conf;
-  private final CBlockManagerHandler cBlockManagerHandler;
-  private final XceiverClientManager xceiverClientManager;
-  private final ContainerCacheFlusher containerCacheFlusher;
-  private final CBlockTargetMetrics metrics;
-
-  public CBlockTargetServer(OzoneConfiguration ozoneConfig,
-      Configuration jscsiConf,
-      CBlockManagerHandler cBlockManagerHandler,
-      CBlockTargetMetrics metrics)
-      throws IOException {
-    super(jscsiConf);
-    this.cBlockManagerHandler = cBlockManagerHandler;
-    this.xceiverClientManager = new XceiverClientManager(ozoneConfig);
-    this.conf = ozoneConfig;
-    this.containerCacheFlusher = new ContainerCacheFlusher(this.conf,
-        xceiverClientManager, metrics);
-    this.metrics = metrics;
-    LOGGER.info("Starting flusher thread.");
-    Thread flushListenerThread = new Thread(containerCacheFlusher);
-    flushListenerThread.setDaemon(true);
-    flushListenerThread.start();
-  }
-
-  public static void main(String[] args) throws Exception {
-  }
-
-  @Override
-  public boolean isValidTargetName(String checkTargetName) {
-    if (!KeyUtil.isValidVolumeKey(checkTargetName)) {
-      return false;
-    }
-    String userName = KeyUtil.getUserNameFromVolumeKey(checkTargetName);
-    String volumeName = KeyUtil.getVolumeFromVolumeKey(checkTargetName);
-    if (userName == null || volumeName == null) {
-      return false;
-    }
-    try {
-      MountVolumeResponse result =
-          cBlockManagerHandler.mountVolume(userName, volumeName);
-      if (!result.getIsValid()) {
-        LOGGER.error("Not a valid volume:" + checkTargetName);
-        return false;
-      }
-      String volumeKey = KeyUtil.getVolumeKey(result.getUserName(),
-          result.getVolumeName());
-      if (!targets.containsKey(volumeKey)) {
-        LOGGER.info("Mounting Volume. username: {} volume:{}",
-            userName, volumeName);
-        CBlockIStorageImpl ozoneStore = CBlockIStorageImpl.newBuilder()
-            .setUserName(userName)
-            .setVolumeName(volumeName)
-            .setVolumeSize(result.getVolumeSize())
-            .setBlockSize(result.getBlockSize())
-            .setContainerList(result.getContainerList())
-            .setClientManager(xceiverClientManager)
-            .setConf(this.conf)
-            .setFlusher(containerCacheFlusher)
-            .setCBlockTargetMetrics(metrics)
-            .build();
-        Target target = new Target(volumeKey, volumeKey, ozoneStore);
-        targets.put(volumeKey, target);
-      }
-    } catch (IOException e) {
-      LOGGER.error("Can not connect to server when validating target!"
-          + e.getMessage());
-    }
-    return targets.containsKey(checkTargetName);
-  }
-
-  @Override
-  public String[] getTargetNames() {
-    try {
-      if (cBlockManagerHandler != null) {
-        return cBlockManagerHandler.listVolumes().
-            stream().map(
-              volumeInfo -> volumeInfo.getUserName() + ":" + volumeInfo
-                .getVolumeName()).toArray(String[]::new);
-      } else {
-        return new String[0];
-      }
-    } catch (IOException e) {
-      LOGGER.error("Can't list existing volumes", e);
-      return new String[0];
-    }
-  }
-
-  @VisibleForTesting
-  public HashMap<String, Target> getTargets() {
-    return targets;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java
deleted file mode 100644
index 171f3e2..0000000
--- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java
+++ /dev/null
@@ -1,599 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.cblock.jscsiHelper;
-
-import com.google.common.base.Preconditions;
-import com.google.common.primitives.Longs;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.hadoop.cblock.CBlockConfigKeys;
-import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
-import org.apache.hadoop.cblock.jscsiHelper.cache.impl.AsyncBlockWriter;
-import org.apache.hadoop.cblock.jscsiHelper.cache.impl.DiskBlock;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.utils.LevelDBStore;
-import org.iq80.leveldb.Options;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.apache.hadoop.cblock.CBlockConfigKeys
-    .DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE;
-import static org.apache.hadoop.cblock.CBlockConfigKeys
-    .DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT;
-import static org.apache.hadoop.cblock.CBlockConfigKeys
-    .DFS_CBLOCK_CACHE_CORE_MIN_POOL_SIZE;
-import static org.apache.hadoop.cblock.CBlockConfigKeys
-    .DFS_CBLOCK_CACHE_CORE_MIN_POOL_SIZE_DEFAULT;
-import static org.apache.hadoop.cblock.CBlockConfigKeys
-    .DFS_CBLOCK_CACHE_KEEP_ALIVE;
-import static org.apache.hadoop.cblock.CBlockConfigKeys
-    .DFS_CBLOCK_CACHE_KEEP_ALIVE_DEFAULT;
-import static org.apache.hadoop.cblock.CBlockConfigKeys
-    .DFS_CBLOCK_CACHE_MAX_POOL_SIZE;
-import static org.apache.hadoop.cblock.CBlockConfigKeys
-    .DFS_CBLOCK_CACHE_MAX_POOL_SIZE_DEFAULT;
-import static org.apache.hadoop.cblock.CBlockConfigKeys
-    .DFS_CBLOCK_CACHE_QUEUE_SIZE_KB;
-import static org.apache.hadoop.cblock.CBlockConfigKeys
-    .DFS_CBLOCK_CACHE_QUEUE_SIZE_KB_DEFAULT;
-import static org.apache.hadoop.cblock.CBlockConfigKeys
-    .DFS_CBLOCK_CACHE_THREAD_PRIORITY;
-import static org.apache.hadoop.cblock.CBlockConfigKeys
-    .DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT;
-import static org.apache.hadoop.cblock.CBlockConfigKeys
-    .DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_KEY;
-import static org.apache.hadoop.cblock.CBlockConfigKeys
-    .DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_DEFAULT;
-
-/**
- * Class that writes to remote containers.
- */
-public class ContainerCacheFlusher implements Runnable {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ContainerCacheFlusher.class);
-  private final LinkedBlockingQueue<Message> messageQueue;
-  private final ThreadPoolExecutor threadPoolExecutor;
-  private final ArrayBlockingQueue<Runnable> workQueue;
-  private final ConcurrentMap<String, RefCountedDB> dbMap;
-  private final ByteBuffer blockIDBuffer;
-  private final ConcurrentMap<String, Pipeline[]> pipelineMap;
-  private final AtomicLong remoteIO;
-  private final XceiverClientManager xceiverClientManager;
-  private final CBlockTargetMetrics metrics;
-  private AtomicBoolean shutdown;
-  private final long levelDBCacheSize;
-  private final int maxRetryCount;
-  private final String tracePrefix;
-
-  private final ConcurrentMap<String, FinishCounter> finishCountMap;
-
-  /**
-   * Constructs the writers to remote queue.
-   */
-  public ContainerCacheFlusher(Configuration config,
-      XceiverClientManager xceiverClientManager,
-      CBlockTargetMetrics metrics) {
-    int queueSize = config.getInt(DFS_CBLOCK_CACHE_QUEUE_SIZE_KB,
-        DFS_CBLOCK_CACHE_QUEUE_SIZE_KB_DEFAULT) * 1024;
-    int corePoolSize = config.getInt(DFS_CBLOCK_CACHE_CORE_MIN_POOL_SIZE,
-        DFS_CBLOCK_CACHE_CORE_MIN_POOL_SIZE_DEFAULT);
-    int maxPoolSize = config.getInt(DFS_CBLOCK_CACHE_MAX_POOL_SIZE,
-        DFS_CBLOCK_CACHE_MAX_POOL_SIZE_DEFAULT);
-    long keepAlive = config.getTimeDuration(DFS_CBLOCK_CACHE_KEEP_ALIVE,
-        DFS_CBLOCK_CACHE_KEEP_ALIVE_DEFAULT, TimeUnit.SECONDS);
-    int threadPri = config.getInt(DFS_CBLOCK_CACHE_THREAD_PRIORITY,
-        DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT);
-    int blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE,
-        DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT) * (Long.SIZE / Byte.SIZE);
-    levelDBCacheSize = config.getInt(DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_KEY,
-        DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_DEFAULT) * OzoneConsts.MB;
-
-    LOG.info("Cache: Core Pool Size: {}", corePoolSize);
-    LOG.info("Cache: Keep Alive: {}", keepAlive);
-    LOG.info("Cache: Max Pool Size: {}", maxPoolSize);
-    LOG.info("Cache: Thread Pri: {}", threadPri);
-    LOG.info("Cache: BlockBuffer Size: {}", blockBufferSize);
-
-    shutdown = new AtomicBoolean(false);
-    messageQueue = new LinkedBlockingQueue<>();
-    workQueue = new ArrayBlockingQueue<>(queueSize, true);
-
-    ThreadFactory workerThreadFactory = new ThreadFactoryBuilder()
-        .setNameFormat("Cache Block Writer Thread #%d")
-        .setDaemon(true)
-        .setPriority(threadPri)
-        .build();
-    threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize,
-        keepAlive, TimeUnit.SECONDS, workQueue, workerThreadFactory,
-        new ThreadPoolExecutor.AbortPolicy());
-    threadPoolExecutor.prestartAllCoreThreads();
-
-    dbMap = new ConcurrentHashMap<>();
-    pipelineMap = new ConcurrentHashMap<>();
-    blockIDBuffer = ByteBuffer.allocateDirect(blockBufferSize);
-    this.xceiverClientManager = xceiverClientManager;
-    this.metrics = metrics;
-    this.remoteIO = new AtomicLong();
-
-    this.finishCountMap = new ConcurrentHashMap<>();
-    this.maxRetryCount =
-        config.getInt(CBlockConfigKeys.DFS_CBLOCK_CACHE_MAX_RETRY_KEY,
-            CBlockConfigKeys.DFS_CBLOCK_CACHE_MAX_RETRY_DEFAULT);
-    this.tracePrefix = getTracePrefix();
-  }
-
-  private void checkExistingLog(String prefixFileName, File dbPath) {
-    if (!dbPath.exists()) {
-      LOG.debug("No existing dirty log found at {}", dbPath);
-      return;
-    }
-    LOG.debug("Need to check and requeue existing dirty log {}", dbPath);
-    HashMap<String, ArrayList<String>> allFiles = new HashMap<>();
-    traverse(prefixFileName, dbPath, allFiles);
-    for (Map.Entry<String, ArrayList<String>> entry : allFiles.entrySet()) {
-      String parentPath = entry.getKey();
-      for (String fileName : entry.getValue()) {
-        LOG.info("found {} {} with prefix {}",
-            parentPath, fileName, prefixFileName);
-        processDirtyBlocks(parentPath, fileName);
-      }
-    }
-  }
-
-  private void traverse(String prefixFileName, File path,
-                        HashMap<String, ArrayList<String>> files) {
-    if (path.isFile()) {
-      if (path.getName().startsWith(prefixFileName)) {
-        LOG.debug("found this {} with {}", path.getParent(), path.getName());
-        if (!files.containsKey(path.getParent())) {
-          files.put(path.getParent(), new ArrayList<>());
-        }
-        files.get(path.getParent()).add(path.getName());
-      }
-    } else {
-      File[] listFiles = path.listFiles();
-      if (listFiles != null) {
-        for (File subPath : listFiles) {
-          traverse(prefixFileName, subPath, files);
-        }
-      }
-    }
-  }
-
-  /**
-   * Gets the CBlockTargetMetrics.
-   *
-   * @return CBlockTargetMetrics
-   */
-  public CBlockTargetMetrics getTargetMetrics() {
-    return metrics;
-  }
-
-  /**
-   * Gets the  getXceiverClientManager.
-   *
-   * @return XceiverClientManager
-   */
-  public XceiverClientManager getXceiverClientManager() {
-    return xceiverClientManager;
-  }
-
-  /**
-   * Shutdown this instance.
-   */
-  public void shutdown() {
-    this.shutdown.set(true);
-    threadPoolExecutor.shutdown();
-  }
-
-  public long incrementRemoteIO() {
-    return remoteIO.incrementAndGet();
-  }
-
-  /**
-   * Processes a block cache file and queues those blocks for the remote I/O.
-   *
-   * @param dbPath - Location where the DB can be found.
-   * @param fileName - Block Cache File Name
-   */
-  public void processDirtyBlocks(String dbPath, String fileName) {
-    LOG.info("Adding {}/{} to queue. Queue Length: {}", dbPath, fileName,
-        messageQueue.size());
-    this.messageQueue.add(new Message(dbPath, fileName));
-  }
-
-  public Logger getLOG() {
-    return LOG;
-  }
-
-  /**
-   * Opens a DB if needed or returns a handle to an already open DB.
-   *
-   * @param dbPath -- dbPath
-   * @return the levelDB on the given path.
-   * @throws IOException
-   */
-  public synchronized LevelDBStore openDB(String dbPath)
-      throws IOException {
-    if (dbMap.containsKey(dbPath)) {
-      RefCountedDB refDB = dbMap.get(dbPath);
-      refDB.open();
-      return refDB.db;
-    } else {
-      Options options = new Options();
-      options.cacheSize(levelDBCacheSize);
-      options.createIfMissing(true);
-      LevelDBStore cacheDB = new LevelDBStore(
-          new File(getDBFileName(dbPath)), options);
-      RefCountedDB refDB = new RefCountedDB(dbPath, cacheDB);
-      dbMap.put(dbPath, refDB);
-      return cacheDB;
-    }
-  }
-
-  /**
-   * Updates the container map. This data never changes so we will update this
-   * during restarts and it should not hurt us.
-   *
-   * Once a CBlockLocalCache cache is registered, requeue dirty/retry log files
-   * for the volume
-   *
-   * @param dbPath - DbPath
-   * @param containerList - Container List.
-   */
-  public void register(String dbPath, Pipeline[] containerList) {
-    File dbFile = Paths.get(dbPath).toFile();
-    pipelineMap.put(dbPath, containerList);
-    checkExistingLog(AsyncBlockWriter.DIRTY_LOG_PREFIX, dbFile);
-    checkExistingLog(AsyncBlockWriter.RETRY_LOG_PREFIX, dbFile);
-  }
-
-  private String getDBFileName(String dbPath) {
-    return dbPath + ".db";
-  }
-
-  public LevelDBStore getCacheDB(String dbPath) throws IOException {
-    return openDB(dbPath);
-  }
-
-  public void releaseCacheDB(String dbPath) {
-    try {
-      closeDB(dbPath);
-    } catch (Exception e) {
-      metrics.incNumFailedReleaseLevelDB();
-      LOG.error("LevelDB close failed, dbPath:" + dbPath, e);
-    }
-  }
-  /**
-   * Close the DB if we don't have any outstanding references.
-   *
-   * @param dbPath - dbPath
-   * @throws IOException
-   */
-  public synchronized void closeDB(String dbPath) throws IOException {
-    if (dbMap.containsKey(dbPath)) {
-      RefCountedDB refDB = dbMap.get(dbPath);
-      int count = refDB.close();
-      if (count == 0) {
-        dbMap.remove(dbPath);
-      }
-    }
-  }
-
-  Pipeline getPipeline(String dbPath, long blockId) {
-    Pipeline[] containerList = pipelineMap.get(dbPath);
-    Preconditions.checkNotNull(containerList);
-    int containerIdx = (int) blockId % containerList.length;
-    long cBlockIndex =
-        Longs.fromByteArray(containerList[containerIdx].getData());
-    if (cBlockIndex > 0) {
-      // This catches the case when we get a wrong container in the ordering
-      // of the containers.
-      Preconditions.checkState(containerIdx % cBlockIndex == 0,
-          "The container ID computed should match with the container index " +
-              "returned from cBlock Server.");
-    }
-    return containerList[containerIdx];
-  }
-
-  public void incFinishCount(String fileName) {
-    if (!finishCountMap.containsKey(fileName)) {
-      LOG.error("No record for such file:" + fileName);
-      return;
-    }
-    finishCountMap.get(fileName).incCount();
-    if (finishCountMap.get(fileName).isFileDeleted()) {
-      finishCountMap.remove(fileName);
-    }
-  }
-
-  /**
-   * When an object implementing interface <code>Runnable</code> is used
-   * to create a thread, starting the thread causes the object's
-   * <code>run</code> method to be called in that separately executing
-   * thread.
-   * <p>
-   * The general contract of the method <code>run</code> is that it may
-   * take any action whatsoever.
-   *
-   * @see Thread#run()
-   */
-  @Override
-  public void run() {
-    while (!this.shutdown.get()) {
-      try {
-        Message message = messageQueue.take();
-        LOG.debug("Got message to process -- DB Path : {} , FileName; {}",
-            message.getDbPath(), message.getFileName());
-        String fullPath = Paths.get(message.getDbPath(),
-            message.getFileName()).toString();
-        String[] fileNameParts = message.getFileName().split("\\.");
-        Preconditions.checkState(fileNameParts.length > 1);
-        String fileType = fileNameParts[0];
-        boolean isDirtyLogFile =
-            fileType.equalsIgnoreCase(AsyncBlockWriter.DIRTY_LOG_PREFIX);
-        ReadableByteChannel fileChannel = new FileInputStream(fullPath)
-            .getChannel();
-        // TODO: We can batch and unique the IOs here. First getting the code
-        // to work, we will add those later.
-        int bytesRead = fileChannel.read(blockIDBuffer);
-        fileChannel.close();
-        LOG.debug("Read blockID log of size: {} position {} remaining {}",
-            bytesRead, blockIDBuffer.position(), blockIDBuffer.remaining());
-        // current position of in the buffer in bytes, divided by number of
-        // bytes per long (which is calculated by number of bits per long
-        // divided by number of bits per byte) gives the number of blocks
-        int blockCount = blockIDBuffer.position()/(Long.SIZE / Byte.SIZE);
-        if (isDirtyLogFile) {
-          getTargetMetrics().incNumBytesDirtyLogRead(bytesRead);
-        } else {
-          getTargetMetrics().incNumBytesRetryLogRead(bytesRead);
-        }
-        if (finishCountMap.containsKey(message.getFileName())) {
-          // In theory this should never happen. But if it happened,
-          // we need to know it...
-          getTargetMetrics().incNumIllegalDirtyLogFiles();
-          LOG.error("Adding DirtyLog file again {} current count {} new {}",
-              message.getFileName(),
-              finishCountMap.get(message.getFileName()).expectedCount,
-              blockCount);
-        }
-        finishCountMap.put(message.getFileName(),
-            new FinishCounter(blockCount, message.getDbPath(),
-                message.getFileName(), this));
-        // should be flip instead of rewind, because we also need to make sure
-        // the end position is correct.
-        blockIDBuffer.flip();
-        LOG.debug("Remaining blocks count {} and {}", blockIDBuffer.remaining(),
-            blockCount);
-        while (blockIDBuffer.remaining() >= (Long.SIZE / Byte.SIZE)) {
-          long blockID = blockIDBuffer.getLong();
-          int retryCount = 0;
-          if (isDirtyLogFile) {
-            getTargetMetrics().incNumDirtyLogBlockRead();
-          } else {
-            getTargetMetrics().incNumRetryLogBlockRead();
-            Preconditions.checkState(fileNameParts.length == 4);
-            retryCount = Integer.parseInt(fileNameParts[3]);
-          }
-          LogicalBlock block = new DiskBlock(blockID, null, false);
-          BlockWriterTask blockWriterTask = new BlockWriterTask(block, this,
-              message.getDbPath(), retryCount, message.getFileName(),
-              maxRetryCount);
-          threadPoolExecutor.submit(blockWriterTask);
-        }
-        blockIDBuffer.clear();
-      } catch (InterruptedException e) {
-        LOG.info("ContainerCacheFlusher is interrupted.", e);
-      } catch (FileNotFoundException e) {
-        LOG.error("Unable to find the dirty blocks file. This will cause " +
-            "data errors. Please stop using this volume.", e);
-      } catch (IOException e) {
-        LOG.error("Unable to read the dirty blocks file. This will cause " +
-            "data errors. Please stop using this volume.", e);
-      } catch (Exception e) {
-        LOG.error("Generic exception.", e);
-      }
-    }
-    LOG.info("Exiting flusher");
-  }
-
-  /**
-   * Tries to get the local host IP Address as trace prefix
-   * for creating trace IDs, otherwise uses a random UUID for it.
-   */
-  private static String getTracePrefix() {
-    String tmp;
-    try {
-      tmp = InetAddress.getLocalHost().getHostAddress();
-    } catch (UnknownHostException ex) {
-      tmp = UUID.randomUUID().toString();
-      LOG.error("Unable to read the host address. Using a GUID for " +
-          "hostname:{} ", tmp, ex);
-    }
-    return tmp;
-  }
-
-  /**
-   * We create a trace ID to make it easy to debug issues.
-   * A trace ID is in IPAddress:UserName:VolumeName:blockID:second format.
-   *
-   * This will get written down on the data node if we get any failures, so
-   * with this trace ID we can correlate cBlock failures across machines.
-   *
-   * @param blockID - Block ID
-   * @return trace ID
-   */
-  public String getTraceID(File dbPath, long blockID) {
-    String volumeName = dbPath.getName();
-    String userName = dbPath.getParentFile().getName();
-    // mapping to seconds to make the string smaller.
-    return tracePrefix + ":" + userName + ":" + volumeName
-        + ":" + blockID + ":" + Time.monotonicNow() / 1000;
-  }
-
-  /**
-   * Keeps a Reference counted DB that we close only when the total Reference
-   * has gone to zero.
-   */
-  private static class RefCountedDB {
-    private LevelDBStore db;
-    private AtomicInteger refcount;
-    private String dbPath;
-
-    /**
-     * RefCountedDB DB ctor.
-     *
-     * @param dbPath - DB path.
-     * @param db - LevelDBStore db
-     */
-    RefCountedDB(String dbPath, LevelDBStore db) {
-      this.db = db;
-      this.refcount = new AtomicInteger(1);
-      this.dbPath = dbPath;
-    }
-
-    /**
-     * close the DB if possible.
-     */
-    public int close() throws IOException {
-      int count = this.refcount.decrementAndGet();
-      if (count == 0) {
-        LOG.info("Closing the LevelDB. {} ", this.dbPath);
-        db.close();
-      }
-      return count;
-    }
-
-    public void open() {
-      this.refcount.incrementAndGet();
-    }
-  }
-
-  /**
-   * The message held in processing queue.
-   */
-  private static class Message {
-    private String dbPath;
-    private String fileName;
-
-    /**
-     * A message that holds the info about which path dirty blocks log and
-     * which path contains db.
-     *
-     * @param dbPath
-     * @param fileName
-     */
-    Message(String dbPath, String fileName) {
-      this.dbPath = dbPath;
-      this.fileName = fileName;
-    }
-
-    public String getDbPath() {
-      return dbPath;
-    }
-
-    public void setDbPath(String dbPath) {
-      this.dbPath = dbPath;
-    }
-
-    public String getFileName() {
-      return fileName;
-    }
-
-    public void setFileName(String fileName) {
-      this.fileName = fileName;
-    }
-  }
-
-  private static class FinishCounter {
-    private final long expectedCount;
-    private final String dbPath;
-    private final String dirtyLogPath;
-    private final AtomicLong currentCount;
-    private AtomicBoolean fileDeleted;
-    private final ContainerCacheFlusher flusher;
-
-    FinishCounter(long expectedCount, String dbPath,
-        String dirtyLogPath, ContainerCacheFlusher flusher) throws IOException {
-      this.expectedCount = expectedCount;
-      this.dbPath = dbPath;
-      this.dirtyLogPath = dirtyLogPath;
-      this.currentCount = new AtomicLong(0);
-      this.fileDeleted = new AtomicBoolean(false);
-      this.flusher = flusher;
-    }
-
-    public boolean isFileDeleted() {
-      return fileDeleted.get();
-    }
-
-    public void incCount() {
-      long count = this.currentCount.incrementAndGet();
-      if (count >= expectedCount) {
-        String filePath = String.format("%s/%s", dbPath, dirtyLogPath);
-        LOG.debug(
-            "Deleting {} with count {} {}", filePath, count, expectedCount);
-        try {
-          Path path = Paths.get(filePath);
-          Files.delete(path);
-          // the following part tries to remove the directory if it is empty
-          // but not sufficient, because the .db directory still exists....
-          // TODO how to handle the .db directory?
-          /*Path parent = path.getParent();
-          if (parent.toFile().listFiles().length == 0) {
-            Files.delete(parent);
-          }*/
-          fileDeleted.set(true);
-        } catch (Exception e) {
-          flusher.getTargetMetrics().incNumFailedDirtyLogFileDeletes();
-          LOG.error("Error deleting dirty log file:" + filePath, e);
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/SCSITargetDaemon.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/SCSITargetDaemon.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/SCSITargetDaemon.java
deleted file mode 100644
index 3806d8b..0000000
--- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/SCSITargetDaemon.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.cblock.jscsiHelper;
-
-import static org.apache.hadoop.cblock.CBlockConfigKeys
-    .DFS_CBLOCK_ISCSI_ADVERTISED_IP;
-import static org.apache.hadoop.cblock.CBlockConfigKeys
-    .DFS_CBLOCK_ISCSI_ADVERTISED_PORT;
-import static org.apache.hadoop.cblock.CBlockConfigKeys
-    .DFS_CBLOCK_ISCSI_ADVERTISED_PORT_DEFAULT;
-
-import org.apache.hadoop.cblock.CblockUtils;
-import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolPB;
-import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.hdds.scm.client.ContainerOperationClient;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.jscsi.target.Configuration;
-
-import java.net.InetSocketAddress;
-
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_CONTAINER_SIZE_GB_KEY;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSI_CBLOCK_SERVER_ADDRESS_DEFAULT;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSI_CBLOCK_SERVER_ADDRESS_KEY;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSI_PORT_DEFAULT;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSI_PORT_KEY;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSI_SERVER_ADDRESS_DEFAULT;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSI_SERVER_ADDRESS_KEY;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_BIND_HOST_DEFAULT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_BIND_HOST_KEY;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_PORT_KEY;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_KEY;
-
-/**
- * This class runs the target server process.
- */
-public final class SCSITargetDaemon {
-  public static void main(String[] args) throws Exception {
-    CblockUtils.activateConfigs();
-    OzoneConfiguration ozoneConf = new OzoneConfiguration();
-
-    RPC.setProtocolEngine(ozoneConf, CBlockClientServerProtocolPB.class,
-        ProtobufRpcEngine.class);
-    long containerSizeGB = ozoneConf.getInt(DFS_CBLOCK_CONTAINER_SIZE_GB_KEY,
-        DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT);
-    ContainerOperationClient.setContainerSizeB(
-        containerSizeGB * OzoneConsts.GB);
-    String jscsiServerAddress = ozoneConf.get(
-        DFS_CBLOCK_JSCSI_SERVER_ADDRESS_KEY,
-        DFS_CBLOCK_JSCSI_SERVER_ADDRESS_DEFAULT);
-    String cbmIPAddress = ozoneConf.get(
-        DFS_CBLOCK_JSCSI_CBLOCK_SERVER_ADDRESS_KEY,
-        DFS_CBLOCK_JSCSI_CBLOCK_SERVER_ADDRESS_DEFAULT
-    );
-    int cbmPort = ozoneConf.getInt(
-        DFS_CBLOCK_JSCSI_PORT_KEY,
-        DFS_CBLOCK_JSCSI_PORT_DEFAULT
-    );
-
-    String scmAddress = ozoneConf.get(OZONE_SCM_CLIENT_BIND_HOST_KEY,
-        OZONE_SCM_CLIENT_BIND_HOST_DEFAULT);
-    int scmClientPort = ozoneConf.getInt(OZONE_SCM_CLIENT_PORT_KEY,
-        OZONE_SCM_CLIENT_PORT_DEFAULT);
-    int scmDatanodePort = ozoneConf.getInt(OZONE_SCM_DATANODE_PORT_KEY,
-        OZONE_SCM_DATANODE_PORT_DEFAULT);
-
-    String scmClientAddress = scmAddress + ":" + scmClientPort;
-    String scmDataodeAddress = scmAddress + ":" + scmDatanodePort;
-
-    ozoneConf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, scmClientAddress);
-    ozoneConf.set(OZONE_SCM_DATANODE_ADDRESS_KEY, scmDataodeAddress);
-
-    InetSocketAddress cbmAddress = new InetSocketAddress(
-        cbmIPAddress, cbmPort);
-    long version = RPC.getProtocolVersion(
-        CBlockServiceProtocolPB.class);
-    CBlockClientProtocolClientSideTranslatorPB cbmClient =
-        new CBlockClientProtocolClientSideTranslatorPB(
-            RPC.getProxy(CBlockClientServerProtocolPB.class, version,
-                cbmAddress, UserGroupInformation.getCurrentUser(), ozoneConf,
-                NetUtils.getDefaultSocketFactory(ozoneConf), 5000)
-        );
-    CBlockManagerHandler cbmHandler = new CBlockManagerHandler(cbmClient);
-
-    String advertisedAddress = ozoneConf.
-        getTrimmed(DFS_CBLOCK_ISCSI_ADVERTISED_IP, jscsiServerAddress);
-
-    int advertisedPort = ozoneConf.
-        getInt(DFS_CBLOCK_ISCSI_ADVERTISED_PORT,
-            DFS_CBLOCK_ISCSI_ADVERTISED_PORT_DEFAULT);
-
-    Configuration jscsiConfig =
-        new Configuration(jscsiServerAddress,
-            advertisedAddress,
-            advertisedPort);
-    DefaultMetricsSystem.initialize("CBlockMetrics");
-    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
-    CBlockTargetServer targetServer = new CBlockTargetServer(
-        ozoneConf, jscsiConfig, cbmHandler, metrics);
-
-    targetServer.call();
-  }
-
-  private SCSITargetDaemon() {
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/CacheModule.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/CacheModule.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/CacheModule.java
deleted file mode 100644
index 300b2ae..0000000
--- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/CacheModule.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.cblock.jscsiHelper.cache;
-
-import java.io.IOException;
-
-/**
- * Defines the interface for cache implementations. The cache will be called
- * by cblock storage module when it performs IO operations.
- */
-public interface CacheModule {
-  /**
-   * check if the key is cached, if yes, returned the cached object.
-   * otherwise, load from data source. Then put it into cache.
-   *
-   * @param blockID
-   * @return the target block.
-   */
-  LogicalBlock get(long blockID) throws IOException;
-
-  /**
-   * put the value of the key into cache.
-   * @param blockID
-   * @param value
-   */
-  void put(long blockID, byte[] value) throws IOException;
-
-  void flush() throws IOException;
-
-  void start() throws IOException;
-
-  void stop() throws IOException;
-
-  void close() throws IOException;
-
-  boolean isDirtyCache();
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/LogicalBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/LogicalBlock.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/LogicalBlock.java
deleted file mode 100644
index 470826f..0000000
--- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/LogicalBlock.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.cblock.jscsiHelper.cache;
-
-import java.nio.ByteBuffer;
-
-/**
- * Logical Block is the data structure that we write to the cache.
- * The key and data get written to remote containers; the rest is used for
- * bookkeeping by the cache.
- */
-public interface LogicalBlock {
-  /**
-   * Returns the data of this block.
-   *
-   * @return - ByteBuffer holding the block data
-   */
-  ByteBuffer getData();
-
-  /**
-   * Frees the byte buffer since we don't need it any more.
-   */
-  void clearData();
-
-  /**
-   * Returns the Block ID for this Block.
-   *
-   * @return long - BlockID
-   */
-  long getBlockID();
-
-  /**
-   * Flag that tells us if this block has been persisted to container.
-   *
-   * @return whether this block is now persistent
-   */
-  boolean isPersisted();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
 
b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
deleted file mode 100644
index 0192c38..0000000
--- 
a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.cblock.jscsiHelper.cache.impl;
-
-import com.google.common.base.Preconditions;
-import com.google.common.primitives.Longs;
-import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.apache.hadoop.hdds.scm.XceiverClientSpi;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.utils.LevelDBStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * Writes logical blocks on behalf of a {@link CBlockLocalCache}.
- * <p>
- * Depending on whether short circuit IO is enabled on the parent cache, a
- * block is either stored in the local LevelDB cache and its ID handed to the
- * {@link BlockBufferManager}, or written straight to its container.
- */
-public class AsyncBlockWriter {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(AsyncBlockWriter.class);
-
-  /**
-   * XceiverClientManager is used to get client connections to a set of
-   * machines.
-   */
-  private final XceiverClientManager xceiverClientManager;
-
-  /**
-   * This lock is used as a signal to re-queuing thread. The requeue thread
-   * wakes up as soon as it is signaled some blocks are in the retry queue.
-   * We try really aggressively since this new block will automatically move
-   * to the end of the queue.
-   * <p>
-   * In the event a container is unavailable for a long time, we can either
-   * fail all writes or remap and let the writes succeed. The easier
-   * semantics is to fail the volume until the container is recovered by SCM.
-   */
-  private final Lock lock;
-  private final Condition notEmpty;
-  /**
-   * The cache this writer is operating against.
-   */
-  private final CBlockLocalCache parentCache;
-  private final BlockBufferManager blockBufferManager;
-  public static final String DIRTY_LOG_PREFIX = "DirtyLog";
-  public static final String RETRY_LOG_PREFIX = "RetryLog";
-  /** Number of blocks written to the local cache DB by this writer. */
-  private final AtomicLong localIoCount;
-
-  /**
-   * Constructs an Async Block Writer.
-   *
-   * @param config - Config
-   * @param cache - Parent Cache for this writer
-   */
-  public AsyncBlockWriter(Configuration config, CBlockLocalCache cache) {
-
-    Preconditions.checkNotNull(cache, "Cache cannot be null.");
-    Preconditions.checkNotNull(cache.getCacheDB(), "DB cannot be null.");
-    localIoCount = new AtomicLong();
-    lock = new ReentrantLock();
-    notEmpty = lock.newCondition();
-    parentCache = cache;
-    xceiverClientManager = cache.getClientManager();
-    blockBufferManager = new BlockBufferManager(config, parentCache);
-  }
-
-  /**
-   * Makes sure the cache directory exists and starts the block buffer
-   * manager.
-   *
-   * @throws IOException declared for callers; not thrown by the code
-   * visible in this method.
-   * @throws IllegalStateException if the cache directory does not exist
-   * and cannot be created.
-   */
-  public void start() throws IOException {
-    File logDir = new File(parentCache.getDbPath().toString());
-    if (!logDir.exists() && !logDir.mkdirs()) {
-      LOG.error("Unable to create the log directory, Critical error cannot " +
-          "continue. Log Dir : {}", logDir);
-      // Plain concatenation here: exception messages are not run through
-      // the logger, so a "{}" placeholder would appear literally.
-      throw new IllegalStateException("Cache Directory create failed, Cannot "
-          + "continue. Log Dir: " + logDir);
-    }
-    blockBufferManager.start();
-  }
-
-  /**
-   * Return the log to write to.
-   *
-   * @return Logger.
-   */
-  public static Logger getLOG() {
-    return LOG;
-  }
-
-  /**
-   * Get the CacheDB.
-   *
-   * @return LevelDB Handle
-   */
-  LevelDBStore getCacheDB() {
-    return parentCache.getCacheDB();
-  }
-
-  /**
-   * Returns the client manager.
-   *
-   * @return XceiverClientManager
-   */
-  XceiverClientManager getXceiverClientManager() {
-    return xceiverClientManager;
-  }
-
-  /**
-   * Increments the count of local IOs that have gone into this device.
-   *
-   * @return the count after the increment
-   */
-  public long incrementLocalIO() {
-    return localIoCount.incrementAndGet();
-  }
-
-  /**
-   * Return the local io counts to this device.
-   * @return the count of io
-   */
-  public long getLocalIOCount() {
-    return localIoCount.get();
-  }
-
-  /**
-   * Writes a block. With short circuit IO enabled the block goes to the
-   * LevelDB store and its ID is added to the block buffer so that it can be
-   * synced to containers later; otherwise the block is written directly to
-   * its container. In both cases the block data is cleared afterwards.
-   *
-   * @param block - Logical Block
-   * @throws IOException if the write fails
-   */
-  public void writeBlock(LogicalBlock block) throws IOException {
-    byte[] keybuf = Longs.toByteArray(block.getBlockID());
-    String traceID = parentCache.getTraceID(block.getBlockID());
-    if (parentCache.isShortCircuitIOEnabled()) {
-      long startTime = Time.monotonicNow();
-      getCacheDB().put(keybuf, block.getData().array());
-      incrementLocalIO();
-      long endTime = Time.monotonicNow();
-      parentCache.getTargetMetrics().updateDBWriteLatency(
-                                                    endTime - startTime);
-      if (parentCache.isTraceEnabled()) {
-        String datahash = DigestUtils.sha256Hex(block.getData().array());
-        parentCache.getTracer().info(
-            "Task=WriterTaskDBPut,BlockID={},Time={},SHA={}",
-            block.getBlockID(), endTime - startTime, datahash);
-      }
-      block.clearData();
-      blockBufferManager.addToBlockBuffer(block.getBlockID());
-    } else {
-      Pipeline pipeline = parentCache.getPipeline(block.getBlockID());
-      String containerName = pipeline.getContainerName();
-      XceiverClientSpi client = null;
-      try {
-        long startTime = Time.monotonicNow();
-        client = parentCache.getClientManager()
-            .acquireClient(parentCache.getPipeline(block.getBlockID()));
-        ContainerProtocolCalls.writeSmallFile(client, containerName,
-            Long.toString(block.getBlockID()), block.getData().array(),
-            traceID);
-        long endTime = Time.monotonicNow();
-        if (parentCache.isTraceEnabled()) {
-          String datahash = DigestUtils.sha256Hex(block.getData().array());
-          parentCache.getTracer().info(
-              "Task=DirectWriterPut,BlockID={},Time={},SHA={}",
-              block.getBlockID(), endTime - startTime, datahash);
-        }
-        parentCache.getTargetMetrics().
-            updateDirectBlockWriteLatency(endTime - startTime);
-        parentCache.getTargetMetrics().incNumDirectBlockWrites();
-      } catch (Exception ex) {
-        parentCache.getTargetMetrics().incNumFailedDirectBlockWrites();
-        LOG.error("Direct I/O writing of block:{} traceID:{} to "
-            + "container {} failed", block.getBlockID(), traceID,
-            containerName, ex);
-        throw ex;
-      } finally {
-        // Release the client and drop the data whether or not the write
-        // succeeded.
-        if (client != null) {
-          parentCache.getClientManager().releaseClient(client);
-        }
-        block.clearData();
-      }
-    }
-  }
-
-  /**
-   * Shutdown by writing any pending I/O to dirtylog buffer.
-   */
-  public void shutdown() {
-    blockBufferManager.shutdown();
-  }
-
-  /**
-   * Returns tracer.
-   *
-   * @return Tracer
-   */
-  Logger getTracer() {
-    return parentCache.getTracer();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferFlushTask.java
----------------------------------------------------------------------
diff --git 
a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferFlushTask.java
 
b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferFlushTask.java
deleted file mode 100644
index c61a7a4..0000000
--- 
a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferFlushTask.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.cblock.jscsiHelper.cache.impl;
-
-import org.apache.hadoop.util.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.file.Paths;
-
-/**
- * This task is responsible for flushing the BlockIDBuffer
- * to Dirty Log File. This Dirty Log file is used later by
- * ContainerCacheFlusher when the data is written to container
- */
-public class BlockBufferFlushTask implements Runnable {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(BlockBufferFlushTask.class);
-  /** Number of bytes one block ID (a long) occupies in the buffer. */
-  private static final int BYTES_PER_BLOCK_ID = Long.SIZE / Byte.SIZE;
-  private final CBlockLocalCache parentCache;
-  private final BlockBufferManager bufferManager;
-  private final ByteBuffer blockIDBuffer;
-
-  /**
-   * Creates a flush task for one block ID buffer.
-   *
-   * @param blockIDBuffer - buffer holding the block IDs to write out
-   * @param parentCache - cache that owns the dirty log and the metrics
-   * @param manager - manager the buffer is released back to
-   */
-  BlockBufferFlushTask(ByteBuffer blockIDBuffer, CBlockLocalCache parentCache,
-                       BlockBufferManager manager) {
-    this.parentCache = parentCache;
-    this.bufferManager = manager;
-    this.blockIDBuffer = blockIDBuffer;
-  }
-
-  /**
-   * Writes the block ID buffer to a dirty log file and then releases the
-   * buffer back to the buffer manager. Failures are counted in the target
-   * metrics and logged; they are not propagated.
-   */
-  @Override
-  public void run() {
-    // Capture the entry count first: the write path flips and drains the
-    // buffer, so position() no longer reflects it once writing has begun.
-    final int entries = blockIDBuffer.position() / BYTES_PER_BLOCK_ID;
-    try {
-      writeBlockBufferToFile(blockIDBuffer);
-    } catch (Exception e) {
-      parentCache.getTargetMetrics().incNumFailedBlockBufferFlushes();
-      LOG.error("Unable to sync the Block map to disk with {} entries "
-          + "-- NOTE: This might cause a data loss or corruption",
-          entries, e);
-    } finally {
-      // A failed write can leave the buffer flipped or half drained; reset
-      // position and limit so it is reusable at full capacity.
-      blockIDBuffer.clear();
-      bufferManager.releaseBuffer(blockIDBuffer);
-    }
-  }
-
-  /**
-   * Write Block Buffer to file.
-   *
-   * @param buffer - ByteBuffer
-   * @throws IOException if the dirty log file cannot be written
-   */
-  private void writeBlockBufferToFile(ByteBuffer buffer)
-      throws IOException {
-    long startTime = Time.monotonicNow();
-    boolean append = false;
-
-    // If there is nothing written to blockId buffer,
-    // then skip flushing of blockId buffer
-    if (buffer.position() == 0) {
-      return;
-    }
-
-    buffer.flip();
-    String fileName =
-        String.format("%s.%s", AsyncBlockWriter.DIRTY_LOG_PREFIX,
-                      Time.monotonicNow());
-    String log = Paths.get(parentCache.getDbPath().toString(), fileName)
-        .toString();
-
-    int bytesWritten = 0;
-    // try-with-resources closes the stream and channel even when a write
-    // throws.
-    try (FileOutputStream out = new FileOutputStream(log, append);
-         FileChannel channel = out.getChannel()) {
-      // A single write() call is not required to drain the whole buffer.
-      while (buffer.hasRemaining()) {
-        bytesWritten += channel.write(buffer);
-      }
-    }
-    buffer.clear();
-    parentCache.processDirtyMessage(fileName);
-    long endTime = Time.monotonicNow();
-    if (parentCache.isTraceEnabled()) {
-      parentCache.getTracer().info(
-          "Task=DirtyBlockLogWrite,Time={} bytesWritten={}",
-          endTime - startTime, bytesWritten);
-    }
-
-    parentCache.getTargetMetrics().incNumBlockBufferFlushCompleted();
-    parentCache.getTargetMetrics().incNumBytesDirtyLogWritten(bytesWritten);
-    parentCache.getTargetMetrics().
-        updateBlockBufferFlushLatency(endTime - startTime);
-    LOG.debug("Block buffer writer bytesWritten:{} Time:{}",
-        bytesWritten, endTime - startTime);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferManager.java
 
b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferManager.java
deleted file mode 100644
index 5d3209c..0000000
--- 
a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferManager.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.cblock.jscsiHelper.cache.impl;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.hadoop.conf.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
-
-import static org.apache.hadoop.cblock.CBlockConfigKeys.
-    DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.
-    DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_DEFAULT;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.
-    DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.
-    DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.
-    DFS_CBLOCK_CACHE_KEEP_ALIVE;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.
-    DFS_CBLOCK_CACHE_KEEP_ALIVE_DEFAULT;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.
-    DFS_CBLOCK_CACHE_THREAD_PRIORITY;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.
-    DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT;
-
-/**
- * This class manages the block ID buffer.
- * Block ID Buffer keeps a list of blocks which are in leveldb cache
- * This buffer is used later when the blocks are flushed to container
- *
- * Two blockIDBuffers are maintained so that write are not blocked when
- * DirtyLog is being written. Once a blockIDBuffer is full, it will be
- * enqueued for DirtyLog write while the other buffer accepts new write.
- * Once the DirtyLog write is done, the buffer is returned back to the pool.
- *
- * There are three triggers for blockIDBuffer flush
- * 1) BlockIDBuffer is full,
- * 2) Time period defined for blockIDBuffer flush has elapsed.
- * 3) Shutdown
- */
-public class BlockBufferManager {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(BlockBufferManager.class);
-
-  /** Number of bytes one block ID (a long) occupies in a buffer. */
-  private static final int BYTES_PER_BLOCK_ID = Long.SIZE / Byte.SIZE;
-
-  private enum FlushReason {
-    BUFFER_FULL,
-    SHUTDOWN,
-    TIMER
-  }
-
-  private final int blockBufferSize;
-  private final CBlockLocalCache parentCache;
-  private final ScheduledThreadPoolExecutor scheduledExecutor;
-  private final ThreadPoolExecutor threadPoolExecutor;
-  private final long intervalSeconds;
-  private final ArrayBlockingQueue<ByteBuffer> acquireQueue;
-  private final ArrayBlockingQueue<Runnable> workQueue;
-  private ByteBuffer currentBuffer;
-
-  /**
-   * Creates the manager, its two block ID buffers and the executors used
-   * for timed and asynchronous flushes. Nothing is scheduled until
-   * {@link #start()} is called.
-   *
-   * @param config - configuration the buffer size, flush interval, keep
-   * alive time and thread priority are read from
-   * @param parentCache - cache whose metrics are updated and whose dirty
-   * log receives the flushed block IDs
-   */
-  BlockBufferManager(Configuration config, CBlockLocalCache parentCache) {
-    this.parentCache = parentCache;
-    this.scheduledExecutor = new ScheduledThreadPoolExecutor(1);
-
-    this.intervalSeconds =
-        config.getTimeDuration(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL,
-            DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_DEFAULT,
-            TimeUnit.SECONDS);
-
-    long keepAlive = config.getTimeDuration(DFS_CBLOCK_CACHE_KEEP_ALIVE,
-        DFS_CBLOCK_CACHE_KEEP_ALIVE_DEFAULT,
-        TimeUnit.SECONDS);
-    this.workQueue = new ArrayBlockingQueue<>(2, true);
-    int threadPri = config.getInt(DFS_CBLOCK_CACHE_THREAD_PRIORITY,
-        DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT);
-    ThreadFactory workerThreadFactory = new ThreadFactoryBuilder()
-        .setNameFormat("Cache Block Buffer Manager Thread #%d")
-        .setDaemon(true)
-        .setPriority(threadPri)
-        .build();
-    /*
-     * starting a thread pool with core pool size of 1 and maximum of 2 threads
-     * as there are maximum of 2 buffers which can be flushed at the same time.
-     */
-    this.threadPoolExecutor = new ThreadPoolExecutor(1, 2,
-        keepAlive, TimeUnit.SECONDS, workQueue, workerThreadFactory,
-        new ThreadPoolExecutor.AbortPolicy());
-
-    // The configured size is a number of block IDs; convert it to bytes.
-    this.blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE,
-        DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT) * BYTES_PER_BLOCK_ID;
-    this.acquireQueue = new ArrayBlockingQueue<>(2, true);
-
-    for (int i = 0; i < 2; i++) {
-      acquireQueue.add(ByteBuffer.allocate(blockBufferSize));
-    }
-    // get the first buffer to be used
-    this.currentBuffer = acquireQueue.remove();
-
-    LOG.info("BufferManager: Buffer Size:{} FlushIntervalSeconds:{}",
-        blockBufferSize, intervalSeconds);
-  }
-
-  // triggerBlockBufferFlush enqueues current ByteBuffer for flush and returns.
-  // This enqueue is asynchronous and hence triggerBlockBufferFlush will
-  // only block when there are no available buffers in acquireQueue
-  // Once the DirtyLog write is done, buffer is returned back to
-  // BlockBufferManager using releaseBuffer
-  //
-  // NOTE(review): if the wait for a free buffer is interrupted,
-  // currentBuffer is left null and later calls that touch it will fail.
-  private synchronized void triggerBlockBufferFlush(FlushReason reason) {
-    LOG.debug("Flush triggered because: {} Num entries in buffer: {}"
-        + " Acquire Queue Size: {}", reason,
-        currentBuffer.position() / BYTES_PER_BLOCK_ID,
-        acquireQueue.size());
-
-    parentCache.getTargetMetrics().incNumBlockBufferFlushTriggered();
-    BlockBufferFlushTask flushTask =
-        new BlockBufferFlushTask(currentBuffer, parentCache, this);
-    threadPoolExecutor.submit(flushTask);
-    try {
-      currentBuffer = acquireQueue.take();
-    } catch (InterruptedException ex) {
-      currentBuffer = null;
-      parentCache.getTargetMetrics().incNumInterruptedBufferWaits();
-      LOG.error("wait on take operation on acquire queue interrupted", ex);
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  /**
-   * Appends a block ID to the current buffer and triggers a flush once the
-   * buffer has no space left.
-   *
-   * @param blockId - ID of the block to record
-   */
-  public synchronized void addToBlockBuffer(long blockId)  {
-    parentCache.getTargetMetrics().incNumBlockBufferUpdates();
-    currentBuffer.putLong(blockId);
-    // if no space left, flush this buffer
-    if (currentBuffer.remaining() == 0) {
-      triggerBlockBufferFlush(FlushReason.BUFFER_FULL);
-    }
-  }
-
-  /**
-   * Returns a buffer to the pool of buffers available for new writes.
-   *
-   * @param buffer - buffer whose flush has finished
-   */
-  public void releaseBuffer(ByteBuffer buffer) {
-    if (buffer.position() != 0) {
-      LOG.error("requeuing a non empty buffer with:{} "
-          + "elements enqueued in the acquire queue",
-          buffer.position() / BYTES_PER_BLOCK_ID);
-    }
-    // clear() rather than reset(): no mark is ever set on these buffers, so
-    // reset() would throw InvalidMarkException. clear() also restores the
-    // limit in case the buffer was flipped but never drained.
-    buffer.clear();
-    // There should always be space in the queue to add an element
-    acquireQueue.add(buffer);
-  }
-
-  // Start a scheduled task to flush blockIDBuffer
-  public void start() {
-    Runnable scheduledTask = () -> triggerBlockBufferFlush(FlushReason.TIMER);
-    scheduledExecutor.scheduleWithFixedDelay(scheduledTask, intervalSeconds,
-                                        intervalSeconds, TimeUnit.SECONDS);
-    threadPoolExecutor.prestartAllCoreThreads();
-  }
-
-  /**
-   * Triggers a final flush of the current buffer and then shuts down both
-   * executors. Tasks already submitted are still allowed to run.
-   */
-  public void shutdown() {
-    triggerBlockBufferFlush(FlushReason.SHUTDOWN);
-    scheduledExecutor.shutdown();
-    threadPoolExecutor.shutdown();
-  }
-}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to