Repository: incubator-crail Updated Branches: refs/heads/master c053477a7 -> dc50fc193
New NVMf storage tier: use jNVMf library New NVMf storage tier implementation which uses jNVMf library instead of SPDK. We do not implement unaligned reads anymore since we believe the semantics of the underlying storage system should not be hidden like this. We guarantee good performance when using buffered streams since they now try to align accesses whenever possible. https://issues.apache.org/jira/browse/CRAIL-22 Close #16 Signed-off-by: Jonas Pfefferle <peppe...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/incubator-crail/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crail/commit/29be91d2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crail/tree/29be91d2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crail/diff/29be91d2 Branch: refs/heads/master Commit: 29be91d2f00e779fb5dc37575db9e5e763ea6db0 Parents: c053477 Author: Jonas Pfefferle <peppe...@apache.org> Authored: Thu Mar 1 17:29:34 2018 +0100 Committer: Jonas Pfefferle <peppe...@apache.org> Committed: Tue Apr 10 13:18:19 2018 +0200 ---------------------------------------------------------------------- storage-nvmf/pom.xml | 5 +- .../crail/storage/nvmf/NvmfBufferCache.java | 69 ---- .../crail/storage/nvmf/NvmfStorageClient.java | 43 ++- .../storage/nvmf/NvmfStorageConstants.java | 73 ++--- .../crail/storage/nvmf/NvmfStorageServer.java | 66 ++-- .../crail/storage/nvmf/NvmfStorageTier.java | 6 +- .../crail/storage/nvmf/client/NvmfFuture.java | 149 +++++++++ .../nvmf/client/NvmfRegisteredBufferCache.java | 105 +++++++ .../nvmf/client/NvmfStagingBufferCache.java | 159 ++++++++++ .../nvmf/client/NvmfStorageEndpoint.java | 315 ++++++++++--------- .../storage/nvmf/client/NvmfStorageFuture.java | 116 ------- .../nvmf/client/NvmfStorageUnalignedFuture.java | 110 ------- .../client/NvmfStorageUnalignedRMWFuture.java | 76 ----- .../client/NvmfStorageUnalignedReadFuture.java | 60 ---- .../client/NvmfStorageUnalignedWriteFuture.java | 50 --- 
.../storage/nvmf/client/NvmfStorageUtils.java | 48 --- .../nvmf/client/NvmfUnalignedWriteFuture.java | 183 +++++++++++ .../nvmf/client/NvmfStagingBufferCacheTest.java | 73 +++++ 18 files changed, 941 insertions(+), 765 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/pom.xml ---------------------------------------------------------------------- diff --git a/storage-nvmf/pom.xml b/storage-nvmf/pom.xml index 6500632..20ad294 100644 --- a/storage-nvmf/pom.xml +++ b/storage-nvmf/pom.xml @@ -22,8 +22,9 @@ <artifactId>junit</artifactId> </dependency> <dependency> - <groupId>com.ibm.disni</groupId> - <artifactId>disni</artifactId> + <groupId>com.ibm.jnvmf</groupId> + <artifactId>jnvmf</artifactId> + <version>1.0</version> </dependency> <dependency> <groupId>log4j</groupId> http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfBufferCache.java ---------------------------------------------------------------------- diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfBufferCache.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfBufferCache.java deleted file mode 100644 index 19142ec..0000000 --- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfBufferCache.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright (C) 2015-2018, IBM Corporation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.crail.storage.nvmf; - -import com.ibm.disni.nvmef.NvmeEndpointGroup; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; - -import org.apache.crail.CrailBuffer; -import org.apache.crail.conf.CrailConstants; -import org.apache.crail.memory.BufferCache; -import org.apache.crail.memory.OffHeapBuffer; - - -public class NvmfBufferCache extends BufferCache { - private static final int ALIGNMENT = 4096; - private NvmeEndpointGroup endpointGroup; - - private List<ByteBuffer> bufferPool = new ArrayList<>(); - - public NvmfBufferCache() throws IOException { - super(); - endpointGroup = NvmfStorageTier.getEndpointGroup(); - if (endpointGroup == null) { - throw new IOException("NvmfStorageTier not initialized"); - } - } - - @Override - public String providerName() { - return "NvmfBufferCache"; - } - - @Override - public CrailBuffer allocateRegion() throws IOException { - ByteBuffer buffer = endpointGroup.allocateBuffer(CrailConstants.BUFFER_SIZE, ALIGNMENT); - bufferPool.add(buffer); - return OffHeapBuffer.wrap(buffer); - } - - @Override - public void close() { - super.close(); - for (ByteBuffer buffer : bufferPool) { - endpointGroup.freeBuffer(buffer); - } - bufferPool.clear(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java ---------------------------------------------------------------------- diff --git 
a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java index d270f5d..d9dd976 100644 --- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java +++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2015-2018, IBM Corporation + * Copyright (C) 2018, IBM Corporation * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -19,8 +19,7 @@ package org.apache.crail.storage.nvmf; -import java.io.IOException; - +import com.ibm.jnvmf.Nvme; import org.apache.crail.CrailBufferCache; import org.apache.crail.CrailStatistics; import org.apache.crail.conf.CrailConfiguration; @@ -31,13 +30,23 @@ import org.apache.crail.storage.nvmf.client.NvmfStorageEndpoint; import org.apache.crail.utils.CrailUtils; import org.slf4j.Logger; -import com.ibm.disni.nvmef.NvmeEndpointGroup; -import com.ibm.disni.nvmef.spdk.NvmeTransportType; +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; public class NvmfStorageClient implements StorageClient { private static final Logger LOG = CrailUtils.getLogger(); - private static NvmeEndpointGroup clientGroup; - private boolean initialized = false; + private static Nvme nvme; + private boolean initialized; + private List<StorageEndpoint> endpoints; + private CrailStatistics statistics; + private CrailBufferCache bufferCache; + + public NvmfStorageClient() { + this.initialized = false; + this.endpoints = new CopyOnWriteArrayList<>(); + } public void init(CrailStatistics statistics, CrailBufferCache bufferCache, CrailConfiguration crailConfiguration, String[] args) throws IOException { @@ -45,7 +54,9 @@ public class NvmfStorageClient implements StorageClient { throw new 
IOException("NvmfStorageTier already initialized"); } initialized = true; - + this.statistics = statistics; + this.bufferCache = bufferCache; + LOG.info("Initialize Nvmf storage client"); NvmfStorageConstants.parseCmdLine(crailConfiguration, args); } @@ -53,19 +64,23 @@ public class NvmfStorageClient implements StorageClient { NvmfStorageConstants.printConf(logger); } - public static NvmeEndpointGroup getEndpointGroup() { - if (clientGroup == null) { - clientGroup = new NvmeEndpointGroup(new NvmeTransportType[]{NvmeTransportType.RDMA}, - NvmfStorageConstants.CLIENT_MEMPOOL); + public static Nvme getEndpointGroup() throws UnknownHostException { + if (nvme == null) { + nvme = new Nvme(); } - return clientGroup; + return nvme; } public synchronized StorageEndpoint createEndpoint(DataNodeInfo info) throws IOException { - return new NvmfStorageEndpoint(getEndpointGroup(), CrailUtils.datanodeInfo2SocketAddr(info)); + StorageEndpoint endpoint = new NvmfStorageEndpoint(getEndpointGroup(), info, statistics, bufferCache); + endpoints.add(endpoint); + return endpoint; } public void close() throws Exception { + for (StorageEndpoint endpoint : endpoints) { + endpoint.close(); + } } } http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageConstants.java ---------------------------------------------------------------------- diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageConstants.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageConstants.java index d65e398..8ce132b 100644 --- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageConstants.java +++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageConstants.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2015-2018, IBM Corporation + * Copyright (C) 2018, IBM Corporation * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license 
agreements. See the NOTICE file distributed with @@ -19,13 +19,9 @@ package org.apache.crail.storage.nvmf; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.DefaultParser; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; +import com.ibm.jnvmf.NamespaceIdentifier; +import com.ibm.jnvmf.NvmeQualifiedName; +import org.apache.commons.cli.*; import org.apache.crail.conf.CrailConfiguration; import org.apache.crail.conf.CrailConstants; import org.slf4j.Logger; @@ -33,7 +29,6 @@ import org.slf4j.Logger; import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; -import java.util.concurrent.TimeUnit; public class NvmfStorageConstants { @@ -46,22 +41,19 @@ public class NvmfStorageConstants { public static int PORT = 50025; public static final String NQN_KEY = "nqn"; - public static String NQN = "nqn.2016-06.io.spdk:cnode1"; + public static NvmeQualifiedName NQN = new NvmeQualifiedName("nqn.2017-06.io.crail:cnode"); - public static final String NAMESPACE_KEY = "namespace"; - public static int NAMESPACE = 1; + /* this is a server property, the client will get the nsid from the namenode */ + public static NamespaceIdentifier NAMESPACE = new NamespaceIdentifier(1); public static final String ALLOCATION_SIZE_KEY = "allocationsize"; public static long ALLOCATION_SIZE = 1073741824; /* 1GB */ - public static final String SERVER_MEMPOOL_KEY = "servermempool"; - public static long SERVER_MEMPOOL = 256; + public static final String QUEUE_SIZE_KEY = "queueSize"; + public static int QUEUE_SIZE = 64; - public static final String CLIENT_MEMPOOL_KEY = "clientmempool"; - public static long CLIENT_MEMPOOL = 256; - - public static final TimeUnit TIME_UNIT = TimeUnit.MINUTES; - public static final long TIME_OUT = 15; + public static final String 
STAGING_CACHE_SIZE_KEY = "stagingcachesize"; + public static int STAGING_CACHE_SIZE = 262144; private static String fullKey(String key) { return PREFIX + "." + key; @@ -72,12 +64,7 @@ public class NvmfStorageConstants { } public static void updateConstants(CrailConfiguration conf) throws UnknownHostException { - String arg = get(conf, NAMESPACE_KEY); - if (arg != null) { - NAMESPACE = Integer.parseInt(arg); - } - - arg = get(conf, IP_ADDR_KEY); + String arg = get(conf, IP_ADDR_KEY); if (arg != null) { IP_ADDR = InetAddress.getByName(arg); } @@ -89,7 +76,7 @@ public class NvmfStorageConstants { arg = get(conf, NQN_KEY); if (arg != null) { - NQN = arg; + NQN = new NvmeQualifiedName(arg); } arg = get(conf, ALLOCATION_SIZE_KEY); @@ -97,23 +84,23 @@ public class NvmfStorageConstants { ALLOCATION_SIZE = Long.parseLong(arg); } - arg = get(conf, SERVER_MEMPOOL_KEY); + arg = get(conf, QUEUE_SIZE_KEY); if (arg != null) { - SERVER_MEMPOOL = Long.parseLong(arg); + QUEUE_SIZE = Integer.parseInt(arg); } - arg = get(conf, CLIENT_MEMPOOL_KEY); + arg = get(conf, STAGING_CACHE_SIZE_KEY); if (arg != null) { - CLIENT_MEMPOOL = Long.parseLong(arg); + STAGING_CACHE_SIZE = Integer.parseInt(arg); } } - public static void verify() throws IOException { - if (NAMESPACE <= 0){ - throw new IOException("Namespace must be > 0"); - } + public static void verify() { if (ALLOCATION_SIZE % CrailConstants.BLOCK_SIZE != 0){ - throw new IOException("allocationsize must be multiple of crail.blocksize"); + throw new IllegalArgumentException("allocationsize must be multiple of crail.blocksize"); + } + if (QUEUE_SIZE < 0) { + throw new IllegalArgumentException("Queue size negative"); } } @@ -123,12 +110,10 @@ public class NvmfStorageConstants { } logger.info(fullKey(PORT_KEY) + " " + PORT); logger.info(fullKey(NQN_KEY) + " " + NQN); - logger.info(fullKey(NAMESPACE_KEY) + " " + NAMESPACE); logger.info(fullKey(ALLOCATION_SIZE_KEY) + " " + ALLOCATION_SIZE); - logger.info(fullKey(SERVER_MEMPOOL_KEY) + " " + 
SERVER_MEMPOOL); - logger.info(fullKey(CLIENT_MEMPOOL_KEY) + " " + CLIENT_MEMPOOL); + logger.info(fullKey(QUEUE_SIZE_KEY) + " " + QUEUE_SIZE); } - + public static void parseCmdLine(CrailConfiguration crailConfiguration, String[] args) throws IOException { NvmfStorageConstants.updateConstants(crailConfiguration); @@ -140,10 +125,12 @@ public class NvmfStorageConstants { bindIp.setRequired(true); } Option port = Option.builder("p").desc("target port").hasArg().type(Number.class).build(); + Option namespace = Option.builder("n").desc("namespace id").hasArg().type(Number.class).build(); Option nqn = Option.builder("nqn").desc("target subsystem NQN").hasArg().build(); options.addOption(bindIp); options.addOption(port); options.addOption(nqn); + options.addOption(namespace); CommandLineParser parser = new DefaultParser(); HelpFormatter formatter = new HelpFormatter(); CommandLine line = null; @@ -152,6 +139,10 @@ public class NvmfStorageConstants { if (line.hasOption(port.getOpt())) { NvmfStorageConstants.PORT = ((Number) line.getParsedOptionValue(port.getOpt())).intValue(); } + if (line.hasOption(namespace.getOpt())) { + NvmfStorageConstants.NAMESPACE = new + NamespaceIdentifier(((Number) line.getParsedOptionValue(namespace.getOpt())).intValue()); + } } catch (ParseException e) { System.err.println(e.getMessage()); formatter.printHelp("NVMe storage tier", options); @@ -161,10 +152,10 @@ public class NvmfStorageConstants { NvmfStorageConstants.IP_ADDR = InetAddress.getByName(line.getOptionValue(bindIp.getOpt())); } if (line.hasOption(nqn.getOpt())) { - NvmfStorageConstants.NQN = line.getOptionValue(nqn.getOpt()); + NvmfStorageConstants.NQN = new NvmeQualifiedName(line.getOptionValue(nqn.getOpt())); } } NvmfStorageConstants.verify(); - } + } } http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java ---------------------------------------------------------------------- diff --git 
a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java index 3fd958e..f8b0d8c 100644 --- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java +++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2015-2018, IBM Corporation + * Copyright (C) 2018, IBM Corporation * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -19,10 +19,7 @@ package org.apache.crail.storage.nvmf; -import com.ibm.disni.nvmef.NvmeEndpoint; -import com.ibm.disni.nvmef.NvmeEndpointGroup; -import com.ibm.disni.nvmef.spdk.NvmeTransportType; - +import com.ibm.jnvmf.*; import org.apache.crail.conf.CrailConfiguration; import org.apache.crail.storage.StorageResource; import org.apache.crail.storage.StorageServer; @@ -31,19 +28,19 @@ import org.slf4j.Logger; import java.io.IOException; import java.net.InetSocketAddress; -import java.net.URI; +import java.util.List; public class NvmfStorageServer implements StorageServer { private static final Logger LOG = CrailUtils.getLogger(); private boolean isAlive; private long alignedSize; - private long offset; + private long address; private boolean initialized = false; - private NvmeEndpoint endpoint; + private Controller controller; public NvmfStorageServer() {} - + public void init(CrailConfiguration crailConfiguration, String[] args) throws Exception { if (initialized) { throw new IOException("NvmfStorageTier already initialized"); @@ -51,23 +48,40 @@ public class NvmfStorageServer implements StorageServer { initialized = true; NvmfStorageConstants.parseCmdLine(crailConfiguration, args); - NvmeEndpointGroup group = new NvmeEndpointGroup(new NvmeTransportType[]{NvmeTransportType.RDMA}, NvmfStorageConstants.SERVER_MEMPOOL); - endpoint = group.createEndpoint(); - - URI 
uri = new URI("nvmef://" + NvmfStorageConstants.IP_ADDR.getHostAddress() + ":" + NvmfStorageConstants.PORT + - "/0/" + NvmfStorageConstants.NAMESPACE + "?subsystem=" + NvmfStorageConstants.NQN); - endpoint.connect(uri); - - long namespaceSize = endpoint.getNamespaceSize(); + Nvme nvme = new Nvme(); + NvmfTransportId transportId = new NvmfTransportId( + new InetSocketAddress(NvmfStorageConstants.IP_ADDR, NvmfStorageConstants.PORT), + NvmfStorageConstants.NQN); + controller = nvme.connect(transportId); + controller.getControllerConfiguration().setEnable(true); + controller.syncConfiguration(); + controller.waitUntilReady(); + + List<Namespace> namespaces = controller.getActiveNamespaces(); + Namespace namespace = null; + for (Namespace n : namespaces) { + if (n.getIdentifier().equals(NvmfStorageConstants.NAMESPACE)) { + namespace = n; + break; + } + } + if (namespace == null) { + throw new IllegalArgumentException("No namespace with id " + NvmfStorageConstants.NAMESPACE + + " at controller " + transportId.toString()); + } + IdentifyNamespaceData namespaceData = namespace.getIdentifyNamespaceData(); + LBAFormat lbaFormat = namespaceData.getFormattedLBASize(); + int dataSize = lbaFormat.getLBADataSize().toInt(); + long namespaceSize = dataSize * namespaceData.getNamespaceCapacity(); alignedSize = namespaceSize - (namespaceSize % NvmfStorageConstants.ALLOCATION_SIZE); - offset = 0; + address = 0; isAlive = true; - } + } @Override public void printConf(Logger log) { - NvmfStorageConstants.printConf(log); + NvmfStorageConstants.printConf(log); } public void run() { @@ -75,7 +89,7 @@ public class NvmfStorageServer implements StorageServer { while (isAlive) { try { Thread.sleep(1000 /* ms */); - endpoint.keepAlive(); + controller.keepAlive(); } catch (Exception e) { e.printStackTrace(); isAlive = false; @@ -86,15 +100,15 @@ public class NvmfStorageServer implements StorageServer { @Override public StorageResource allocateResource() throws Exception { StorageResource 
resource = null; - + if (alignedSize > 0){ LOG.info("new block, length " + NvmfStorageConstants.ALLOCATION_SIZE); - LOG.debug("block stag 0, offset " + offset + ", length " + NvmfStorageConstants.ALLOCATION_SIZE); + LOG.debug("block stag 0, address " + address + ", length " + NvmfStorageConstants.ALLOCATION_SIZE); alignedSize -= NvmfStorageConstants.ALLOCATION_SIZE; - resource = StorageResource.createResource(offset, (int)NvmfStorageConstants.ALLOCATION_SIZE, 0); - offset += NvmfStorageConstants.ALLOCATION_SIZE; + resource = StorageResource.createResource(address, (int)NvmfStorageConstants.ALLOCATION_SIZE, 0); + address += NvmfStorageConstants.ALLOCATION_SIZE; } - + return resource; } http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageTier.java ---------------------------------------------------------------------- diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageTier.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageTier.java index 1e37b44..749a331 100644 --- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageTier.java +++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageTier.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2015-2018, IBM Corporation + * Copyright (C) 2018, IBM Corporation * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. 
See the NOTICE file distributed with @@ -26,9 +26,9 @@ import org.slf4j.Logger; public class NvmfStorageTier extends NvmfStorageClient implements StorageTier { private static final Logger LOG = CrailUtils.getLogger(); - + public StorageServer launchServer() throws Exception { - LOG.info("initalizing NVMf datanode"); + LOG.info("initalizing NVMf storage tier"); NvmfStorageServer storageServer = new NvmfStorageServer(); return storageServer; } http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfFuture.java ---------------------------------------------------------------------- diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfFuture.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfFuture.java new file mode 100644 index 0000000..0a6c9b4 --- /dev/null +++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfFuture.java @@ -0,0 +1,149 @@ +/* + * Copyright (C) 2018, IBM Corporation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.crail.storage.nvmf.client; + +import com.ibm.jnvmf.*; +import org.apache.crail.storage.StorageFuture; +import org.apache.crail.storage.StorageResult; + +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class NvmfFuture<Command extends NvmIOCommand<? extends NvmIOCommandCapsule>> implements StorageFuture, OperationCallback { + private final NvmfStorageEndpoint endpoint; + private final Command command; + private final Queue<Command> operations; + private boolean done; + private RdmaException exception; + private final StorageResult storageResult; + private final Response<NvmResponseCapsule> response; + private int completed; + + NvmfFuture(NvmfStorageEndpoint endpoint, Command command, Response<NvmResponseCapsule> response, + Queue<Command> operations, int length) { + this.endpoint = endpoint; + this.command = command; + this.operations = operations; + this.done = false; + this.storageResult = () -> length; + this.response = response; + this.completed = 0; + } + + @Override + public boolean isSynchronous() { + return false; + } + + @Override + public boolean cancel(boolean b) { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + if (!done) { + try { + endpoint.poll(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return done; + } + + private final void checkStatus() throws ExecutionException { + if (exception != null) { + throw new ExecutionException(exception); + } + NvmCompletionQueueEntry cqe = response.getResponseCapsule().getCompletionQueueEntry(); + StatusCode.Value statusCode = cqe.getStatusCode(); + if (statusCode != null) { + if (!statusCode.equals(GenericStatusCode.getInstance().SUCCESS)) { + throw new ExecutionException(new UnsuccessfulComandException(cqe)); + } + } + } + + 
@Override + public StorageResult get() throws InterruptedException, ExecutionException { + try { + return get(2, TimeUnit.MINUTES); + } catch (TimeoutException e) { + throw new ExecutionException(e); + } + } + + @Override + public StorageResult get(long timeout, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException { + if (!done) { + long start = System.nanoTime(); + long end = start + TimeUnit.NANOSECONDS.convert(timeout, timeUnit); + boolean waitTimeOut; + do { + try { + endpoint.poll(); + } catch (IOException e) { + throw new ExecutionException(e); + } + waitTimeOut = System.nanoTime() > end; + } while (!done && !waitTimeOut); + if (!done && waitTimeOut) { + throw new TimeoutException("poll wait time out!"); + } + } + checkStatus(); + return storageResult; + } + + @Override + public void onStart() { + + } + + @Override + public void onComplete() { + assert !done; + assert completed < 2; + if (++completed == 2) { + /* we need to complete command and response */ + operations.add(command); + this.done = true; + endpoint.putOperation(); + } + } + + @Override + public void onFailure(RdmaException e) { + assert !done; + this.operations.add(command); + this.exception = e; + this.done = true; + endpoint.putOperation(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfRegisteredBufferCache.java ---------------------------------------------------------------------- diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfRegisteredBufferCache.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfRegisteredBufferCache.java new file mode 100644 index 0000000..0a364e8 --- /dev/null +++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfRegisteredBufferCache.java @@ -0,0 +1,105 @@ +/* + * Copyright (C) 2018, IBM Corporation + * + * Licensed to the Apache Software Foundation (ASF) under one 
or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.crail.storage.nvmf.client; + +import com.ibm.jnvmf.Freeable; +import com.ibm.jnvmf.KeyedNativeBuffer; +import com.ibm.jnvmf.NativeByteBuffer; +import com.ibm.jnvmf.QueuePair; +import org.apache.crail.CrailBuffer; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +class NvmfRegisteredBufferCache implements Freeable { + private final QueuePair queuePair; + private final Map<CrailBuffer, KeyedNativeBuffer> bufferMap; + private final Map<Long, KeyedNativeBuffer> regionMap; + private boolean valid; + + public NvmfRegisteredBufferCache(QueuePair queuePair) { + this.queuePair = queuePair; + this.bufferMap = new ConcurrentHashMap<>(); + this.regionMap = new ConcurrentHashMap<>(); + this.valid = true; + } + + static class Buffer extends NativeByteBuffer implements KeyedNativeBuffer { + private final KeyedNativeBuffer registeredRegionBuffer; + + Buffer(CrailBuffer buffer, KeyedNativeBuffer registeredRegionBuffer) { + super(buffer.getByteBuffer()); + this.registeredRegionBuffer = registeredRegionBuffer; + } + + @Override + public int getRemoteKey() { + return registeredRegionBuffer.getRemoteKey(); + } + + @Override + public int getLocalKey() { + return 
registeredRegionBuffer.getLocalKey(); + } + } + + KeyedNativeBuffer get(CrailBuffer buffer) throws IOException { + KeyedNativeBuffer keyedNativeBuffer = bufferMap.get(buffer); + if (keyedNativeBuffer == null) { + CrailBuffer regionBuffer = buffer.getRegion(); + keyedNativeBuffer = regionMap.get(regionBuffer.address()); + if (keyedNativeBuffer == null) { + /* region has not been registered yet */ + keyedNativeBuffer = queuePair.registerMemory(regionBuffer.getByteBuffer()); + KeyedNativeBuffer prevKeyedNativeBuffer = + regionMap.putIfAbsent(keyedNativeBuffer.getAddress(), keyedNativeBuffer); + if (prevKeyedNativeBuffer != null) { + /* someone registered the same region in parallel */ + keyedNativeBuffer.free(); + keyedNativeBuffer = prevKeyedNativeBuffer; + } + } + keyedNativeBuffer = new Buffer(buffer, keyedNativeBuffer); + KeyedNativeBuffer prevKeyedNativeBuffer = + bufferMap.putIfAbsent(buffer, keyedNativeBuffer); + if (prevKeyedNativeBuffer != null) { + /* someone added the same buffer parallel */ + keyedNativeBuffer.free(); + keyedNativeBuffer = prevKeyedNativeBuffer; + } + } + return keyedNativeBuffer; + } + + + @Override + public void free() throws IOException { + for (KeyedNativeBuffer buffer : bufferMap.values()) { + buffer.free(); + } + valid = false; + } + + @Override + public boolean isValid() { + return valid; + } +} http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStagingBufferCache.java ---------------------------------------------------------------------- diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStagingBufferCache.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStagingBufferCache.java new file mode 100644 index 0000000..b4f4dc3 --- /dev/null +++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStagingBufferCache.java @@ -0,0 +1,159 @@ +/* + * Copyright (C) 2018, IBM Corporation 
+ * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.crail.storage.nvmf.client; + +import org.apache.crail.CrailBuffer; +import org.apache.crail.CrailBufferCache; +import org.apache.crail.storage.StorageFuture; + +import java.util.Iterator; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +public class NvmfStagingBufferCache { + private final Map<Long, BufferCacheEntry> remoteAddressMap; + private final Queue<CrailBuffer> freeBuffers; + private int buffersLeft; + private final int lbaDataSize; + private final CrailBufferCache bufferCache; + + private final CrailBufferCache getBufferCache() { + return bufferCache; + } + + NvmfStagingBufferCache(CrailBufferCache bufferCache, int maxEntries, int lbaDataSize) { + if (maxEntries <= 0) { + throw new IllegalArgumentException("maximum entries <= 0"); + } + if (lbaDataSize <= 0) { + throw new IllegalArgumentException("LBA data size <= 0"); + } + this.remoteAddressMap = new ConcurrentHashMap<>(maxEntries); + this.freeBuffers = new ArrayBlockingQueue<>(maxEntries); + this.buffersLeft = maxEntries; + 
this.lbaDataSize = lbaDataSize; + this.bufferCache = bufferCache; + } + + /** Refills the free-buffer queue if it is empty. While buffersLeft is non-zero, one buffer is taken from the buffer cache and cut into lbaDataSize-sized slices; once buffersLeft is zero, the first cached entry whose tryFree() succeeds is evicted and its buffer reused. Throws OutOfMemoryError when nothing can be allocated or evicted and IllegalArgumentException when the allocated buffer is smaller than lbaDataSize. */ synchronized void allocateFreeBuffers() throws Exception { + if (!freeBuffers.isEmpty()) { + return; + } + if (buffersLeft == 0) { + /* TODO: make sure this happens rarely */ + Iterator<BufferCacheEntry> iterator = remoteAddressMap.values().iterator(); + while (iterator.hasNext()) { + BufferCacheEntry currentEntry = iterator.next(); + /* tryFree() only succeeds when the entry's pending counter is 0 */ if (currentEntry.tryFree()) { + iterator.remove(); + freeBuffers.add(currentEntry.getBuffer()); + return; + } + } + throw new OutOfMemoryError(); + } + + CrailBuffer buffer = getBufferCache().allocateBuffer(); + if (buffer == null) { + throw new OutOfMemoryError(); + } + if (buffer.capacity() < lbaDataSize) { + throw new IllegalArgumentException("Slice size smaller LBA data size"); + } + /* carve consecutive lbaDataSize-sized slices out of the buffer until it is used up or buffersLeft reaches zero */ while (buffer.remaining() >= lbaDataSize && buffersLeft > 0) { + buffer.limit(buffer.position() + lbaDataSize); + freeBuffers.add(buffer.slice()); + buffer.position(buffer.limit()); + buffersLeft--; + } + } + + /** A staging buffer together with a pending counter. The counter starts at 1, tryGet() increments it, put() decrements it and tryFree() switches it from 0 to -1; once it is negative tryGet() fails. The future field is a plain reference with no synchronization of its own. */ static class BufferCacheEntry { + private final CrailBuffer buffer; + private final AtomicInteger pending; + private StorageFuture future; + + BufferCacheEntry(CrailBuffer buffer) { + this.buffer = buffer; + this.pending = new AtomicInteger(1); + } + + public StorageFuture getFuture() { + return future; + } + + public void setFuture(StorageFuture future) { + this.future = future; + } + + /** Decrements the pending counter. */ void put() { + pending.decrementAndGet(); + } + + /** Increments the pending counter in a compare-and-set loop; returns false without incrementing if the counter is negative. */ boolean tryGet() { + int prevPending; + do { + prevPending = pending.get(); + if (prevPending < 0) { + return false; + } + } while (!pending.compareAndSet(prevPending, prevPending + 1)); + return true; + } + + /** Atomically changes the pending counter from 0 to -1 and returns whether that succeeded. */ boolean tryFree() { + return pending.compareAndSet(0, -1); + } + + CrailBuffer getBuffer() { + return buffer; + } + + + } + + /** Takes a free staging buffer, calling allocateFreeBuffers() whenever the free queue is empty, and registers a new entry for alignedRemoteAddress. Throws IllegalStateException if that address already has an entry. */ BufferCacheEntry get(long alignedRemoteAddress) throws Exception { + CrailBuffer buffer; + do { + buffer = freeBuffers.poll(); + if (buffer == null) { + allocateFreeBuffers(); + } + } while (buffer == null); + 
+ BufferCacheEntry entry = new BufferCacheEntry(buffer); + BufferCacheEntry prevEntry = remoteAddressMap.putIfAbsent(alignedRemoteAddress, entry); + if (prevEntry != null) { + /* NOTE(review): the staging buffer polled from freeBuffers earlier in this method is not put back on this path; confirm whether that is intended */ throw new IllegalStateException(); + } + return entry; + } + + /** Looks up the entry registered for alignedRemoteAddress and increments its pending counter via tryGet(). Returns null if there is no entry or if tryGet() fails. */ BufferCacheEntry getExisting(long alignedRemoteAddress) { + BufferCacheEntry entry = remoteAddressMap.get(alignedRemoteAddress); + if (entry != null && !entry.tryGet()) { + entry = null; + } + return entry; + } +} http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java ---------------------------------------------------------------------- diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java index b4b1054..e1430af 100644 --- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java +++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2015-2018, IBM Corporation + * Copyright (C) 2018, IBM Corporation * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. 
See the NOTICE file distributed with @@ -19,208 +19,223 @@ package org.apache.crail.storage.nvmf.client; -import com.ibm.disni.nvmef.NvmeCommand; -import com.ibm.disni.nvmef.NvmeEndpoint; -import com.ibm.disni.nvmef.NvmeEndpointGroup; -import com.ibm.disni.nvmef.spdk.IOCompletion; - +import com.ibm.jnvmf.*; import org.apache.crail.CrailBuffer; +import org.apache.crail.CrailBufferCache; +import org.apache.crail.CrailStatistics; import org.apache.crail.conf.CrailConstants; -import org.apache.crail.memory.BufferCache; import org.apache.crail.metadata.BlockInfo; +import org.apache.crail.metadata.DataNodeInfo; import org.apache.crail.storage.StorageEndpoint; import org.apache.crail.storage.StorageFuture; -import org.apache.crail.storage.nvmf.NvmfBufferCache; import org.apache.crail.storage.nvmf.NvmfStorageConstants; import org.apache.crail.utils.CrailUtils; import org.slf4j.Logger; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.concurrent.*; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; public class NvmfStorageEndpoint implements StorageEndpoint { private static final Logger LOG = CrailUtils.getLogger(); - private final InetSocketAddress inetSocketAddress; - private final NvmeEndpoint endpoint; - private final int sectorSize; - private final BufferCache cache; - private final BlockingQueue<NvmeCommand> freeCommands; - private final NvmeCommand[] commands; - private final NvmfStorageFuture[] futures; - private final ThreadLocal<long[]> completed; - private final int ioQeueueSize; - - public NvmfStorageEndpoint(NvmeEndpointGroup group, InetSocketAddress inetSocketAddress) throws IOException { - this.inetSocketAddress = inetSocketAddress; - endpoint = group.createEndpoint(); + private final Controller 
controller; + private final IOQueuePair queuePair; + private final int lbaDataSize; + private final long namespaceCapacity; + private final NvmfRegisteredBufferCache registeredBufferCache; + private final NvmfStagingBufferCache stagingBufferCache; + private final CrailStatistics statistics; + + private final Queue<NvmWriteCommand> writeCommands; + private final Queue<NvmReadCommand> readCommands; + + private final AtomicInteger outstandingOperations; + + public NvmfStorageEndpoint(Nvme nvme, DataNodeInfo info, CrailStatistics statistics, + CrailBufferCache bufferCache) throws IOException { + InetSocketAddress inetSocketAddress = new InetSocketAddress( + InetAddress.getByAddress(info.getIpAddress()), info.getPort()); + // XXX FIXME: nsid from datanodeinfo + NvmfTransportId transportId = new NvmfTransportId(inetSocketAddress, + new NvmeQualifiedName(NvmfStorageConstants.NQN.toString() + info.getPort())); + LOG.info("Connecting to NVMf target at " + transportId.toString()); + controller = nvme.connect(transportId); + controller.getControllerConfiguration().setEnable(true); + controller.syncConfiguration(); try { - URI url = new URI("nvmef://" + inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort() + - "/0/" + NvmfStorageConstants.NAMESPACE + "?subsystem=nqn.2016-06.io.spdk:cnode1"); - LOG.info("Connecting to " + url.toString()); - endpoint.connect(url); - } catch (URISyntaxException e) { - //FIXME - e.printStackTrace(); + controller.waitUntilReady(); + } catch (TimeoutException e) { + throw new IOException(e); } - sectorSize = endpoint.getSectorSize(); - cache = new NvmfBufferCache(); - ioQeueueSize = endpoint.getIOQueueSize(); - freeCommands = new ArrayBlockingQueue<NvmeCommand>(ioQeueueSize); - commands = new NvmeCommand[ioQeueueSize]; - for (int i = 0; i < ioQeueueSize; i++) { - NvmeCommand command = endpoint.newCommand(); - command.setId(i); - commands[i] = command; - freeCommands.add(command); + IdentifyControllerData identifyControllerData = 
controller.getIdentifyControllerData(); + if (CrailConstants.SLICE_SIZE > identifyControllerData.getMaximumDataTransferSize().toInt()) { + throw new IllegalArgumentException(CrailConstants.SLICE_SIZE_KEY + " > max transfer size (" + + identifyControllerData.getMaximumDataTransferSize() + ")"); } - futures = new NvmfStorageFuture[ioQeueueSize]; - completed = new ThreadLocal<long[]>() { - public long[] initialValue() { - return new long[ioQeueueSize]; + List<Namespace> namespaces = controller.getActiveNamespaces(); + //TODO: poll nsid in datanodeinfo + NamespaceIdentifier namespaceIdentifier = new NamespaceIdentifier(1); + Namespace namespace = null; + for (Namespace n : namespaces) { + if (n.getIdentifier().equals(namespaceIdentifier)) { + namespace = n; + break; } - }; + } + if (namespace == null) { + throw new IllegalArgumentException("No namespace with id " + namespaceIdentifier + + " at controller " + transportId.toString()); + } + IdentifyNamespaceData identifyNamespaceData = namespace.getIdentifyNamespaceData(); + lbaDataSize = identifyNamespaceData.getFormattedLBASize().getLBADataSize().toInt(); + if (CrailConstants.SLICE_SIZE % lbaDataSize != 0) { + throw new IllegalArgumentException(CrailConstants.SLICE_SIZE_KEY + + " is not a multiple of LBA data size (" + lbaDataSize + ")"); + } + namespaceCapacity = identifyNamespaceData.getNamespaceCapacity() * lbaDataSize; + this.queuePair = controller.createIOQueuePair(NvmfStorageConstants.QUEUE_SIZE, 0, 0, + SubmissionQueueEntry.SIZE); + + this.writeCommands = new ArrayBlockingQueue<>(NvmfStorageConstants.QUEUE_SIZE); + this.readCommands = new ArrayBlockingQueue<>(NvmfStorageConstants.QUEUE_SIZE); + for(int i = 0; i < NvmfStorageConstants.QUEUE_SIZE; i++) { + NvmWriteCommand writeCommand = new NvmWriteCommand(queuePair); + writeCommand.setSendInline(true); + writeCommand.getCommandCapsule().getSubmissionQueueEntry().setNamespaceIdentifier(namespaceIdentifier); + writeCommands.add(writeCommand); + NvmReadCommand 
readCommand = new NvmReadCommand(queuePair); + readCommand.setSendInline(true); + readCommand.getCommandCapsule().getSubmissionQueueEntry().setNamespaceIdentifier(namespaceIdentifier); + readCommands.add(readCommand); + } + this.registeredBufferCache = new NvmfRegisteredBufferCache(queuePair); + this.outstandingOperations = new AtomicInteger(0); + this.stagingBufferCache = new NvmfStagingBufferCache(bufferCache, + NvmfStorageConstants.STAGING_CACHE_SIZE, getLBADataSize()); + this.statistics = statistics; } - public int getSectorSize() { - return sectorSize; + /** Returns the LBA data size that the constructor read from the namespace's formatted LBA size. */ public int getLBADataSize() { + return lbaDataSize; + } + + /** Returns the namespace capacity, stored by the constructor as the reported namespace capacity multiplied by the LBA data size. */ public long getNamespaceCapacity() { + return namespaceCapacity; } /** Kind of I/O request handled by Op. */ enum Operation { WRITE, - READ; + READ } - public StorageFuture Op(Operation op, CrailBuffer buffer, BlockInfo remoteMr, long remoteOffset) - throws IOException, InterruptedException { - int length = buffer.remaining(); - if (length > CrailConstants.BLOCK_SIZE){ - throw new IOException("write size too large " + length); - } - if (length <= 0){ - throw new IOException("write size too small, len " + length); - } - if (buffer.position() < 0){ - throw new IOException("local offset too small " + buffer.position()); - } - if (remoteOffset < 0){ - throw new IOException("remote offset too small " + remoteOffset); - } + /** Decrements the outstanding-operations counter that tryGetOperation() increments. Presumably invoked when an operation completes; the caller is not visible here. */ void putOperation() { + outstandingOperations.decrementAndGet(); + } - if (remoteMr.getAddr() + remoteOffset + length > endpoint.getNamespaceSize()){ - long tmpAddr = remoteMr.getAddr() + remoteOffset + length; - throw new IOException("remote fileOffset + remoteOffset + len = " + tmpAddr + " - size = " + - endpoint.getNamespaceSize()); + /** Makes a single compare-and-set attempt to increment the outstanding-operations counter. Returns false when the counter equals QUEUE_SIZE and also when the compare-and-set loses a race, so callers are expected to retry. */ private boolean tryGetOperation() { + int outstandingOperationsOld = outstandingOperations.get(); + if (outstandingOperationsOld != NvmfStorageConstants.QUEUE_SIZE) { + return outstandingOperations.compareAndSet(outstandingOperationsOld, outstandingOperationsOld + 1); } + return false; + } -// LOG.info("op = " + op.name() + -// ", position = " + 
buffer.position() + -// ", localOffset = " + buffer.position() + -// ", remoteOffset = " + remoteOffset + -// ", remoteAddr = " + remoteMr.getAddr() + -// ", length = " + length); - - NvmeCommand command = freeCommands.poll(); - while (command == null) { - poll(); - command = freeCommands.poll(); - } + /** Integer division of a by b, rounded up. Assumes a is non-negative and b is positive; neither is checked. */ private static int divCeil(int a, int b) { + return (a + b - 1) / b; + } - boolean aligned = NvmfStorageUtils.namespaceSectorOffset(sectorSize, remoteOffset) == 0 - && NvmfStorageUtils.namespaceSectorOffset(sectorSize, length) == 0; - long lba = NvmfStorageUtils.linearBlockAddress(remoteMr, remoteOffset, sectorSize); - StorageFuture future = null; - if (aligned) { -// LOG.info("aligned"); - command.setBuffer(buffer.getByteBuffer()).setLinearBlockAddress(lba); - switch(op) { - case READ: - command.read(); - break; - case WRITE: - command.write(); - break; - } - future = futures[(int)command.getId()] = new NvmfStorageFuture(this, length); - command.execute(); - } else { -// LOG.info("unaligned"); - long alignedLength = NvmfStorageUtils.alignLength(sectorSize, remoteOffset, length); + /** Number of logical blocks needed to cover buffer.remaining() bytes, rounded up to whole blocks of getLBADataSize() bytes. */ private int getNumLogicalBlocks(CrailBuffer buffer) { + return divCeil(buffer.remaining(), getLBADataSize()); + } - CrailBuffer stagingBuffer = cache.allocateBuffer(); - stagingBuffer.limit((int)alignedLength); + /** Submits a read or write of buffer.remaining() bytes at byte address blockInfo.getAddr() plus remoteOffset and returns its future. A read whose start address is not a multiple of the LBA data size throws IOException; a write whose start or end address is unaligned is handed to NvmfUnalignedWriteFuture. Otherwise the method polls until tryGetOperation() succeeds, takes a pre-allocated command, sets the starting LBA and the block count minus one, attaches the registered buffer and executes the command. Range and size preconditions are checked with assert only. */ StorageFuture Op(Operation op, CrailBuffer buffer, BlockInfo blockInfo, long remoteOffset) throws InterruptedException, IOException { + assert blockInfo.getAddr() + remoteOffset + buffer.remaining() <= getNamespaceCapacity(); + assert remoteOffset >= 0; + assert buffer.remaining() <= CrailConstants.BLOCK_SIZE; + + long startingAddress = blockInfo.getAddr() + remoteOffset; + if (startingAddress % getLBADataSize() != 0 || + ((startingAddress + buffer.remaining()) % getLBADataSize() != 0 && op == Operation.WRITE)) { + if (op == Operation.READ) { + throw new IOException("Unaligned read access is not supported. 
Address (" + startingAddress + + ") needs to be multiple of LBA data size " + getLBADataSize()); + } try { - switch(op) { - case READ: { - NvmfStorageFuture f = futures[(int)command.getId()] = new NvmfStorageFuture(this, (int)alignedLength); - command.setBuffer(stagingBuffer.getByteBuffer()).setLinearBlockAddress(lba).read().execute(); - future = new NvmfStorageUnalignedReadFuture(f, this, buffer, remoteMr, remoteOffset, stagingBuffer); - break; - } - case WRITE: { - if (NvmfStorageUtils.namespaceSectorOffset(sectorSize, remoteOffset) == 0) { - // Do not read if the offset is aligned to sector size - int sizeToWrite = length; - stagingBuffer.put(buffer.getByteBuffer()); - stagingBuffer.position(0); - command.setBuffer(stagingBuffer.getByteBuffer()).setLinearBlockAddress(lba).write().execute(); - future = futures[(int)command.getId()] = new NvmfStorageUnalignedWriteFuture(this, sizeToWrite, stagingBuffer); - } else { - // RMW but append only file system allows only reading last sector - // and dir entries are sector aligned - stagingBuffer.limit(sectorSize); - NvmfStorageFuture f = futures[(int)command.getId()] = new NvmfStorageFuture(this, sectorSize); - command.setBuffer(stagingBuffer.getByteBuffer()).setLinearBlockAddress(lba).read().execute(); - future = new NvmfStorageUnalignedRMWFuture(f, this, buffer, remoteMr, remoteOffset, stagingBuffer); - } - break; - } - } - } catch (NoSuchFieldException e) { - throw new IOException(e); - } catch (IllegalAccessException e) { + return new NvmfUnalignedWriteFuture(this, buffer, blockInfo, remoteOffset); + } catch (Exception e) { throw new IOException(e); } } + if (!tryGetOperation()) { + do { + poll(); + } while (!tryGetOperation()); + } + + NvmIOCommand<? 
extends NvmIOCommandCapsule> command; + NvmfFuture<?> future; + Response<NvmResponseCapsule> response; + if (op == Operation.READ) { + NvmReadCommand readCommand = readCommands.remove(); + response = readCommand.newResponse(); + future = new NvmfFuture<>(this, readCommand, response, readCommands, buffer.remaining()); + command = readCommand; + } else { + NvmWriteCommand writeCommand = writeCommands.remove(); + response = writeCommand.newResponse(); + future = new NvmfFuture<>(this, writeCommand, response, writeCommands, buffer.remaining()); + command = writeCommand; + } + command.setCallback(future); + response.setCallback(future); + + NvmIOCommandSQE sqe = command.getCommandCapsule().getSubmissionQueueEntry(); + long startingLBA = startingAddress / getLBADataSize(); + sqe.setStartingLBA(startingLBA); + /* TODO: on read this potentially overwrites data beyond the set limit */ + short numLogicalBlocks = (short)(getNumLogicalBlocks(buffer) - 1); + sqe.setNumberOfLogicalBlocks(numLogicalBlocks); + KeyedNativeBuffer registeredBuffer = registeredBufferCache.get(buffer); + registeredBuffer.position(buffer.position()); + registeredBuffer.limit(registeredBuffer.position() + (numLogicalBlocks + 1) * getLBADataSize()); + command.getCommandCapsule().setSGLDescriptor(registeredBuffer); + + command.execute(response); + return future; } - public StorageFuture write(CrailBuffer buffer, BlockInfo blockInfo, long remoteOffset) - throws IOException, InterruptedException { + public StorageFuture write(CrailBuffer buffer, BlockInfo blockInfo, long remoteOffset) throws InterruptedException, IOException { return Op(Operation.WRITE, buffer, blockInfo, remoteOffset); } - public StorageFuture read(CrailBuffer buffer, BlockInfo blockInfo, long remoteOffset) - throws IOException, InterruptedException { + public StorageFuture read(CrailBuffer buffer, BlockInfo blockInfo, long remoteOffset) throws InterruptedException, IOException { return Op(Operation.READ, buffer, blockInfo, remoteOffset); } 
/** Delegates to queuePair.poll(); Op calls this repeatedly while waiting for tryGetOperation() to succeed. */ void poll() throws IOException { - long[] ca = completed.get(); - int numberCompletions = endpoint.processCompletions(ca); - for (int i = 0; i < numberCompletions; i++) { - int idx = (int)ca[i]; - NvmeCommand command = commands[idx]; - IOCompletion completion = command.getCompletion(); - completion.done(); - futures[idx].signal(completion.getStatusCodeType(), completion.getStatusCode()); - freeCommands.add(command); - } - } - - void putBuffer(CrailBuffer buffer) throws IOException { - cache.freeBuffer(buffer); + queuePair.poll(); } /** Frees the registered-buffer cache and then the controller. NOTE(review): the queue pair is not freed explicitly here; confirm that controller.free() releases it. */ public void close() throws IOException, InterruptedException { - endpoint.close(); + registeredBufferCache.free(); + controller.free(); + } /** Always returns false. */ public boolean isLocal() { return false; } + + /** Returns the staging buffer cache created in the constructor. */ NvmfStagingBufferCache getStagingBufferCache() { + return stagingBufferCache; + } + } http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageFuture.java ---------------------------------------------------------------------- diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageFuture.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageFuture.java deleted file mode 100644 index 9dbab36..0000000 --- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageFuture.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Copyright (C) 2015-2018, IBM Corporation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.crail.storage.nvmf.client; - -import com.ibm.disni.nvmef.spdk.NvmeGenericCommandStatusCode; -import com.ibm.disni.nvmef.spdk.NvmeStatusCodeType; - -import java.io.IOException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.apache.crail.storage.StorageFuture; -import org.apache.crail.storage.StorageResult; -import org.apache.crail.storage.nvmf.NvmfStorageConstants; - -public class NvmfStorageFuture implements StorageFuture, StorageResult { - - protected final NvmfStorageEndpoint endpoint; - private final int len; - private Exception exception; - private volatile boolean done; - - public NvmfStorageFuture(NvmfStorageEndpoint endpoint, int len) { - this.endpoint = endpoint; - this.len = len; - } - - public int getLen() { - return len; - } - - public boolean cancel(boolean b) { - return false; - } - - public boolean isCancelled() { - return false; - } - - void signal(NvmeStatusCodeType statusCodeType, int statusCode) { - if (statusCodeType != NvmeStatusCodeType.GENERIC && - statusCode != NvmeGenericCommandStatusCode.SUCCESS.getNumVal()) { - exception = new ExecutionException("Error: " + statusCodeType.name() + " - " + statusCode) {}; - } - done = true; - } - - public boolean isDone() { - if (!done) { - try { - endpoint.poll(); - } catch (IOException e) { - exception = e; - } - } - return done; - } - - public StorageResult get() throws InterruptedException, ExecutionException { - try { - return get(NvmfStorageConstants.TIME_OUT, 
NvmfStorageConstants.TIME_UNIT); - } catch (TimeoutException e) { - throw new ExecutionException(e); - } - } - - public StorageResult get(long timeout, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException { - if (exception != null) { - throw new ExecutionException(exception); - } - if (!done) { - long start = System.nanoTime(); - long end = start + TimeUnit.NANOSECONDS.convert(timeout, timeUnit); - boolean waitTimeOut; - do { - try { - endpoint.poll(); - } catch (IOException e) { - throw new ExecutionException(e); - } - // we don't want to trigger timeout on first iteration - waitTimeOut = System.nanoTime() > end; - } while (!done && !waitTimeOut); - if (!done && waitTimeOut) { - throw new TimeoutException("get wait time out!"); - } - if (exception != null) { - throw new ExecutionException(exception); - } - } - return this; - } - - @Override - public boolean isSynchronous() { - return false; - } -} http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedFuture.java ---------------------------------------------------------------------- diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedFuture.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedFuture.java deleted file mode 100644 index 93d24d0..0000000 --- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedFuture.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Copyright (C) 2015-2018, IBM Corporation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.crail.storage.nvmf.client; - -import sun.misc.Unsafe; - -import java.lang.reflect.Field; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.apache.crail.CrailBuffer; -import org.apache.crail.metadata.BlockInfo; -import org.apache.crail.storage.StorageFuture; -import org.apache.crail.storage.StorageResult; -import org.apache.crail.storage.nvmf.NvmfStorageConstants; - -public abstract class NvmfStorageUnalignedFuture implements StorageFuture, StorageResult { - protected final NvmfStorageFuture initFuture; - protected final NvmfStorageEndpoint endpoint; - protected final CrailBuffer buffer; - protected final long localOffset; - protected final BlockInfo remoteMr; - protected final long remoteOffset; - protected final int len; - protected final CrailBuffer stagingBuffer; - protected boolean done; - protected Exception exception; - protected static Unsafe unsafe; - - public NvmfStorageUnalignedFuture(NvmfStorageFuture future, NvmfStorageEndpoint endpoint, CrailBuffer buffer, - BlockInfo remoteMr, long remoteOffset, CrailBuffer stagingBuffer) - throws NoSuchFieldException, IllegalAccessException { - this.initFuture = future; - this.endpoint = endpoint; - this.buffer = buffer; - this.localOffset = buffer.position(); - this.remoteMr = remoteMr; - this.remoteOffset = 
remoteOffset; - this.len = buffer.remaining(); - this.stagingBuffer = stagingBuffer; - initUnsafe(); - done = false; - } - - public boolean isDone() { - if (!done) { - try { - get(0, TimeUnit.NANOSECONDS); - } catch (InterruptedException e) { - exception = e; - } catch (ExecutionException e) { - exception = e; - } catch (TimeoutException e) { - // i.e. operation is not finished - } - } - return done; - } - - public int getLen() { - return len; - } - - public boolean cancel(boolean b) { - return false; - } - - public boolean isCancelled() { - return false; - } - - public StorageResult get() throws InterruptedException, ExecutionException { - try { - return get(NvmfStorageConstants.TIME_OUT, NvmfStorageConstants.TIME_UNIT); - } catch (TimeoutException e) { - throw new ExecutionException(e); - } - } - - private static void initUnsafe() throws NoSuchFieldException, IllegalAccessException { - if (unsafe == null) { - Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); - theUnsafe.setAccessible(true); - unsafe = (Unsafe) theUnsafe.get(null); - } - } - - @Override - public boolean isSynchronous() { - return false; - } -} http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedRMWFuture.java ---------------------------------------------------------------------- diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedRMWFuture.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedRMWFuture.java deleted file mode 100644 index b0a5ae9..0000000 --- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedRMWFuture.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright (C) 2015-2018, IBM Corporation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.crail.storage.nvmf.client; - -import java.io.IOException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.apache.crail.CrailBuffer; -import org.apache.crail.metadata.BlockInfo; -import org.apache.crail.storage.StorageResult; - -public class NvmfStorageUnalignedRMWFuture extends NvmfStorageUnalignedFuture { - - private boolean initDone; - private Future<StorageResult> writeFuture; - - public NvmfStorageUnalignedRMWFuture(NvmfStorageFuture future, NvmfStorageEndpoint endpoint, CrailBuffer buffer, - BlockInfo remoteMr, long remoteOffset, CrailBuffer stagingBuffer) - throws NoSuchFieldException, IllegalAccessException { - super(future, endpoint, buffer, remoteMr, remoteOffset, stagingBuffer); - initDone = false; - } - - public StorageResult get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException { - if (exception != null) { - throw new ExecutionException(exception); - } - if (!done) { - if (!initDone) { - initFuture.get(l, timeUnit); - long srcAddr = buffer.address() + localOffset; - long dstAddr = stagingBuffer.address() + NvmfStorageUtils.namespaceSectorOffset( - 
endpoint.getSectorSize(), remoteOffset); - unsafe.copyMemory(srcAddr, dstAddr, len); - - stagingBuffer.clear(); - int alignedLen = (int) NvmfStorageUtils.alignLength(endpoint.getSectorSize(), remoteOffset, len); - stagingBuffer.limit(alignedLen); - try { - writeFuture = endpoint.write(stagingBuffer, remoteMr, NvmfStorageUtils.alignOffset(endpoint.getSectorSize(), remoteOffset)); - } catch (IOException e) { - throw new ExecutionException(e); - } - initDone =true; - } - writeFuture.get(l, timeUnit); - try { - endpoint.putBuffer(stagingBuffer); - } catch (IOException e) { - throw new ExecutionException(e); - } - done = true; - } - return this; - } -} http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedReadFuture.java ---------------------------------------------------------------------- diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedReadFuture.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedReadFuture.java deleted file mode 100644 index 61f460e..0000000 --- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedReadFuture.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright (C) 2015-2018, IBM Corporation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.crail.storage.nvmf.client; - -import java.io.IOException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.apache.crail.CrailBuffer; -import org.apache.crail.metadata.BlockInfo; -import org.apache.crail.storage.StorageResult; - -public class NvmfStorageUnalignedReadFuture extends NvmfStorageUnalignedFuture { - - public NvmfStorageUnalignedReadFuture(NvmfStorageFuture future, NvmfStorageEndpoint endpoint, CrailBuffer buffer, BlockInfo remoteMr, - long remoteOffset, CrailBuffer stagingBuffer) - throws NoSuchFieldException, IllegalAccessException { - super(future, endpoint, buffer, remoteMr, remoteOffset, stagingBuffer); - } - - public StorageResult get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException { - if (exception != null) { - throw new ExecutionException(exception); - } - if (!done) { - initFuture.get(l, timeUnit); - long srcAddr = stagingBuffer.address() + - NvmfStorageUtils.namespaceSectorOffset(endpoint.getSectorSize(), remoteOffset); - long dstAddr = buffer.address() + localOffset; - unsafe.copyMemory(srcAddr, dstAddr, len); - done = true; - try { - endpoint.putBuffer(stagingBuffer); - } catch (IOException e) { - throw new ExecutionException(e); - } - } - return this; - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedWriteFuture.java 
---------------------------------------------------------------------- diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedWriteFuture.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedWriteFuture.java deleted file mode 100644 index a842033..0000000 --- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUnalignedWriteFuture.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright (C) 2015-2018, IBM Corporation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.crail.storage.nvmf.client; - -import com.ibm.disni.nvmef.spdk.NvmeStatusCodeType; - -import java.io.IOException; - -import org.apache.crail.CrailBuffer; - -/** - * Created by jpf on 23.05.17. 
- */ -public class NvmfStorageUnalignedWriteFuture extends NvmfStorageFuture { - - private CrailBuffer stagingBuffer; - - public NvmfStorageUnalignedWriteFuture(NvmfStorageEndpoint endpoint, int len, CrailBuffer stagingBuffer) { - super(endpoint, len); - this.stagingBuffer = stagingBuffer; - } - - @Override - void signal(NvmeStatusCodeType statusCodeType, int statusCode) { - try { - endpoint.putBuffer(stagingBuffer); - } catch (IOException e) { - e.printStackTrace(); - System.exit(-1); - } - super.signal(statusCodeType, statusCode); - } -} http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUtils.java ---------------------------------------------------------------------- diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUtils.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUtils.java deleted file mode 100644 index 562e495..0000000 --- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageUtils.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright (C) 2015-2018, IBM Corporation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.crail.storage.nvmf.client; - -import org.apache.crail.metadata.BlockInfo; - -/** - * Created by jpf on 14.02.17. - */ -public class NvmfStorageUtils { - - public static long linearBlockAddress(BlockInfo remoteMr, long remoteOffset, int sectorSize) { - return (remoteMr.getAddr() + remoteOffset) / (long)sectorSize; - } - - public static long namespaceSectorOffset(int sectorSize, long fileOffset) { - return fileOffset % (long)sectorSize; - } - - public static long alignLength(int sectorSize, long remoteOffset, long len) { - long alignedSize = len + namespaceSectorOffset(sectorSize, remoteOffset); - if (namespaceSectorOffset(sectorSize, alignedSize) != 0) { - alignedSize += (long)sectorSize - namespaceSectorOffset(sectorSize, alignedSize); - } - return alignedSize; - } - - public static long alignOffset(int sectorSize, long fileOffset) { - return fileOffset - namespaceSectorOffset(sectorSize, fileOffset); - } -} http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/29be91d2/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfUnalignedWriteFuture.java ---------------------------------------------------------------------- diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfUnalignedWriteFuture.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfUnalignedWriteFuture.java new file mode 100644 index 0000000..08eab5b --- /dev/null +++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfUnalignedWriteFuture.java @@ -0,0 +1,183 @@ +/* + * Copyright (C) 2018, IBM Corporation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
package org.apache.crail.storage.nvmf.client;

import org.apache.crail.CrailBuffer;
import org.apache.crail.metadata.BlockInfo;
import org.apache.crail.storage.StorageFuture;
import org.apache.crail.storage.StorageResult;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Future for a write whose start and/or length is not a multiple of the
 * endpoint's LBA data size.
 *
 * <p>The constructor splits the write into up to three parts and issues them
 * immediately:
 * <ul>
 *   <li><b>beginning</b> - a partial first sector, merged into a staging buffer
 *       looked up by the sector-aligned remote address and written as a whole
 *       sector;</li>
 *   <li><b>middle</b> - all complete sectors, written directly from the
 *       caller's buffer;</li>
 *   <li><b>end</b> - a trailing partial sector, copied into a staging buffer
 *       and written from there.</li>
 * </ul>
 * Staging buffer entries are released ({@code put()}) from {@link #isDone()}
 * once the corresponding sub-write reports completion.
 */
public class NvmfUnalignedWriteFuture implements StorageFuture {
	private final NvmfStorageEndpoint endpoint;
	private StorageFuture beginFuture;
	private StorageFuture middleFuture;
	private StorageFuture endFuture;
	/* number of bytes the caller asked to write (buffer.remaining() at construction) */
	private final int written;
	private NvmfStagingBufferCache.BufferCacheEntry beginBuffer;
	private NvmfStagingBufferCache.BufferCacheEntry endBuffer;

	/** @return true if {@code address} is a multiple of the endpoint's LBA data size */
	private boolean isSectorAligned(long address) {
		return address % endpoint.getLBADataSize() == 0;
	}

	/** @return {@code address} rounded down to a multiple of the LBA data size */
	private long floorToSectorSize(long address) {
		return address - (address % endpoint.getLBADataSize());
	}

	/** @return number of bytes from {@code address} up to the end of its sector */
	private int leftInSector(long address) {
		return endpoint.getLBADataSize() - offsetInSector(address);
	}

	/** @return offset of {@code address} inside its sector */
	private int offsetInSector(long address) {
		return (int)(address % endpoint.getLBADataSize());
	}

	/**
	 * Issues the (up to three) sub-writes for the remaining bytes of {@code buffer}.
	 *
	 * @param endpoint     endpoint used for reads, writes and the staging buffer cache
	 * @param buffer       source data; position/limit are moved while the parts are issued
	 * @param blockInfo    target block; its address is assumed to be sector aligned
	 * @param remoteOffset byte offset inside the block at which the write starts
	 * @throws Exception if looking up staging buffers, the read-back of a sector,
	 *                   or issuing a write fails
	 */
	NvmfUnalignedWriteFuture(NvmfStorageEndpoint endpoint, CrailBuffer buffer, BlockInfo blockInfo, long remoteOffset) throws Exception {
		this.endpoint = endpoint;
		this.written = buffer.remaining();
		/* assume blockInfo.getAddr() is sector aligned */
		assert isSectorAligned(blockInfo.getAddr());

		long nextRemoteOffset = remoteOffset;
		/* beginning */
		if (!isSectorAligned(remoteOffset)) {
			int copySize = Math.min(leftInSector(remoteOffset), buffer.remaining());
			nextRemoteOffset = remoteOffset + copySize;
			/* temporarily narrow the source buffer to the bytes that belong to the first sector */
			int oldLimit = buffer.limit();
			buffer.limit(buffer.position() + copySize);
			long alignedRemoteOffset = floorToSectorSize(remoteOffset);
			long alignedRemoteAddress = blockInfo.getAddr() + alignedRemoteOffset;
			beginBuffer = endpoint.getStagingBufferCache().getExisting(alignedRemoteAddress);
			if (beginBuffer == null) {
				/* we had to delete the old buffer because we ran out of space. This should happen rarely. */
				beginBuffer = endpoint.getStagingBufferCache().get(alignedRemoteAddress);
				/* fetch the current sector content so the bytes in front of the new data are kept */
				endpoint.read(beginBuffer.getBuffer(), blockInfo, alignedRemoteOffset).get();
			} else {
				/* Wait for previous end operation to finish */
				beginBuffer.getFuture().get();
			}
			CrailBuffer stagingBuffer = beginBuffer.getBuffer();
			stagingBuffer.position(offsetInSector(remoteOffset));
			stagingBuffer.getByteBuffer().put(buffer.getByteBuffer());
			buffer.limit(oldLimit);
			/* write the staging buffer from its start */
			stagingBuffer.position(0);
			beginFuture = endpoint.write(stagingBuffer, blockInfo, alignedRemoteOffset);
			beginBuffer.setFuture(beginFuture);
			stagingBuffer.position(offsetInSector(remoteOffset));
		}

		/* middle */
		if (isSectorAligned(nextRemoteOffset) && buffer.remaining() >= endpoint.getLBADataSize()) {
			int oldLimit = buffer.limit();
			/* restrict the write to a whole number of sectors */
			buffer.limit(buffer.position() + (int)floorToSectorSize(buffer.remaining()));
			int toWrite = buffer.remaining();
			middleFuture = endpoint.write(buffer, blockInfo, nextRemoteOffset);
			nextRemoteOffset += toWrite;
			buffer.position(buffer.limit());
			buffer.limit(oldLimit);
		}

		/* end */
		if (buffer.remaining() > 0) {
			endBuffer = endpoint.getStagingBufferCache().get(blockInfo.getAddr() + nextRemoteOffset);
			CrailBuffer stagingBuffer = endBuffer.getBuffer();
			stagingBuffer.position(0);
			stagingBuffer.getByteBuffer().put(buffer.getByteBuffer());
			stagingBuffer.position(0);
			endFuture = endpoint.write(stagingBuffer, blockInfo, nextRemoteOffset);
			endBuffer.setFuture(endFuture);
		}
	}

	@Override
	public boolean isSynchronous() {
		return false;
	}

	/** Cancellation is not supported; always returns {@code false}. */
	@Override
	public boolean cancel(boolean b) {
		return false;
	}

	@Override
	public boolean isCancelled() {
		return false;
	}

	/** @return true if there is no such sub-write or it has completed */
	private static boolean checkIfFutureIsDone(StorageFuture future) {
		return future == null || future.isDone();
	}

	/**
	 * Polls the sub-writes and releases the staging buffer entry of every
	 * staged part that has completed.
	 *
	 * @return true once both staging entries have been released (or were never
	 *         used) and the middle write, if any, is done
	 */
	@Override
	public boolean isDone() {
		if (beginFuture != null && beginFuture.isDone()) {
			if (beginBuffer != null) {
				beginBuffer.put();
				beginBuffer = null;
			}
		}
		if (endFuture != null && endFuture.isDone()) {
			if (endBuffer != null) {
				endBuffer.put();
				endBuffer = null;
			}
		}
		return beginBuffer == null && checkIfFutureIsDone(middleFuture) && endBuffer == null;
	}

	/**
	 * Waits up to two minutes for completion.
	 *
	 * @throws ExecutionException if a sub-write failed or the two minutes elapsed
	 *                            (the {@link TimeoutException} is wrapped)
	 */
	@Override
	public StorageResult get() throws InterruptedException, ExecutionException {
		try {
			return get(2, TimeUnit.MINUTES);
		} catch (TimeoutException e) {
			throw new ExecutionException(e);
		}
	}

	/**
	 * Busy-polls {@link #isDone()} until all parts have completed or the timeout
	 * has elapsed, then collects the results of the sub-writes so that their
	 * failures are propagated.
	 *
	 * @param timeout  maximum time to poll
	 * @param timeUnit unit of {@code timeout}
	 * @return a result reporting the number of bytes requested at construction
	 * @throws InterruptedException if the calling thread is interrupted while polling
	 * @throws ExecutionException   if one of the sub-writes failed
	 * @throws TimeoutException     if the write did not complete in time
	 */
	@Override
	public StorageResult get(long timeout, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
		if (!isDone()) {
			final long start = System.nanoTime();
			final long timeoutNanos = timeUnit.toNanos(timeout);
			while (!isDone()) {
				if (Thread.interrupted()) {
					throw new InterruptedException("interrupted while polling for write completion");
				}
				/*
				 * Compare elapsed time instead of absolute deadlines: nanoTime values may
				 * wrap and start + timeoutNanos overflows for very large timeouts.
				 */
				if (System.nanoTime() - start > timeoutNanos) {
					/* give the operation one last chance before reporting the timeout */
					if (isDone()) {
						break;
					}
					throw new TimeoutException("poll wait time out!");
				}
			}
		}
		if (beginFuture != null) {
			beginFuture.get();
		}
		if (middleFuture != null) {
			middleFuture.get();
		}
		if (endFuture != null) {
			endFuture.get();
		}
		return () -> written;
	}
}
package org.apache.crail.storage.nvmf.client;

import org.apache.crail.CrailBuffer;
import org.apache.crail.CrailBufferCache;
import org.apache.crail.conf.CrailConfiguration;
import org.apache.crail.conf.CrailConstants;
import org.apache.crail.memory.MappedBufferCache;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import static org.junit.Assert.*;

/**
 * Unit tests for {@link NvmfStagingBufferCache}: argument validation,
 * exhaustion, lookup of existing entries and recycling of released buffers.
 */
public class NvmfStagingBufferCacheTest {

	@BeforeClass
	public static void init() throws IOException {
		CrailConstants.updateConstants(new CrailConfiguration());
	}

	/* backing buffer cache shared by all tests, created on first use */
	private static CrailBufferCache bufferCache;

	static CrailBufferCache getBufferCache() throws IOException {
		if (bufferCache == null) {
			bufferCache = new MappedBufferCache();
		}
		return bufferCache;
	}

	/**
	 * Asserts that constructing a staging buffer cache with the given second
	 * and third constructor arguments is rejected with an
	 * {@link IllegalArgumentException}.
	 */
	private static void assertConstructorRejects(int secondArg, int thirdArg) throws IOException {
		try {
			new NvmfStagingBufferCache(getBufferCache(), secondArg, thirdArg);
		} catch (IllegalArgumentException expected) {
			return;
		}
		fail("expected IllegalArgumentException for arguments (" + secondArg + ", " + thirdArg + ")");
	}

	/**
	 * Every invalid argument combination must be rejected. Each case is checked
	 * on its own: with a single {@code @Test(expected = ...)} the first throwing
	 * constructor call would end the test and the remaining cases would never run.
	 */
	@Test
	public void createBufferCache() throws IOException {
		assertConstructorRejects(-1, 512);
		assertConstructorRejects(0, 512);
		assertConstructorRejects(1024, -1);
		assertConstructorRejects(1024, 0);
	}

	/** Requesting a second entry from a cache created with size 1 must fail. */
	@Test(expected = OutOfMemoryError.class)
	public void outOfMemory() throws Exception {
		NvmfStagingBufferCache stagingCache = new NvmfStagingBufferCache(getBufferCache(), 1, 512);
		stagingCache.get(0);
		stagingCache.get(1);
	}

	/** An entry obtained with {@code get} must be returned by {@code getExisting} for the same key. */
	@Test
	public void bufferExists() throws Exception {
		NvmfStagingBufferCache stagingCache = new NvmfStagingBufferCache(getBufferCache(), 1, 512);
		NvmfStagingBufferCache.BufferCacheEntry created = stagingCache.get(0);
		NvmfStagingBufferCache.BufferCacheEntry existing = stagingCache.getExisting(0);
		assertEquals(created, existing);
	}

	/**
	 * After all entries were released with {@code put()}, requesting the same
	 * number of entries under new keys must hand out the previously used buffers.
	 */
	@Test
	public void recycleBuffers() throws Exception {
		NvmfStagingBufferCache.BufferCacheEntry[] entries = new NvmfStagingBufferCache.BufferCacheEntry[5];
		Set<CrailBuffer> buffers = new HashSet<>();
		NvmfStagingBufferCache stagingCache = new NvmfStagingBufferCache(getBufferCache(), entries.length, 512);
		for (int i = 0; i < entries.length; i++) {
			entries[i] = stagingCache.get(i);
			buffers.add(entries[i].getBuffer());
			entries[i].put();
		}
		for (int i = 0; i < entries.length; i++) {
			entries[i] = stagingCache.get(i + entries.length);
			assertTrue(buffers.remove(entries[i].getBuffer()));
		}
	}
}