Repository: incubator-crail
Updated Branches:
  refs/heads/master c053477a7 -> dc50fc193


New NVMf storage tier: use jNVMf library

New NVMf storage tier implementation that uses the jNVMf library instead
of SPDK. Unaligned reads are no longer implemented, since we believe the
semantics of the underlying storage system should not be hidden from the
user in this way. Good performance is maintained when using buffered
streams, since they now try to align accesses whenever possible.

https://issues.apache.org/jira/browse/CRAIL-22

Close #16

Signed-off-by: Jonas Pfefferle <peppe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crail/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crail/commit/29be91d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crail/tree/29be91d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crail/diff/29be91d2

Branch: refs/heads/master
Commit: 29be91d2f00e779fb5dc37575db9e5e763ea6db0
Parents: c053477
Author: Jonas Pfefferle <peppe...@apache.org>
Authored: Thu Mar 1 17:29:34 2018 +0100
Committer: Jonas Pfefferle <peppe...@apache.org>
Committed: Tue Apr 10 13:18:19 2018 +0200

----------------------------------------------------------------------
 storage-nvmf/pom.xml                            |   5 +-
 .../crail/storage/nvmf/NvmfBufferCache.java     |  69 ----
 .../crail/storage/nvmf/NvmfStorageClient.java   |  43 ++-
 .../storage/nvmf/NvmfStorageConstants.java      |  73 ++---
 .../crail/storage/nvmf/NvmfStorageServer.java   |  66 ++--
 .../crail/storage/nvmf/NvmfStorageTier.java     |   6 +-
 .../crail/storage/nvmf/client/NvmfFuture.java   | 149 +++++++++
 .../nvmf/client/NvmfRegisteredBufferCache.java  | 105 +++++++
 .../nvmf/client/NvmfStagingBufferCache.java     | 159 ++++++++++
 .../nvmf/client/NvmfStorageEndpoint.java        | 315 ++++++++++---------
 .../storage/nvmf/client/NvmfStorageFuture.java  | 116 -------
 .../nvmf/client/NvmfStorageUnalignedFuture.java | 110 -------
 .../client/NvmfStorageUnalignedRMWFuture.java   |  76 -----
 .../client/NvmfStorageUnalignedReadFuture.java  |  60 ----
 .../client/NvmfStorageUnalignedWriteFuture.java |  50 ---
 .../storage/nvmf/client/NvmfStorageUtils.java   |  48 ---
 .../nvmf/client/NvmfUnalignedWriteFuture.java   | 183 +++++++++++
 .../nvmf/client/NvmfStagingBufferCacheTest.java |  73 +++++
 18 files changed, 941 insertions(+), 765 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/pom.xml
----------------------------------------------------------------------
diff --git a/storage-nvmf/pom.xml b/storage-nvmf/pom.xml
index 6500632..20ad294 100644
--- a/storage-nvmf/pom.xml
+++ b/storage-nvmf/pom.xml
@@ -22,8 +22,9 @@
       <artifactId>junit</artifactId>
     </dependency>
     <dependency>
-      <groupId>com.ibm.disni</groupId>
-      <artifactId>disni</artifactId>
+      <groupId>com.ibm.jnvmf</groupId>
+      <artifactId>jnvmf</artifactId>
+      <version>1.0</version>
     </dependency>
     <dependency>
       <groupId>log4j</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfBufferCache.java
----------------------------------------------------------------------
diff --git 
a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfBufferCache.java 
b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfBufferCache.java
deleted file mode 100644
index 19142ec..0000000
--- 
a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfBufferCache.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright (C) 2015-2018, IBM Corporation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.crail.storage.nvmf;
-
-import com.ibm.disni.nvmef.NvmeEndpointGroup;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.crail.CrailBuffer;
-import org.apache.crail.conf.CrailConstants;
-import org.apache.crail.memory.BufferCache;
-import org.apache.crail.memory.OffHeapBuffer;
-
-
-public class NvmfBufferCache extends BufferCache {
-       private static final int ALIGNMENT = 4096;
-       private NvmeEndpointGroup endpointGroup;
-
-       private List<ByteBuffer> bufferPool = new ArrayList<>();
-
-       public NvmfBufferCache() throws IOException {
-               super();
-               endpointGroup = NvmfStorageTier.getEndpointGroup();
-               if (endpointGroup == null) {
-                       throw new IOException("NvmfStorageTier not 
initialized");
-               }
-       }
-
-       @Override
-       public String providerName() {
-               return "NvmfBufferCache";
-       }
-
-       @Override
-       public CrailBuffer allocateRegion() throws IOException {
-               ByteBuffer buffer = 
endpointGroup.allocateBuffer(CrailConstants.BUFFER_SIZE, ALIGNMENT);
-               bufferPool.add(buffer);
-               return OffHeapBuffer.wrap(buffer);
-       }
-
-       @Override
-       public void close() {
-               super.close();
-               for (ByteBuffer buffer : bufferPool) {
-                       endpointGroup.freeBuffer(buffer);
-               }
-               bufferPool.clear();
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java
----------------------------------------------------------------------
diff --git 
a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java
 
b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java
index d270f5d..d9dd976 100644
--- 
a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java
+++ 
b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2015-2018, IBM Corporation
+ * Copyright (C) 2018, IBM Corporation
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -19,8 +19,7 @@
 
 package org.apache.crail.storage.nvmf;
 
-import java.io.IOException;
-
+import com.ibm.jnvmf.Nvme;
 import org.apache.crail.CrailBufferCache;
 import org.apache.crail.CrailStatistics;
 import org.apache.crail.conf.CrailConfiguration;
@@ -31,13 +30,23 @@ import 
org.apache.crail.storage.nvmf.client.NvmfStorageEndpoint;
 import org.apache.crail.utils.CrailUtils;
 import org.slf4j.Logger;
 
-import com.ibm.disni.nvmef.NvmeEndpointGroup;
-import com.ibm.disni.nvmef.spdk.NvmeTransportType;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 public class NvmfStorageClient implements StorageClient {
        private static final Logger LOG = CrailUtils.getLogger();
-       private static NvmeEndpointGroup clientGroup;
-       private boolean initialized = false;
+       private static Nvme nvme;
+       private boolean initialized;
+       private List<StorageEndpoint> endpoints;
+       private CrailStatistics statistics;
+       private CrailBufferCache bufferCache;
+
+       public NvmfStorageClient() {
+               this.initialized = false;
+               this.endpoints = new CopyOnWriteArrayList<>();
+       }
 
        public void init(CrailStatistics statistics, CrailBufferCache 
bufferCache, CrailConfiguration crailConfiguration,
                                         String[] args) throws IOException {
@@ -45,7 +54,9 @@ public class NvmfStorageClient implements StorageClient {
                        throw new IOException("NvmfStorageTier already 
initialized");
                }
                initialized = true;
-
+               this.statistics = statistics;
+               this.bufferCache = bufferCache;
+               LOG.info("Initialize Nvmf storage client");
                NvmfStorageConstants.parseCmdLine(crailConfiguration, args);
        }
 
@@ -53,19 +64,23 @@ public class NvmfStorageClient implements StorageClient {
                NvmfStorageConstants.printConf(logger);
        }
 
-       public static NvmeEndpointGroup getEndpointGroup() {
-               if (clientGroup == null) {
-                       clientGroup = new NvmeEndpointGroup(new 
NvmeTransportType[]{NvmeTransportType.RDMA},
-                                       NvmfStorageConstants.CLIENT_MEMPOOL);
+       public static Nvme getEndpointGroup() throws UnknownHostException {
+               if (nvme == null) {
+                       nvme = new Nvme();
                }
-               return clientGroup;
+               return nvme;
        }
 
        public synchronized StorageEndpoint createEndpoint(DataNodeInfo info) 
throws IOException {
-               return new NvmfStorageEndpoint(getEndpointGroup(), 
CrailUtils.datanodeInfo2SocketAddr(info));
+               StorageEndpoint endpoint = new 
NvmfStorageEndpoint(getEndpointGroup(), info, statistics, bufferCache);
+               endpoints.add(endpoint);
+               return endpoint;
        }
 
        public void close() throws Exception {
+               for (StorageEndpoint endpoint : endpoints) {
+                       endpoint.close();
+               }
        }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageConstants.java
----------------------------------------------------------------------
diff --git 
a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageConstants.java
 
b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageConstants.java
index d65e398..8ce132b 100644
--- 
a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageConstants.java
+++ 
b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageConstants.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2015-2018, IBM Corporation
+ * Copyright (C) 2018, IBM Corporation
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -19,13 +19,9 @@
 
 package org.apache.crail.storage.nvmf;
 
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.DefaultParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
+import com.ibm.jnvmf.NamespaceIdentifier;
+import com.ibm.jnvmf.NvmeQualifiedName;
+import org.apache.commons.cli.*;
 import org.apache.crail.conf.CrailConfiguration;
 import org.apache.crail.conf.CrailConstants;
 import org.slf4j.Logger;
@@ -33,7 +29,6 @@ import org.slf4j.Logger;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.concurrent.TimeUnit;
 
 public class NvmfStorageConstants {
 
@@ -46,22 +41,19 @@ public class NvmfStorageConstants {
        public static int PORT = 50025;
 
        public static final String NQN_KEY = "nqn";
-       public static String NQN = "nqn.2016-06.io.spdk:cnode1";
+       public static NvmeQualifiedName NQN = new 
NvmeQualifiedName("nqn.2017-06.io.crail:cnode");
 
-       public static final String NAMESPACE_KEY = "namespace";
-       public static int NAMESPACE = 1;
+       /* this is a server property, the client will get the nsid from the 
namenode */
+       public static NamespaceIdentifier NAMESPACE = new 
NamespaceIdentifier(1);
 
        public static final String ALLOCATION_SIZE_KEY = "allocationsize";
        public static long ALLOCATION_SIZE = 1073741824; /* 1GB */
 
-       public static final String SERVER_MEMPOOL_KEY = "servermempool";
-       public static long SERVER_MEMPOOL = 256;
+       public static final String QUEUE_SIZE_KEY = "queueSize";
+       public static int QUEUE_SIZE = 64;
 
-       public static final String CLIENT_MEMPOOL_KEY = "clientmempool";
-       public static long CLIENT_MEMPOOL = 256;
-
-       public static final TimeUnit TIME_UNIT = TimeUnit.MINUTES;
-       public static final long TIME_OUT = 15;
+       public static final String STAGING_CACHE_SIZE_KEY = "stagingcachesize";
+       public static int STAGING_CACHE_SIZE = 262144;
 
        private static String fullKey(String key) {
                return PREFIX + "." + key;
@@ -72,12 +64,7 @@ public class NvmfStorageConstants {
        }
 
        public static void updateConstants(CrailConfiguration conf) throws 
UnknownHostException {
-               String arg = get(conf, NAMESPACE_KEY);
-               if (arg != null) {
-                       NAMESPACE = Integer.parseInt(arg);
-               }
-
-               arg = get(conf, IP_ADDR_KEY);
+               String arg = get(conf, IP_ADDR_KEY);
                if (arg != null) {
                        IP_ADDR = InetAddress.getByName(arg);
                }
@@ -89,7 +76,7 @@ public class NvmfStorageConstants {
 
                arg = get(conf, NQN_KEY);
                if (arg != null) {
-                       NQN = arg;
+                       NQN = new NvmeQualifiedName(arg);
                }
 
                arg = get(conf, ALLOCATION_SIZE_KEY);
@@ -97,23 +84,23 @@ public class NvmfStorageConstants {
                        ALLOCATION_SIZE = Long.parseLong(arg);
                }
 
-               arg = get(conf, SERVER_MEMPOOL_KEY);
+               arg = get(conf, QUEUE_SIZE_KEY);
                if (arg != null) {
-                       SERVER_MEMPOOL = Long.parseLong(arg);
+                       QUEUE_SIZE = Integer.parseInt(arg);
                }
 
-               arg = get(conf, CLIENT_MEMPOOL_KEY);
+               arg = get(conf, STAGING_CACHE_SIZE_KEY);
                if (arg != null) {
-                       CLIENT_MEMPOOL = Long.parseLong(arg);
+                       STAGING_CACHE_SIZE = Integer.parseInt(arg);
                }
        }
 
-       public static void verify() throws IOException {
-               if (NAMESPACE <= 0){
-                       throw new IOException("Namespace must be > 0");
-               }
+       public static void verify() {
                if (ALLOCATION_SIZE % CrailConstants.BLOCK_SIZE != 0){
-                       throw new IOException("allocationsize must be multiple 
of crail.blocksize");
+                       throw new IllegalArgumentException("allocationsize must 
be multiple of crail.blocksize");
+               }
+               if (QUEUE_SIZE < 0) {
+                       throw new IllegalArgumentException("Queue size 
negative");
                }
        }
 
@@ -123,12 +110,10 @@ public class NvmfStorageConstants {
                }
                logger.info(fullKey(PORT_KEY) + " " + PORT);
                logger.info(fullKey(NQN_KEY) + " " + NQN);
-               logger.info(fullKey(NAMESPACE_KEY) + " " + NAMESPACE);
                logger.info(fullKey(ALLOCATION_SIZE_KEY) + " " + 
ALLOCATION_SIZE);
-               logger.info(fullKey(SERVER_MEMPOOL_KEY) + " " + SERVER_MEMPOOL);
-               logger.info(fullKey(CLIENT_MEMPOOL_KEY) + " " + CLIENT_MEMPOOL);
+               logger.info(fullKey(QUEUE_SIZE_KEY) + " " + QUEUE_SIZE);
        }
-       
+
        public static void parseCmdLine(CrailConfiguration crailConfiguration, 
String[] args) throws IOException {
                NvmfStorageConstants.updateConstants(crailConfiguration);
 
@@ -140,10 +125,12 @@ public class NvmfStorageConstants {
                                bindIp.setRequired(true);
                        }
                        Option port = Option.builder("p").desc("target 
port").hasArg().type(Number.class).build();
+                       Option namespace = Option.builder("n").desc("namespace 
id").hasArg().type(Number.class).build();
                        Option nqn = Option.builder("nqn").desc("target 
subsystem NQN").hasArg().build();
                        options.addOption(bindIp);
                        options.addOption(port);
                        options.addOption(nqn);
+                       options.addOption(namespace);
                        CommandLineParser parser = new DefaultParser();
                        HelpFormatter formatter = new HelpFormatter();
                        CommandLine line = null;
@@ -152,6 +139,10 @@ public class NvmfStorageConstants {
                                if (line.hasOption(port.getOpt())) {
                                        NvmfStorageConstants.PORT = ((Number) 
line.getParsedOptionValue(port.getOpt())).intValue();
                                }
+                               if (line.hasOption(namespace.getOpt())) {
+                                       NvmfStorageConstants.NAMESPACE = new
+                                                       
NamespaceIdentifier(((Number) 
line.getParsedOptionValue(namespace.getOpt())).intValue());
+                               }
                        } catch (ParseException e) {
                                System.err.println(e.getMessage());
                                formatter.printHelp("NVMe storage tier", 
options);
@@ -161,10 +152,10 @@ public class NvmfStorageConstants {
                                NvmfStorageConstants.IP_ADDR = 
InetAddress.getByName(line.getOptionValue(bindIp.getOpt()));
                        }
                        if (line.hasOption(nqn.getOpt())) {
-                               NvmfStorageConstants.NQN = 
line.getOptionValue(nqn.getOpt());
+                               NvmfStorageConstants.NQN = new 
NvmeQualifiedName(line.getOptionValue(nqn.getOpt()));
                        }
                }
 
                NvmfStorageConstants.verify();
-       }       
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java
----------------------------------------------------------------------
diff --git 
a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java
 
b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java
index 3fd958e..f8b0d8c 100644
--- 
a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java
+++ 
b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2015-2018, IBM Corporation
+ * Copyright (C) 2018, IBM Corporation
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -19,10 +19,7 @@
 
 package org.apache.crail.storage.nvmf;
 
-import com.ibm.disni.nvmef.NvmeEndpoint;
-import com.ibm.disni.nvmef.NvmeEndpointGroup;
-import com.ibm.disni.nvmef.spdk.NvmeTransportType;
-
+import com.ibm.jnvmf.*;
 import org.apache.crail.conf.CrailConfiguration;
 import org.apache.crail.storage.StorageResource;
 import org.apache.crail.storage.StorageServer;
@@ -31,19 +28,19 @@ import org.slf4j.Logger;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.URI;
+import java.util.List;
 
 public class NvmfStorageServer implements StorageServer {
        private static final Logger LOG = CrailUtils.getLogger();
 
        private boolean isAlive;
        private long alignedSize;
-       private long offset;
+       private long address;
        private boolean initialized = false;
-       private NvmeEndpoint endpoint;
+       private Controller controller;
 
        public NvmfStorageServer() {}
-       
+
        public void init(CrailConfiguration crailConfiguration, String[] args) 
throws Exception {
                if (initialized) {
                        throw new IOException("NvmfStorageTier already 
initialized");
@@ -51,23 +48,40 @@ public class NvmfStorageServer implements StorageServer {
                initialized = true;
                NvmfStorageConstants.parseCmdLine(crailConfiguration, args);
 
-               NvmeEndpointGroup group = new NvmeEndpointGroup(new 
NvmeTransportType[]{NvmeTransportType.RDMA}, 
NvmfStorageConstants.SERVER_MEMPOOL);
-               endpoint = group.createEndpoint();
-
-               URI uri = new URI("nvmef://" + 
NvmfStorageConstants.IP_ADDR.getHostAddress() + ":" + NvmfStorageConstants.PORT 
+
-                                       "/0/" + NvmfStorageConstants.NAMESPACE 
+ "?subsystem=" + NvmfStorageConstants.NQN);
-               endpoint.connect(uri);
-
-               long namespaceSize = endpoint.getNamespaceSize();
+               Nvme nvme = new Nvme();
+               NvmfTransportId transportId = new NvmfTransportId(
+                               new 
InetSocketAddress(NvmfStorageConstants.IP_ADDR, NvmfStorageConstants.PORT),
+                               NvmfStorageConstants.NQN);
+               controller = nvme.connect(transportId);
+               controller.getControllerConfiguration().setEnable(true);
+               controller.syncConfiguration();
+               controller.waitUntilReady();
+
+               List<Namespace> namespaces = controller.getActiveNamespaces();
+               Namespace namespace = null;
+               for (Namespace n : namespaces) {
+                       if 
(n.getIdentifier().equals(NvmfStorageConstants.NAMESPACE)) {
+                               namespace = n;
+                               break;
+                       }
+               }
+               if (namespace == null) {
+                       throw new IllegalArgumentException("No namespace with 
id " + NvmfStorageConstants.NAMESPACE +
+                                       " at controller " + 
transportId.toString());
+               }
+               IdentifyNamespaceData namespaceData = 
namespace.getIdentifyNamespaceData();
+               LBAFormat lbaFormat = namespaceData.getFormattedLBASize();
+               int dataSize = lbaFormat.getLBADataSize().toInt();
+               long namespaceSize = dataSize * 
namespaceData.getNamespaceCapacity();
                alignedSize = namespaceSize - (namespaceSize % 
NvmfStorageConstants.ALLOCATION_SIZE);
-               offset = 0;
+               address = 0;
 
                isAlive = true;
-       }       
+       }
 
        @Override
        public void printConf(Logger log) {
-               NvmfStorageConstants.printConf(log);            
+               NvmfStorageConstants.printConf(log);
        }
 
        public void run() {
@@ -75,7 +89,7 @@ public class NvmfStorageServer implements StorageServer {
                while (isAlive) {
                        try {
                                Thread.sleep(1000 /* ms */);
-                               endpoint.keepAlive();
+                               controller.keepAlive();
                        } catch (Exception e) {
                                e.printStackTrace();
                                isAlive = false;
@@ -86,15 +100,15 @@ public class NvmfStorageServer implements StorageServer {
        @Override
        public StorageResource allocateResource() throws Exception {
                StorageResource resource = null;
-               
+
                if (alignedSize > 0){
                        LOG.info("new block, length " + 
NvmfStorageConstants.ALLOCATION_SIZE);
-                       LOG.debug("block stag 0, offset " + offset + ", length 
" + NvmfStorageConstants.ALLOCATION_SIZE);
+                       LOG.debug("block stag 0, address " + address + ", 
length " + NvmfStorageConstants.ALLOCATION_SIZE);
                        alignedSize -= NvmfStorageConstants.ALLOCATION_SIZE;
-                       resource = StorageResource.createResource(offset, 
(int)NvmfStorageConstants.ALLOCATION_SIZE, 0);
-                       offset += NvmfStorageConstants.ALLOCATION_SIZE;
+                       resource = StorageResource.createResource(address, 
(int)NvmfStorageConstants.ALLOCATION_SIZE, 0);
+                       address += NvmfStorageConstants.ALLOCATION_SIZE;
                }
-               
+
                return resource;
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageTier.java
----------------------------------------------------------------------
diff --git 
a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageTier.java 
b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageTier.java
index 1e37b44..749a331 100644
--- 
a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageTier.java
+++ 
b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageTier.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2015-2018, IBM Corporation
+ * Copyright (C) 2018, IBM Corporation
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -26,9 +26,9 @@ import org.slf4j.Logger;
 
 public class NvmfStorageTier extends NvmfStorageClient implements StorageTier {
        private static final Logger LOG = CrailUtils.getLogger();
-       
+
        public StorageServer launchServer() throws Exception {
-               LOG.info("initalizing NVMf datanode");
+               LOG.info("initalizing NVMf storage tier");
                NvmfStorageServer storageServer = new NvmfStorageServer();
                return storageServer;
        }

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfFuture.java
----------------------------------------------------------------------
diff --git 
a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfFuture.java
 
b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfFuture.java
new file mode 100644
index 0000000..0a6c9b4
--- /dev/null
+++ 
b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfFuture.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright (C) 2018, IBM Corporation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.crail.storage.nvmf.client;
+
+import com.ibm.jnvmf.*;
+import org.apache.crail.storage.StorageFuture;
+import org.apache.crail.storage.StorageResult;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class NvmfFuture<Command extends NvmIOCommand<? extends 
NvmIOCommandCapsule>> implements StorageFuture, OperationCallback {
+       private final NvmfStorageEndpoint endpoint;
+       private final Command command;
+       private final Queue<Command> operations;
+       private boolean done;
+       private RdmaException exception;
+       private final StorageResult storageResult;
+       private final Response<NvmResponseCapsule> response;
+       private int completed;
+
+       NvmfFuture(NvmfStorageEndpoint endpoint, Command command, 
Response<NvmResponseCapsule> response,
+                          Queue<Command> operations, int length) {
+               this.endpoint = endpoint;
+               this.command = command;
+               this.operations = operations;
+               this.done = false;
+               this.storageResult = () -> length;
+               this.response = response;
+               this.completed = 0;
+       }
+
+       @Override
+       public boolean isSynchronous() {
+               return false;
+       }
+
+       @Override
+       public boolean cancel(boolean b) {
+               return false;
+       }
+
+       @Override
+       public boolean isCancelled() {
+               return false;
+       }
+
+       @Override
+       public boolean isDone() {
+               if (!done) {
+                       try {
+                               endpoint.poll();
+                       } catch (IOException e) {
+                               throw new RuntimeException(e);
+                       }
+               }
+               return done;
+       }
+
+       private final void checkStatus() throws ExecutionException {
+               if (exception != null) {
+                       throw new ExecutionException(exception);
+               }
+               NvmCompletionQueueEntry cqe = 
response.getResponseCapsule().getCompletionQueueEntry();
+               StatusCode.Value statusCode = cqe.getStatusCode();
+               if (statusCode != null) {
+                       if 
(!statusCode.equals(GenericStatusCode.getInstance().SUCCESS)) {
+                               throw new ExecutionException(new 
UnsuccessfulComandException(cqe));
+                       }
+               }
+       }
+
+       @Override
+       public StorageResult get() throws InterruptedException, 
ExecutionException {
+               try {
+                       return get(2, TimeUnit.MINUTES);
+               } catch (TimeoutException e) {
+                       throw new ExecutionException(e);
+               }
+       }
+
+       @Override
+       public StorageResult get(long timeout, TimeUnit timeUnit) throws 
InterruptedException, ExecutionException, TimeoutException {
+               if (!done) {
+                       long start = System.nanoTime();
+                       long end = start + 
TimeUnit.NANOSECONDS.convert(timeout, timeUnit);
+                       boolean waitTimeOut;
+                       do {
+                               try {
+                                       endpoint.poll();
+                               } catch (IOException e) {
+                                       throw new ExecutionException(e);
+                               }
+                               waitTimeOut = System.nanoTime() > end;
+                       } while (!done && !waitTimeOut);
+                       if (!done && waitTimeOut) {
+                               throw new TimeoutException("poll wait time 
out!");
+                       }
+               }
+               checkStatus();
+               return storageResult;
+       }
+
+       @Override
+       public void onStart() {
+
+       }
+
+       @Override
+       public void onComplete() {
+               assert !done;
+               assert completed < 2;
+               if (++completed == 2) {
+                       /* we need to complete command and response */
+                       operations.add(command);
+                       this.done = true;
+                       endpoint.putOperation();
+               }
+       }
+
+       @Override
+       public void onFailure(RdmaException e) {
+               assert !done;
+               this.operations.add(command);
+               this.exception = e;
+               this.done = true;
+               endpoint.putOperation();
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfRegisteredBufferCache.java
----------------------------------------------------------------------
diff --git 
a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfRegisteredBufferCache.java
 
b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfRegisteredBufferCache.java
new file mode 100644
index 0000000..0a364e8
--- /dev/null
+++ 
b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfRegisteredBufferCache.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright (C) 2018, IBM Corporation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.crail.storage.nvmf.client;
+
+import com.ibm.jnvmf.Freeable;
+import com.ibm.jnvmf.KeyedNativeBuffer;
+import com.ibm.jnvmf.NativeByteBuffer;
+import com.ibm.jnvmf.QueuePair;
+import org.apache.crail.CrailBuffer;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+class NvmfRegisteredBufferCache implements Freeable {
+       private final QueuePair queuePair;
+       private final Map<CrailBuffer, KeyedNativeBuffer> bufferMap;
+       private final Map<Long, KeyedNativeBuffer> regionMap;
+       private boolean valid;
+
+       public NvmfRegisteredBufferCache(QueuePair queuePair) {
+               this.queuePair = queuePair;
+               this.bufferMap = new ConcurrentHashMap<>();
+               this.regionMap = new ConcurrentHashMap<>();
+               this.valid = true;
+       }
+
+       static class Buffer extends NativeByteBuffer implements 
KeyedNativeBuffer {
+               private final KeyedNativeBuffer registeredRegionBuffer;
+
+               Buffer(CrailBuffer buffer, KeyedNativeBuffer 
registeredRegionBuffer) {
+                       super(buffer.getByteBuffer());
+                       this.registeredRegionBuffer = registeredRegionBuffer;
+               }
+
+               @Override
+               public int getRemoteKey() {
+                       return registeredRegionBuffer.getRemoteKey();
+               }
+
+               @Override
+               public int getLocalKey() {
+                       return registeredRegionBuffer.getLocalKey();
+               }
+       }
+
+       KeyedNativeBuffer get(CrailBuffer buffer) throws IOException {
+               KeyedNativeBuffer keyedNativeBuffer = bufferMap.get(buffer);
+               if (keyedNativeBuffer == null) {
+                       CrailBuffer regionBuffer = buffer.getRegion();
+                       keyedNativeBuffer = 
regionMap.get(regionBuffer.address());
+                       if (keyedNativeBuffer == null) {
+                               /* region has not been registered yet */
+                               keyedNativeBuffer = 
queuePair.registerMemory(regionBuffer.getByteBuffer());
+                               KeyedNativeBuffer prevKeyedNativeBuffer =
+                                               
regionMap.putIfAbsent(keyedNativeBuffer.getAddress(), keyedNativeBuffer);
+                               if (prevKeyedNativeBuffer != null) {
+                                       /* someone registered the same region 
in parallel */
+                                       keyedNativeBuffer.free();
+                                       keyedNativeBuffer = 
prevKeyedNativeBuffer;
+                               }
+                       }
+                       keyedNativeBuffer = new Buffer(buffer, 
keyedNativeBuffer);
+                       KeyedNativeBuffer prevKeyedNativeBuffer =
+                                       bufferMap.putIfAbsent(buffer, 
keyedNativeBuffer);
+                       if (prevKeyedNativeBuffer != null) {
+                               /* someone added the same buffer parallel */
+                               keyedNativeBuffer.free();
+                               keyedNativeBuffer = prevKeyedNativeBuffer;
+                       }
+               }
+               return keyedNativeBuffer;
+       }
+
+
+       @Override
+       public void free() throws IOException {
+               for (KeyedNativeBuffer buffer : bufferMap.values()) {
+                       buffer.free();
+               }
+               valid = false;
+       }
+
+       @Override
+       public boolean isValid() {
+               return valid;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStagingBufferCache.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStagingBufferCache.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStagingBufferCache.java
new file mode 100644
index 0000000..b4f4dc3
--- /dev/null
+++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStagingBufferCache.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright (C) 2018, IBM Corporation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.crail.storage.nvmf.client;
+
+import org.apache.crail.CrailBuffer;
+import org.apache.crail.CrailBufferCache;
+import org.apache.crail.storage.StorageFuture;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Pool of LBA-sized staging buffers, each associated with the LBA-aligned remote
+ * address it currently stages. Buffers are carved from buffers allocated from a
+ * {@link CrailBufferCache}; at most {@code maxEntries} are ever carved, after
+ * which entries without remaining references are evicted and their buffers reused.
+ */
+public class NvmfStagingBufferCache {
+       /* aligned remote address -> staging entry */
+       private final Map<Long, BufferCacheEntry> remoteAddressMap;
+       private final Queue<CrailBuffer> freeBuffers;
+       /* staging buffers that may still be carved; modified only in synchronized allocateFreeBuffers() */
+       private int buffersLeft;
+       private final int lbaDataSize;
+       private final CrailBufferCache bufferCache;
+
+       private final CrailBufferCache getBufferCache() {
+               return bufferCache;
+       }
+
+       NvmfStagingBufferCache(CrailBufferCache bufferCache, int maxEntries, int lbaDataSize) {
+               if (maxEntries <= 0) {
+                       throw new IllegalArgumentException("maximum entries <= 0");
+               }
+               if (lbaDataSize <= 0) {
+                       throw new IllegalArgumentException("LBA data size <= 0");
+               }
+               this.remoteAddressMap = new ConcurrentHashMap<>(maxEntries);
+               this.freeBuffers = new ArrayBlockingQueue<>(maxEntries);
+               this.buffersLeft = maxEntries;
+               this.lbaDataSize = lbaDataSize;
+               this.bufferCache = bufferCache;
+       }
+
+       /**
+        * Refills the free-buffer queue if it is empty. While the entry budget lasts,
+        * a buffer is allocated from the buffer cache and cut into LBA-sized slices;
+        * once the budget is used up, one entry without references is evicted and its
+        * buffer recycled.
+        *
+        * @throws OutOfMemoryError if no buffer can be allocated or evicted
+        * @throws IllegalArgumentException if an allocated buffer is smaller than one LBA
+        */
+       synchronized void allocateFreeBuffers() throws Exception {
+               if (!freeBuffers.isEmpty()) {
+                       return;
+               }
+               if (buffersLeft == 0) {
+                       /* TODO: make sure this happens rarely */
+                       Iterator<BufferCacheEntry> iterator = remoteAddressMap.values().iterator();
+                       while (iterator.hasNext()) {
+                               BufferCacheEntry currentEntry = iterator.next();
+                               /* only entries nobody references any more can be reclaimed */
+                               if (currentEntry.tryFree()) {
+                                       iterator.remove();
+                                       freeBuffers.add(currentEntry.getBuffer());
+                                       return;
+                               }
+                       }
+                       throw new OutOfMemoryError("no reclaimable NVMf staging buffer");
+               }
+
+               CrailBuffer buffer = getBufferCache().allocateBuffer();
+               if (buffer == null) {
+                       throw new OutOfMemoryError("buffer cache returned no buffer for NVMf staging");
+               }
+               if (buffer.capacity() < lbaDataSize) {
+                       throw new IllegalArgumentException("Slice size smaller LBA data size");
+               }
+               /* whatever is left of the buffer once the budget is exhausted stays unused */
+               while (buffer.remaining() >= lbaDataSize && buffersLeft > 0) {
+                       buffer.limit(buffer.position() + lbaDataSize);
+                       freeBuffers.add(buffer.slice());
+                       buffer.position(buffer.limit());
+                       buffersLeft--;
+               }
+       }
+
+       /**
+        * Staging buffer plus a reference count. {@code pending} starts at 1
+        * (presumably the reference of the caller of get()); tryGet() adds a
+        * reference unless the entry was freed, put() drops one, and tryFree()
+        * succeeds only when the count is 0, setting it to -1 so that tryGet()
+        * refuses the entry afterwards.
+        */
+       static class BufferCacheEntry {
+               private final CrailBuffer buffer;
+               private final AtomicInteger pending;
+               private StorageFuture future;
+
+               BufferCacheEntry(CrailBuffer buffer) {
+                       this.buffer = buffer;
+                       this.pending = new AtomicInteger(1);
+               }
+
+               public StorageFuture getFuture() {
+                       return future;
+               }
+
+               public void setFuture(StorageFuture future) {
+                       this.future = future;
+               }
+
+               void put() {
+                       pending.decrementAndGet();
+               }
+
+               boolean tryGet() {
+                       int prevPending;
+                       do {
+                               prevPending = pending.get();
+                               if (prevPending < 0) {
+                                       /* entry was freed; its buffer may already be reused */
+                                       return false;
+                               }
+                       } while (!pending.compareAndSet(prevPending, prevPending + 1));
+                       return true;
+               }
+
+               boolean tryFree() {
+                       return pending.compareAndSet(0, -1);
+               }
+
+               CrailBuffer getBuffer() {
+                       return buffer;
+               }
+       }
+
+       /**
+        * Creates a new entry for {@code alignedRemoteAddress} backed by a free
+        * staging buffer, refilling the pool if necessary.
+        *
+        * @param alignedRemoteAddress LBA-aligned remote address to stage
+        * @return the new entry, holding one reference
+        * @throws IllegalStateException if an entry for the address already exists
+        */
+       BufferCacheEntry get(long alignedRemoteAddress) throws Exception {
+               CrailBuffer buffer;
+               do {
+                       buffer = freeBuffers.poll();
+                       if (buffer == null) {
+                               allocateFreeBuffers();
+                       }
+               } while (buffer == null);
+
+               BufferCacheEntry entry = new BufferCacheEntry(buffer);
+               BufferCacheEntry prevEntry = remoteAddressMap.putIfAbsent(alignedRemoteAddress, entry);
+               if (prevEntry != null) {
+                       /* hand the unused buffer back, otherwise it is lost from the pool for good */
+                       freeBuffers.add(buffer);
+                       throw new IllegalStateException("staging buffer for remote address " +
+                                       alignedRemoteAddress + " already exists");
+               }
+               return entry;
+       }
+
+       /**
+        * Returns the entry staging {@code alignedRemoteAddress} with an extra
+        * reference taken, or {@code null} if there is none or it was already freed.
+        */
+       BufferCacheEntry getExisting(long alignedRemoteAddress) {
+               BufferCacheEntry entry = remoteAddressMap.get(alignedRemoteAddress);
+               if (entry != null && !entry.tryGet()) {
+                       entry = null;
+               }
+               return entry;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
index b4b1054..e1430af 100644
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
+++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2015-2018, IBM Corporation
+ * Copyright (C) 2018, IBM Corporation
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -19,208 +19,223 @@
 
 package org.apache.crail.storage.nvmf.client;
 
-import com.ibm.disni.nvmef.NvmeCommand;
-import com.ibm.disni.nvmef.NvmeEndpoint;
-import com.ibm.disni.nvmef.NvmeEndpointGroup;
-import com.ibm.disni.nvmef.spdk.IOCompletion;
-
+import com.ibm.jnvmf.*;
 import org.apache.crail.CrailBuffer;
+import org.apache.crail.CrailBufferCache;
+import org.apache.crail.CrailStatistics;
 import org.apache.crail.conf.CrailConstants;
-import org.apache.crail.memory.BufferCache;
 import org.apache.crail.metadata.BlockInfo;
+import org.apache.crail.metadata.DataNodeInfo;
 import org.apache.crail.storage.StorageEndpoint;
 import org.apache.crail.storage.StorageFuture;
-import org.apache.crail.storage.nvmf.NvmfBufferCache;
 import org.apache.crail.storage.nvmf.NvmfStorageConstants;
 import org.apache.crail.utils.CrailUtils;
 import org.slf4j.Logger;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.concurrent.*;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class NvmfStorageEndpoint implements StorageEndpoint {
        private static final Logger LOG = CrailUtils.getLogger();
 
-       private final InetSocketAddress inetSocketAddress;
-       private final NvmeEndpoint endpoint;
-       private final int sectorSize;
-       private final BufferCache cache;
-       private final BlockingQueue<NvmeCommand> freeCommands;
-       private final NvmeCommand[] commands;
-       private final NvmfStorageFuture[] futures;
-       private final ThreadLocal<long[]> completed;
-       private final int ioQeueueSize;
-
-       public NvmfStorageEndpoint(NvmeEndpointGroup group, InetSocketAddress 
inetSocketAddress) throws IOException {
-               this.inetSocketAddress = inetSocketAddress;
-               endpoint = group.createEndpoint();
+       private final Controller controller;
+       private final IOQueuePair queuePair;
+       private final int lbaDataSize;
+       private final long namespaceCapacity;
+       private final NvmfRegisteredBufferCache registeredBufferCache;
+       private final NvmfStagingBufferCache stagingBufferCache;
+       private final CrailStatistics statistics;
+
+       private final Queue<NvmWriteCommand> writeCommands;
+       private final Queue<NvmReadCommand> readCommands;
+
+       private final AtomicInteger outstandingOperations;
+
+       public NvmfStorageEndpoint(Nvme nvme, DataNodeInfo info, 
CrailStatistics statistics,
+                                                          CrailBufferCache 
bufferCache) throws IOException {
+               InetSocketAddress inetSocketAddress = new InetSocketAddress(
+                               InetAddress.getByAddress(info.getIpAddress()), 
info.getPort());
+               // XXX FIXME: nsid from datanodeinfo
+               NvmfTransportId transportId = new 
NvmfTransportId(inetSocketAddress,
+                               new 
NvmeQualifiedName(NvmfStorageConstants.NQN.toString() + info.getPort()));
+               LOG.info("Connecting to NVMf target at " + 
transportId.toString());
+               controller = nvme.connect(transportId);
+               controller.getControllerConfiguration().setEnable(true);
+               controller.syncConfiguration();
                try {
-                       URI url = new URI("nvmef://" + 
inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort() +
-                                       "/0/" + NvmfStorageConstants.NAMESPACE 
+ "?subsystem=nqn.2016-06.io.spdk:cnode1");
-                       LOG.info("Connecting to " + url.toString());
-                       endpoint.connect(url);
-               } catch (URISyntaxException e) {
-                       //FIXME
-                       e.printStackTrace();
+                       controller.waitUntilReady();
+               } catch (TimeoutException e) {
+                       throw new IOException(e);
                }
-               sectorSize = endpoint.getSectorSize();
-               cache = new NvmfBufferCache();
-               ioQeueueSize = endpoint.getIOQueueSize();
-               freeCommands = new 
ArrayBlockingQueue<NvmeCommand>(ioQeueueSize);
-               commands = new NvmeCommand[ioQeueueSize];
-               for (int i = 0; i < ioQeueueSize; i++) {
-                       NvmeCommand command = endpoint.newCommand();
-                       command.setId(i);
-                       commands[i] = command;
-                       freeCommands.add(command);
+               IdentifyControllerData identifyControllerData = 
controller.getIdentifyControllerData();
+               if (CrailConstants.SLICE_SIZE > 
identifyControllerData.getMaximumDataTransferSize().toInt()) {
+                       throw new 
IllegalArgumentException(CrailConstants.SLICE_SIZE_KEY + " > max transfer size 
(" +
+                                       
identifyControllerData.getMaximumDataTransferSize() + ")");
                }
-               futures = new NvmfStorageFuture[ioQeueueSize];
-               completed = new ThreadLocal<long[]>() {
-                       public long[] initialValue() {
-                               return new long[ioQeueueSize];
+               List<Namespace> namespaces = controller.getActiveNamespaces();
+               //TODO: poll nsid in datanodeinfo
+               NamespaceIdentifier namespaceIdentifier = new 
NamespaceIdentifier(1);
+               Namespace namespace = null;
+               for (Namespace n : namespaces) {
+                       if (n.getIdentifier().equals(namespaceIdentifier)) {
+                               namespace = n;
+                               break;
                        }
-               };
+               }
+               if (namespace == null) {
+                       throw new IllegalArgumentException("No namespace with 
id " + namespaceIdentifier +
+                                       " at controller " + 
transportId.toString());
+               }
+               IdentifyNamespaceData identifyNamespaceData = 
namespace.getIdentifyNamespaceData();
+               lbaDataSize = 
identifyNamespaceData.getFormattedLBASize().getLBADataSize().toInt();
+               if (CrailConstants.SLICE_SIZE % lbaDataSize != 0) {
+                       throw new 
IllegalArgumentException(CrailConstants.SLICE_SIZE_KEY +
+                                       " is not a multiple of LBA data size (" 
+ lbaDataSize + ")");
+               }
+               namespaceCapacity = 
identifyNamespaceData.getNamespaceCapacity() * lbaDataSize;
+               this.queuePair = 
controller.createIOQueuePair(NvmfStorageConstants.QUEUE_SIZE, 0, 0,
+                               SubmissionQueueEntry.SIZE);
+
+               this.writeCommands = new 
ArrayBlockingQueue<>(NvmfStorageConstants.QUEUE_SIZE);
+               this.readCommands = new 
ArrayBlockingQueue<>(NvmfStorageConstants.QUEUE_SIZE);
+               for(int i = 0; i < NvmfStorageConstants.QUEUE_SIZE; i++) {
+                       NvmWriteCommand writeCommand = new 
NvmWriteCommand(queuePair);
+                       writeCommand.setSendInline(true);
+                       
writeCommand.getCommandCapsule().getSubmissionQueueEntry().setNamespaceIdentifier(namespaceIdentifier);
+                       writeCommands.add(writeCommand);
+                       NvmReadCommand readCommand = new 
NvmReadCommand(queuePair);
+                       readCommand.setSendInline(true);
+                       
readCommand.getCommandCapsule().getSubmissionQueueEntry().setNamespaceIdentifier(namespaceIdentifier);
+                       readCommands.add(readCommand);
+               }
+               this.registeredBufferCache = new 
NvmfRegisteredBufferCache(queuePair);
+               this.outstandingOperations = new AtomicInteger(0);
+               this.stagingBufferCache = new 
NvmfStagingBufferCache(bufferCache,
+                               NvmfStorageConstants.STAGING_CACHE_SIZE, 
getLBADataSize());
+               this.statistics = statistics;
        }
 
-       public int getSectorSize() {
-               return sectorSize;
+       /**
+        * @return logical block data size in bytes of the connected namespace
+        */
+       public int getLBADataSize() {
+               return this.lbaDataSize;
+       }
+
+       public long getNamespaceCapacity() {
+               return namespaceCapacity;
        }
 
        enum Operation {
                WRITE,
-               READ;
+               READ
        }
 
-       public StorageFuture Op(Operation op, CrailBuffer buffer, BlockInfo 
remoteMr, long remoteOffset)
-                       throws IOException, InterruptedException {
-               int length = buffer.remaining();
-               if (length > CrailConstants.BLOCK_SIZE){
-                       throw new IOException("write size too large " + length);
-               }
-               if (length <= 0){
-                       throw new IOException("write size too small, len " + 
length);
-               }
-               if (buffer.position() < 0){
-                       throw new IOException("local offset too small " + 
buffer.position());
-               }
-               if (remoteOffset < 0){
-                       throw new IOException("remote offset too small " + 
remoteOffset);
-               }
+       /* gives back one slot of the outstanding-operation budget taken by tryGetOperation() */
+       void putOperation() {
+               outstandingOperations.getAndDecrement();
+       }
 
-               if (remoteMr.getAddr() + remoteOffset + length > 
endpoint.getNamespaceSize()){
-                       long tmpAddr = remoteMr.getAddr() + remoteOffset + 
length;
-                       throw new IOException("remote fileOffset + remoteOffset 
+ len = " + tmpAddr + " - size = " +
-                                       endpoint.getNamespaceSize());
+       private boolean tryGetOperation() {
+               int outstandingOperationsOld = outstandingOperations.get();
+               if (outstandingOperationsOld != 
NvmfStorageConstants.QUEUE_SIZE) {
+                       return 
outstandingOperations.compareAndSet(outstandingOperationsOld, 
outstandingOperationsOld + 1);
                }
+               return false;
+       }
 
-//             LOG.info("op = " + op.name() +
-//                             ", position = " + buffer.position() +
-//                             ", localOffset = " + buffer.position() +
-//                             ", remoteOffset = " + remoteOffset +
-//                             ", remoteAddr = " + remoteMr.getAddr() +
-//                             ", length = " + length);
-
-               NvmeCommand command = freeCommands.poll();
-               while (command == null) {
-                       poll();
-                       command = freeCommands.poll();
-               }
+       /*
+        * Returns ceil(a / b) for b > 0. Unlike (a + b - 1) / b this cannot overflow
+        * when a is close to Integer.MAX_VALUE.
+        */
+       private static int divCeil(int a, int b) {
+               int quotient = a / b;
+               return (a % b > 0) ? quotient + 1 : quotient;
+       }
 
-               boolean aligned = 
NvmfStorageUtils.namespaceSectorOffset(sectorSize, remoteOffset) == 0
-                               && 
NvmfStorageUtils.namespaceSectorOffset(sectorSize, length) == 0;
-               long lba = NvmfStorageUtils.linearBlockAddress(remoteMr, 
remoteOffset, sectorSize);
-               StorageFuture future = null;
-               if (aligned) {
-//                     LOG.info("aligned");
-                       
command.setBuffer(buffer.getByteBuffer()).setLinearBlockAddress(lba);
-                       switch(op) {
-                               case READ:
-                                       command.read();
-                                       break;
-                               case WRITE:
-                                       command.write();
-                                       break;
-                       }
-                       future = futures[(int)command.getId()] = new 
NvmfStorageFuture(this, length);
-                       command.execute();
-               } else {
-//                     LOG.info("unaligned");
-                       long alignedLength = 
NvmfStorageUtils.alignLength(sectorSize, remoteOffset, length);
+       /* number of logical blocks needed to cover the remaining bytes of buffer, rounded up */
+       private int getNumLogicalBlocks(CrailBuffer buffer) {
+               int remainingBytes = buffer.remaining();
+               return divCeil(remainingBytes, getLBADataSize());
+       }
 
-                       CrailBuffer stagingBuffer = cache.allocateBuffer();
-                       stagingBuffer.limit((int)alignedLength);
+       StorageFuture Op(Operation op, CrailBuffer buffer, BlockInfo blockInfo, 
long remoteOffset) throws InterruptedException, IOException {
+               assert blockInfo.getAddr() + remoteOffset + buffer.remaining() 
<= getNamespaceCapacity();
+               assert remoteOffset >= 0;
+               assert buffer.remaining() <= CrailConstants.BLOCK_SIZE;
+
+               long startingAddress = blockInfo.getAddr() + remoteOffset;
+               if (startingAddress % getLBADataSize() != 0 ||
+                               ((startingAddress + buffer.remaining()) % 
getLBADataSize() != 0 && op == Operation.WRITE)) {
+                       if (op == Operation.READ) {
+                               throw new IOException("Unaligned read access is 
not supported. Address (" + startingAddress +
+                                               ") needs to be multiple of LBA 
data size " + getLBADataSize());
+                       }
                        try {
-                               switch(op) {
-                                       case READ: {
-                                               NvmfStorageFuture f = 
futures[(int)command.getId()] = new NvmfStorageFuture(this, (int)alignedLength);
-                                               
command.setBuffer(stagingBuffer.getByteBuffer()).setLinearBlockAddress(lba).read().execute();
-                                               future = new 
NvmfStorageUnalignedReadFuture(f, this, buffer, remoteMr, remoteOffset, 
stagingBuffer);
-                                               break;
-                                       }
-                                       case WRITE: {
-                                               if 
(NvmfStorageUtils.namespaceSectorOffset(sectorSize, remoteOffset) == 0) {
-                                                       // Do not read if the 
offset is aligned to sector size
-                                                       int sizeToWrite = 
length;
-                                                       
stagingBuffer.put(buffer.getByteBuffer());
-                                                       
stagingBuffer.position(0);
-                                                       
command.setBuffer(stagingBuffer.getByteBuffer()).setLinearBlockAddress(lba).write().execute();
-                                                       future = 
futures[(int)command.getId()] = new NvmfStorageUnalignedWriteFuture(this, 
sizeToWrite, stagingBuffer);
-                                               } else {
-                                                       // RMW but append only 
file system allows only reading last sector
-                                                       // and dir entries are 
sector aligned
-                                                       
stagingBuffer.limit(sectorSize);
-                                                       NvmfStorageFuture f = 
futures[(int)command.getId()] = new NvmfStorageFuture(this, sectorSize);
-                                                       
command.setBuffer(stagingBuffer.getByteBuffer()).setLinearBlockAddress(lba).read().execute();
-                                                       future = new 
NvmfStorageUnalignedRMWFuture(f, this, buffer, remoteMr, remoteOffset, 
stagingBuffer);
-                                               }
-                                               break;
-                                       }
-                               }
-                       } catch (NoSuchFieldException e) {
-                               throw new IOException(e);
-                       } catch (IllegalAccessException e) {
+                               return new NvmfUnalignedWriteFuture(this, 
buffer, blockInfo, remoteOffset);
+                       } catch (Exception e) {
                                throw new IOException(e);
                        }
                }
 
+               if (!tryGetOperation()) {
+                       do {
+                               poll();
+                       } while (!tryGetOperation());
+               }
+
+               NvmIOCommand<? extends NvmIOCommandCapsule> command;
+               NvmfFuture<?> future;
+               Response<NvmResponseCapsule> response;
+               if (op == Operation.READ) {
+                       NvmReadCommand readCommand = readCommands.remove();
+                       response = readCommand.newResponse();
+                       future = new NvmfFuture<>(this, readCommand, response, 
readCommands, buffer.remaining());
+                       command = readCommand;
+               } else {
+                       NvmWriteCommand writeCommand = writeCommands.remove();
+                       response = writeCommand.newResponse();
+                       future = new NvmfFuture<>(this, writeCommand, response, 
writeCommands, buffer.remaining());
+                       command = writeCommand;
+               }
+               command.setCallback(future);
+               response.setCallback(future);
+
+               NvmIOCommandSQE sqe = 
command.getCommandCapsule().getSubmissionQueueEntry();
+               long startingLBA = startingAddress / getLBADataSize();
+               sqe.setStartingLBA(startingLBA);
+               /* TODO: on read this potentially overwrites data beyond the 
set limit */
+               short numLogicalBlocks = (short)(getNumLogicalBlocks(buffer) - 
1);
+               sqe.setNumberOfLogicalBlocks(numLogicalBlocks);
+               KeyedNativeBuffer registeredBuffer = 
registeredBufferCache.get(buffer);
+               registeredBuffer.position(buffer.position());
+               registeredBuffer.limit(registeredBuffer.position() + 
(numLogicalBlocks + 1) * getLBADataSize());
+               command.getCommandCapsule().setSGLDescriptor(registeredBuffer);
+
+               command.execute(response);
+
                return future;
        }
 
-       public StorageFuture write(CrailBuffer buffer, BlockInfo blockInfo, 
long remoteOffset)
-                       throws IOException, InterruptedException {
+       public StorageFuture write(CrailBuffer buffer, BlockInfo blockInfo, 
long remoteOffset) throws InterruptedException, IOException {
                return Op(Operation.WRITE, buffer, blockInfo, remoteOffset);
        }
 
-       public StorageFuture read(CrailBuffer buffer, BlockInfo blockInfo, long 
remoteOffset)
-                       throws IOException, InterruptedException {
+       public StorageFuture read(CrailBuffer buffer, BlockInfo blockInfo, long 
remoteOffset) throws InterruptedException, IOException {
                return Op(Operation.READ, buffer, blockInfo, remoteOffset);
        }
 
        void poll() throws IOException {
-               long[] ca = completed.get();
-               int numberCompletions = endpoint.processCompletions(ca);
-               for (int i = 0; i < numberCompletions; i++) {
-                       int idx = (int)ca[i];
-                       NvmeCommand command = commands[idx];
-                       IOCompletion completion = command.getCompletion();
-                       completion.done();
-                       futures[idx].signal(completion.getStatusCodeType(), 
completion.getStatusCode());
-                       freeCommands.add(command);
-               }
-       }
-
-       void putBuffer(CrailBuffer buffer) throws IOException {
-               cache.freeBuffer(buffer);
+               queuePair.poll();
        }
 
        public void close() throws IOException, InterruptedException {
-               endpoint.close();
+               registeredBufferCache.free();
+               controller.free();
+
        }
 
        public boolean isLocal() {
                return false;
        }
+
+       /* staging buffer cache of this endpoint (presumably used by NvmfUnalignedWriteFuture) */
+       NvmfStagingBufferCache getStagingBufferCache() {
+               return this.stagingBufferCache;
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageFuture.java
----------------------------------------------------------------------
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageFuture.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageFuture.java
deleted file mode 100644
index 9dbab36..0000000
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageFuture.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Copyright (C) 2015-2018, IBM Corporation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.crail.storage.nvmf.client;
-
-import com.ibm.disni.nvmef.spdk.NvmeGenericCommandStatusCode;
-import com.ibm.disni.nvmef.spdk.NvmeStatusCodeType;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.crail.storage.StorageFuture;
-import org.apache.crail.storage.StorageResult;
-import org.apache.crail.storage.nvmf.NvmfStorageConstants;
-
-public class NvmfStorageFuture implements StorageFuture, StorageResult {
-
-       protected final NvmfStorageEndpoint endpoint;
-       private final int len;
-       private Exception exception;
-       private volatile boolean done;
-
-       public NvmfStorageFuture(NvmfStorageEndpoint endpoint, int len) {
-               this.endpoint = endpoint;
-               this.len = len;
-       }
-
-       public int getLen() {
-               return len;
-       }
-
-       public boolean cancel(boolean b) {
-               return false;
-       }
-
-       public boolean isCancelled() {
-               return false;
-       }
-
-       void signal(NvmeStatusCodeType statusCodeType, int statusCode) {
-               if (statusCodeType != NvmeStatusCodeType.GENERIC &&
-                               statusCode != 
NvmeGenericCommandStatusCode.SUCCESS.getNumVal()) {
-                       exception = new ExecutionException("Error: " + 
statusCodeType.name() + " - " + statusCode) {};
-               }
-               done = true;
-       }
-
-       public boolean isDone() {
-               if (!done) {
-                       try {
-                               endpoint.poll();
-                       } catch (IOException e) {
-                               exception = e;
-                       }
-               }
-               return done;
-       }
-
-       public StorageResult get() throws InterruptedException, 
ExecutionException {
-               try {
-                       return get(NvmfStorageConstants.TIME_OUT, 
NvmfStorageConstants.TIME_UNIT);
-               } catch (TimeoutException e) {
-                       throw new ExecutionException(e);
-               }
-       }
-
-       public StorageResult get(long timeout, TimeUnit timeUnit) throws 
InterruptedException, ExecutionException, TimeoutException {
-               if (exception != null) {
-                       throw new ExecutionException(exception);
-               }
-               if (!done) {
-                       long start = System.nanoTime();
-                       long end = start + 
TimeUnit.NANOSECONDS.convert(timeout, timeUnit);
-                       boolean waitTimeOut;
-                       do {
-                               try {
-                                       endpoint.poll();
-                               } catch (IOException e) {
-                                       throw new ExecutionException(e);
-                               }
-                               // we don't want to trigger timeout on first 
iteration
-                               waitTimeOut = System.nanoTime() > end;
-                       } while (!done && !waitTimeOut);
-                       if (!done && waitTimeOut) {
-                               throw new TimeoutException("get wait time 
out!");
-                       }
-                       if (exception != null) {
-                               throw new ExecutionException(exception);
-                       }
-               }
-               return this;
-       }
-
-       @Override
-       public boolean isSynchronous() {
-               return false;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedFuture.java
----------------------------------------------------------------------
diff --git 
a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedFuture.java
 
b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedFuture.java
deleted file mode 100644
index 93d24d0..0000000
--- 
a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedFuture.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Copyright (C) 2015-2018, IBM Corporation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.crail.storage.nvmf.client;
-
-import sun.misc.Unsafe;
-
-import java.lang.reflect.Field;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.crail.CrailBuffer;
-import org.apache.crail.metadata.BlockInfo;
-import org.apache.crail.storage.StorageFuture;
-import org.apache.crail.storage.StorageResult;
-import org.apache.crail.storage.nvmf.NvmfStorageConstants;
-
-public abstract class NvmfStorageUnalignedFuture implements StorageFuture, 
StorageResult  {
-       protected final NvmfStorageFuture initFuture;
-       protected final NvmfStorageEndpoint endpoint;
-       protected final CrailBuffer buffer;
-       protected final long localOffset;
-       protected final BlockInfo remoteMr;
-       protected final long remoteOffset;
-       protected final int len;
-       protected final CrailBuffer stagingBuffer;
-       protected boolean done;
-       protected Exception exception;
-       protected static Unsafe unsafe;
-
-       public NvmfStorageUnalignedFuture(NvmfStorageFuture future, 
NvmfStorageEndpoint endpoint, CrailBuffer buffer,
-                                                                  BlockInfo 
remoteMr, long remoteOffset, CrailBuffer stagingBuffer)
-                       throws NoSuchFieldException, IllegalAccessException {
-               this.initFuture = future;
-               this.endpoint = endpoint;
-               this.buffer = buffer;
-               this.localOffset = buffer.position();
-               this.remoteMr = remoteMr;
-               this.remoteOffset = remoteOffset;
-               this.len = buffer.remaining();
-               this.stagingBuffer = stagingBuffer;
-               initUnsafe();
-               done = false;
-       }
-
-       public boolean isDone() {
-               if (!done) {
-                       try {
-                               get(0, TimeUnit.NANOSECONDS);
-                       } catch (InterruptedException e) {
-                               exception = e;
-                       } catch (ExecutionException e) {
-                               exception = e;
-                       } catch (TimeoutException e) {
-                               // i.e. operation is not finished
-                       }
-               }
-               return done;
-       }
-
-       public int getLen() {
-               return len;
-       }
-
-       public boolean cancel(boolean b) {
-               return false;
-       }
-
-       public boolean isCancelled() {
-               return false;
-       }
-
-       public StorageResult get() throws InterruptedException, 
ExecutionException {
-               try {
-                       return get(NvmfStorageConstants.TIME_OUT, 
NvmfStorageConstants.TIME_UNIT);
-               } catch (TimeoutException e) {
-                       throw new ExecutionException(e);
-               }
-       }
-
-       private static void initUnsafe() throws NoSuchFieldException, 
IllegalAccessException {
-               if (unsafe == null) {
-                       Field theUnsafe = 
Unsafe.class.getDeclaredField("theUnsafe");
-                       theUnsafe.setAccessible(true);
-                       unsafe = (Unsafe) theUnsafe.get(null);
-               }
-       }
-
-       @Override
-       public boolean isSynchronous() {
-               return false;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedRMWFuture.java
----------------------------------------------------------------------
diff --git 
a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedRMWFuture.java
 
b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedRMWFuture.java
deleted file mode 100644
index b0a5ae9..0000000
--- 
a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedRMWFuture.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Copyright (C) 2015-2018, IBM Corporation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.crail.storage.nvmf.client;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.crail.CrailBuffer;
-import org.apache.crail.metadata.BlockInfo;
-import org.apache.crail.storage.StorageResult;
-
-public class NvmfStorageUnalignedRMWFuture extends NvmfStorageUnalignedFuture {
-
-       private boolean initDone;
-       private Future<StorageResult> writeFuture;
-
-       public NvmfStorageUnalignedRMWFuture(NvmfStorageFuture future, 
NvmfStorageEndpoint endpoint, CrailBuffer buffer,
-                                                                         
BlockInfo remoteMr, long remoteOffset, CrailBuffer stagingBuffer)
-                       throws NoSuchFieldException, IllegalAccessException {
-               super(future, endpoint, buffer, remoteMr, remoteOffset, 
stagingBuffer);
-               initDone = false;
-       }
-
-       public StorageResult get(long l, TimeUnit timeUnit) throws 
InterruptedException, ExecutionException, TimeoutException {
-               if (exception != null) {
-                       throw new ExecutionException(exception);
-               }
-               if (!done) {
-                       if (!initDone) {
-                               initFuture.get(l, timeUnit);
-                               long srcAddr = buffer.address() + localOffset;
-                               long dstAddr = stagingBuffer.address() + 
NvmfStorageUtils.namespaceSectorOffset(
-                                               endpoint.getSectorSize(), 
remoteOffset);
-                               unsafe.copyMemory(srcAddr, dstAddr, len);
-
-                               stagingBuffer.clear();
-                               int alignedLen = (int) 
NvmfStorageUtils.alignLength(endpoint.getSectorSize(), remoteOffset, len);
-                               stagingBuffer.limit(alignedLen);
-                               try {
-                                       writeFuture = 
endpoint.write(stagingBuffer, remoteMr, 
NvmfStorageUtils.alignOffset(endpoint.getSectorSize(), remoteOffset));
-                               } catch (IOException e) {
-                                       throw new ExecutionException(e);
-                               }
-                               initDone =true;
-                       }
-                       writeFuture.get(l, timeUnit);
-                       try {
-                               endpoint.putBuffer(stagingBuffer);
-                       } catch (IOException e) {
-                               throw new ExecutionException(e);
-                       }
-                       done = true;
-               }
-               return this;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedReadFuture.java
----------------------------------------------------------------------
diff --git 
a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedReadFuture.java
 
b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedReadFuture.java
deleted file mode 100644
index 61f460e..0000000
--- 
a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedReadFuture.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright (C) 2015-2018, IBM Corporation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.crail.storage.nvmf.client;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.crail.CrailBuffer;
-import org.apache.crail.metadata.BlockInfo;
-import org.apache.crail.storage.StorageResult;
-
-public class NvmfStorageUnalignedReadFuture extends NvmfStorageUnalignedFuture 
{
-
-       public NvmfStorageUnalignedReadFuture(NvmfStorageFuture future, 
NvmfStorageEndpoint endpoint, CrailBuffer buffer, BlockInfo remoteMr,
-                                                                          long 
remoteOffset, CrailBuffer stagingBuffer)
-                       throws NoSuchFieldException, IllegalAccessException {
-               super(future, endpoint, buffer, remoteMr, remoteOffset, 
stagingBuffer);
-       }
-
-       public StorageResult get(long l, TimeUnit timeUnit) throws 
InterruptedException, ExecutionException, TimeoutException {
-               if (exception != null) {
-                       throw new ExecutionException(exception);
-               }
-               if (!done) {
-                       initFuture.get(l, timeUnit);
-                       long srcAddr = stagingBuffer.address() + 
-                                       
NvmfStorageUtils.namespaceSectorOffset(endpoint.getSectorSize(), remoteOffset);
-                       long dstAddr = buffer.address() + localOffset;
-                       unsafe.copyMemory(srcAddr, dstAddr, len);
-                       done = true;
-                       try {
-                               endpoint.putBuffer(stagingBuffer);
-                       } catch (IOException e) {
-                               throw new ExecutionException(e);
-                       }
-               }
-               return this;
-       }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedWriteFuture.java
----------------------------------------------------------------------
diff --git 
a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedWriteFuture.java
 
b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedWriteFuture.java
deleted file mode 100644
index a842033..0000000
--- 
a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedWriteFuture.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Copyright (C) 2015-2018, IBM Corporation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.crail.storage.nvmf.client;
-
-import com.ibm.disni.nvmef.spdk.NvmeStatusCodeType;
-
-import java.io.IOException;
-
-import org.apache.crail.CrailBuffer;
-
-/**
- * Created by jpf on 23.05.17.
- */
-public class NvmfStorageUnalignedWriteFuture extends NvmfStorageFuture {
-
-       private CrailBuffer stagingBuffer;
-
-       public NvmfStorageUnalignedWriteFuture(NvmfStorageEndpoint endpoint, 
int len, CrailBuffer stagingBuffer) {
-               super(endpoint, len);
-               this.stagingBuffer = stagingBuffer;
-       }
-
-       @Override
-       void signal(NvmeStatusCodeType statusCodeType, int statusCode) {
-               try {
-                       endpoint.putBuffer(stagingBuffer);
-               } catch (IOException e) {
-                       e.printStackTrace();
-                       System.exit(-1);
-               }
-               super.signal(statusCodeType, statusCode);
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUtils.java
----------------------------------------------------------------------
diff --git 
a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUtils.java
 
b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUtils.java
deleted file mode 100644
index 562e495..0000000
--- 
a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUtils.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright (C) 2015-2018, IBM Corporation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.crail.storage.nvmf.client;
-
-import org.apache.crail.metadata.BlockInfo;
-
-/**
- * Created by jpf on 14.02.17.
- */
-public class NvmfStorageUtils {
-
-       public static long linearBlockAddress(BlockInfo remoteMr, long 
remoteOffset, int sectorSize) {
-               return (remoteMr.getAddr() + remoteOffset) / (long)sectorSize;
-       }
-
-       public static long namespaceSectorOffset(int sectorSize, long 
fileOffset) {
-               return fileOffset % (long)sectorSize;
-       }
-
-       public static long alignLength(int sectorSize, long remoteOffset, long 
len) {
-               long alignedSize = len + namespaceSectorOffset(sectorSize, 
remoteOffset);
-               if (namespaceSectorOffset(sectorSize, alignedSize) != 0) {
-                       alignedSize += (long)sectorSize - 
namespaceSectorOffset(sectorSize, alignedSize);
-               }
-               return alignedSize;
-       }
-
-       public static long alignOffset(int sectorSize, long fileOffset) {
-               return fileOffset - namespaceSectorOffset(sectorSize, 
fileOffset);
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfUnalignedWriteFuture.java
----------------------------------------------------------------------
diff --git 
a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfUnalignedWriteFuture.java
 
b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfUnalignedWriteFuture.java
new file mode 100644
index 0000000..08eab5b
--- /dev/null
+++ 
b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfUnalignedWriteFuture.java
@@ -0,0 +1,183 @@
+/*
+ * Copyright (C) 2018, IBM Corporation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.crail.storage.nvmf.client;
+
+import org.apache.crail.CrailBuffer;
+import org.apache.crail.metadata.BlockInfo;
+import org.apache.crail.storage.StorageFuture;
+import org.apache.crail.storage.StorageResult;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+
+/**
+ * Future for a write whose remote offset and/or length is not a multiple of the
+ * namespace LBA data size ({@code endpoint.getLBADataSize()}).
+ *
+ * <p>The constructor splits the caller's data into up to three parts and submits them
+ * right away:
+ * <ul>
+ * <li><b>begin</b>: an unaligned head merged into a staging copy of its sector. The copy
+ * comes from the staging cache when an earlier write left it there (after waiting for that
+ * write), otherwise it is first read from the device. The whole sector is then written.</li>
+ * <li><b>middle</b>: the sector-aligned bulk, written directly from the caller's buffer.</li>
+ * <li><b>end</b>: an unaligned tail copied to the start of a staging sector, which is
+ * written and registered with its future so a later write to the same sector can wait on
+ * it and reuse it.</li>
+ * </ul>
+ *
+ * <p>On success the caller's buffer is fully consumed (position == limit). Staging entries
+ * are returned to the cache with {@code put()} once their write has completed.
+ * NOTE(review): the end write submits the staging buffer from position 0 without setting a
+ * limit, so sector bytes after the tail presumably carry stale staging contents; this is
+ * assumed harmless because nothing past the written length is read - confirm.
+ */
+public class NvmfUnalignedWriteFuture implements StorageFuture {
+       /** Timeout applied by {@link #get()}; same value that was previously hard-coded. */
+       private static final long DEFAULT_TIMEOUT_MINUTES = 2;
+
+       private final NvmfStorageEndpoint endpoint;
+       private StorageFuture beginFuture;
+       private StorageFuture middleFuture;
+       private StorageFuture endFuture;
+       /** Number of payload bytes; reported by the StorageResult returned from get(). */
+       private final int written;
+       /** Staging entry for the head sector; null when there is no head or once released. */
+       private NvmfStagingBufferCache.BufferCacheEntry beginBuffer;
+       /** Staging entry for the tail sector; null when there is no tail or once released. */
+       private NvmfStagingBufferCache.BufferCacheEntry endBuffer;
+
+       private boolean isSectorAligned(long address) {
+               return address % endpoint.getLBADataSize() == 0;
+       }
+
+       private long floorToSectorSize(long address) {
+               return address - (address % endpoint.getLBADataSize());
+       }
+
+       private int leftInSector(long address) {
+               return endpoint.getLBADataSize() - offsetInSector(address);
+       }
+
+       private int offsetInSector(long address) {
+               return (int)(address % endpoint.getLBADataSize());
+       }
+
+       /**
+        * Splits the remaining bytes of {@code buffer} into head/middle/tail writes and submits them.
+        *
+        * @param endpoint     endpoint providing the staging cache and performing reads/writes
+        * @param buffer       data to write, from position to limit; fully consumed on success
+        * @param blockInfo    remote block; its address is assumed to be sector aligned
+        * @param remoteOffset byte offset within the block at which the data is written
+        * @throws Exception   any failure from the staging cache, the head read or a write
+        *                     submission. Staging entries acquired so far are released (after
+        *                     their own write, if already submitted, has settled) and the
+        *                     caller's buffer limit is left unchanged.
+        */
+       NvmfUnalignedWriteFuture(NvmfStorageEndpoint endpoint, CrailBuffer buffer, BlockInfo blockInfo, long remoteOffset) throws Exception {
+               this.endpoint = endpoint;
+               this.written = buffer.remaining();
+               /* assume blockInfo.getAddr() is sector aligned */
+               assert isSectorAligned(blockInfo.getAddr());
+
+               try {
+                       long nextRemoteOffset = writeBegin(buffer, blockInfo, remoteOffset);
+                       nextRemoteOffset = writeMiddle(buffer, blockInfo, nextRemoteOffset);
+                       writeEnd(buffer, blockInfo, nextRemoteOffset);
+               } catch (Throwable t) {
+                       // The future is never handed out on failure, so isDone() will never
+                       // release these entries; without this the bounded staging cache leaks.
+                       releaseStagingBuffers();
+                       throw t;
+               }
+       }
+
+       /** Merges an unaligned head into its staging sector and writes it; returns the next offset. */
+       private long writeBegin(CrailBuffer buffer, BlockInfo blockInfo, long remoteOffset) throws Exception {
+               if (isSectorAligned(remoteOffset)) {
+                       return remoteOffset;
+               }
+               int copySize = Math.min(leftInSector(remoteOffset), buffer.remaining());
+               long alignedRemoteOffset = floorToSectorSize(remoteOffset);
+               long alignedRemoteAddress = blockInfo.getAddr() + alignedRemoteOffset;
+               beginBuffer = endpoint.getStagingBufferCache().getExisting(alignedRemoteAddress);
+               if (beginBuffer == null) {
+                       /* we had to delete the old buffer because we ran out of space. This should happen rarely. */
+                       beginBuffer = endpoint.getStagingBufferCache().get(alignedRemoteAddress);
+                       endpoint.read(beginBuffer.getBuffer(), blockInfo, alignedRemoteOffset).get();
+               } else {
+                       /* Wait for previous end operation to finish */
+                       beginBuffer.getFuture().get();
+               }
+               CrailBuffer stagingBuffer = beginBuffer.getBuffer();
+               stagingBuffer.position(offsetInSector(remoteOffset));
+               // Temporarily narrow the caller's buffer to the head; always restore its limit.
+               int oldLimit = buffer.limit();
+               buffer.limit(buffer.position() + copySize);
+               try {
+                       stagingBuffer.getByteBuffer().put(buffer.getByteBuffer());
+               } finally {
+                       buffer.limit(oldLimit);
+               }
+               stagingBuffer.position(0);
+               beginFuture = endpoint.write(stagingBuffer, blockInfo, alignedRemoteOffset);
+               beginBuffer.setFuture(beginFuture);
+               stagingBuffer.position(offsetInSector(remoteOffset));
+               return remoteOffset + copySize;
+       }
+
+       /** Writes the whole-sector part straight from the caller's buffer; returns the next offset. */
+       private long writeMiddle(CrailBuffer buffer, BlockInfo blockInfo, long remoteOffset) throws Exception {
+               if (!isSectorAligned(remoteOffset) || buffer.remaining() < endpoint.getLBADataSize()) {
+                       return remoteOffset;
+               }
+               int oldLimit = buffer.limit();
+               buffer.limit(buffer.position() + (int)floorToSectorSize(buffer.remaining()));
+               int toWrite = buffer.remaining();
+               try {
+                       middleFuture = endpoint.write(buffer, blockInfo, remoteOffset);
+                       buffer.position(buffer.limit());
+               } finally {
+                       buffer.limit(oldLimit);
+               }
+               return remoteOffset + toWrite;
+       }
+
+       /** Copies any remaining tail into a fresh staging sector and writes it. */
+       private void writeEnd(CrailBuffer buffer, BlockInfo blockInfo, long remoteOffset) throws Exception {
+               if (buffer.remaining() <= 0) {
+                       return;
+               }
+               endBuffer = endpoint.getStagingBufferCache().get(blockInfo.getAddr() + remoteOffset);
+               CrailBuffer stagingBuffer = endBuffer.getBuffer();
+               stagingBuffer.position(0);
+               stagingBuffer.getByteBuffer().put(buffer.getByteBuffer());
+               stagingBuffer.position(0);
+               endFuture = endpoint.write(stagingBuffer, blockInfo, remoteOffset);
+               endBuffer.setFuture(endFuture);
+       }
+
+       /** Releases staging entries acquired by a constructor that failed part-way. */
+       private void releaseStagingBuffers() {
+               if (beginBuffer != null) {
+                       releaseWhenSettled(beginBuffer, beginFuture);
+                       beginBuffer = null;
+               }
+               if (endBuffer != null) {
+                       releaseWhenSettled(endBuffer, endFuture);
+                       endBuffer = null;
+               }
+       }
+
+       /**
+        * Puts {@code entry} back once {@code future} (if any) has finished, so a staging buffer
+        * that is still being transferred is never recycled. If the write cannot be confirmed
+        * as finished, the entry is deliberately kept (a leak is preferable to corrupt data).
+        */
+       private static void releaseWhenSettled(NvmfStagingBufferCache.BufferCacheEntry entry, StorageFuture future) {
+               if (future != null) {
+                       try {
+                               future.get();
+                       } catch (InterruptedException e) {
+                               Thread.currentThread().interrupt();
+                       } catch (ExecutionException ignored) {
+                               // the constructor is already propagating the primary failure
+                       }
+                       if (!future.isDone()) {
+                               return;
+                       }
+               }
+               entry.put();
+       }
+
+       @Override
+       public boolean isSynchronous() {
+               return false;
+       }
+
+       @Override
+       public boolean cancel(boolean b) {
+               return false;
+       }
+
+       @Override
+       public boolean isCancelled() {
+               return false;
+       }
+
+       private static boolean checkIfFutureIsDone(StorageFuture future) {
+               return future == null || future.isDone();
+       }
+
+       /**
+        * Returns true when every submitted part has completed. Staging entries of completed
+        * head/tail writes are put back to the cache as a side effect (at most once each).
+        */
+       @Override
+       public boolean isDone() {
+               if (beginFuture != null && beginFuture.isDone()) {
+                       if (beginBuffer != null) {
+                               beginBuffer.put();
+                               beginBuffer = null;
+                       }
+               }
+               if (endFuture != null && endFuture.isDone()) {
+                       if (endBuffer != null) {
+                               endBuffer.put();
+                               endBuffer = null;
+                       }
+               }
+               return beginBuffer == null && checkIfFutureIsDone(middleFuture) && endBuffer == null;
+       }
+
+       @Override
+       public StorageResult get() throws InterruptedException, ExecutionException {
+               try {
+                       return get(DEFAULT_TIMEOUT_MINUTES, TimeUnit.MINUTES);
+               } catch (TimeoutException e) {
+                       throw new ExecutionException(e);
+               }
+       }
+
+       /**
+        * Busy-waits (via isDone()) until all parts complete, then rethrows any part's failure.
+        *
+        * @return a result whose length is the number of payload bytes
+        * @throws InterruptedException if the calling thread is interrupted while waiting
+        * @throws ExecutionException   if one of the part writes failed
+        * @throws TimeoutException     if the parts do not complete within {@code timeout}
+        */
+       @Override
+       public StorageResult get(long timeout, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
+               if (!isDone()) {
+                       long start = System.nanoTime();
+                       // toNanos saturates instead of overflowing, and comparing elapsed time
+                       // (not an absolute deadline) stays correct for very large timeouts.
+                       long timeoutNanos = timeUnit.toNanos(timeout);
+                       boolean waitTimeOut;
+                       do {
+                               if (Thread.interrupted()) {
+                                       throw new InterruptedException();
+                               }
+                               waitTimeOut = System.nanoTime() - start > timeoutNanos;
+                       } while (!isDone() && !waitTimeOut);
+                       if (waitTimeOut && !isDone()) {
+                               throw new TimeoutException("poll wait time out!");
+                       }
+               }
+               // Every part is complete here; get() on each one surfaces a failed write.
+               if (beginFuture != null) {
+                       beginFuture.get();
+               }
+               if (middleFuture != null) {
+                       middleFuture.get();
+               }
+               if (endFuture != null) {
+                       endFuture.get();
+               }
+               return () -> written;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/test/java/org/apache/crail/storage/nvmf/client/NvmfStagingBufferCacheTest.java
----------------------------------------------------------------------
diff --git 
a/storage-nvmf/src/test/java/org/apache/crail/storage/nvmf/client/NvmfStagingBufferCacheTest.java
 
b/storage-nvmf/src/test/java/org/apache/crail/storage/nvmf/client/NvmfStagingBufferCacheTest.java
new file mode 100644
index 0000000..9d4d7bf
--- /dev/null
+++ 
b/storage-nvmf/src/test/java/org/apache/crail/storage/nvmf/client/NvmfStagingBufferCacheTest.java
@@ -0,0 +1,73 @@
+package org.apache.crail.storage.nvmf.client;
+
+import org.apache.crail.CrailBuffer;
+import org.apache.crail.CrailBufferCache;
+import org.apache.crail.conf.CrailConfiguration;
+import org.apache.crail.conf.CrailConstants;
+import org.apache.crail.memory.MappedBufferCache;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.*;
+
+public class NvmfStagingBufferCacheTest {
+
+       @BeforeClass
+       public static void init() throws IOException {
+               CrailConstants.updateConstants(new CrailConfiguration());
+       }
+
+       private static CrailBufferCache bufferCache;
+       static CrailBufferCache getBufferCache() throws IOException {
+               if (bufferCache == null) {
+                       bufferCache = new MappedBufferCache();
+               }
+               return bufferCache;
+       }
+
+
+       @Test(expected = IllegalArgumentException.class)
+       public void createBufferCache() throws IOException {
+               new NvmfStagingBufferCache(getBufferCache(), -1, 512);
+               new NvmfStagingBufferCache(getBufferCache(),0, 512);
+               new NvmfStagingBufferCache(getBufferCache(),1024, -1);
+               new NvmfStagingBufferCache(getBufferCache(),1024, 0);
+       }
+
+       @Test(expected = OutOfMemoryError.class)
+       public void outOfMemory() throws Exception {
+               NvmfStagingBufferCache bufferCache = new 
NvmfStagingBufferCache(getBufferCache(),1, 512);
+               NvmfStagingBufferCache.BufferCacheEntry bufferCacheEntry = 
bufferCache.get(0);
+               NvmfStagingBufferCache.BufferCacheEntry bufferCacheEntry2 = 
bufferCache.get(1);
+       }
+
+       @Test
+       public void bufferExists() throws Exception {
+               NvmfStagingBufferCache bufferCache = new 
NvmfStagingBufferCache(getBufferCache(),1, 512);
+               NvmfStagingBufferCache.BufferCacheEntry bufferCacheEntry = 
bufferCache.get(0);
+               NvmfStagingBufferCache.BufferCacheEntry 
existingBufferCacheEntry = bufferCache.getExisting(0);
+               assertEquals(bufferCacheEntry, existingBufferCacheEntry);
+       }
+
+       @Test
+       public void recycleBuffers() throws Exception {
+               NvmfStagingBufferCache.BufferCacheEntry bufferCacheEntry[] = 
new NvmfStagingBufferCache.BufferCacheEntry[5];
+               Set<CrailBuffer> buffers = new HashSet<>();
+               NvmfStagingBufferCache bufferCache = new 
NvmfStagingBufferCache(getBufferCache(), bufferCacheEntry.length, 512);
+               for (int i = 0; i < bufferCacheEntry.length; i++) {
+                       bufferCacheEntry[i] = bufferCache.get(i);
+                       buffers.add(bufferCacheEntry[i].getBuffer());
+                       bufferCacheEntry[i].put();
+               }
+               for (int i = 0; i < bufferCacheEntry.length; i++) {
+                       bufferCacheEntry[i] = bufferCache.get(i + 
bufferCacheEntry.length);
+                       
assertTrue(buffers.remove(bufferCacheEntry[i].getBuffer()));
+               }
+       }
+}
\ No newline at end of file

Reply via email to