johnyangk commented on a change in pull request #219: [NEMO-351] Empowering 
Nemo with fast I/O using Apache Crail
URL: https://github.com/apache/incubator-nemo/pull/219#discussion_r292262441
 
 

 ##########
 File path: 
runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/CrailFileStore.java
 ##########
 @@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.data.stores;
+
+import org.apache.crail.*;
+import org.apache.crail.conf.CrailConfiguration;
+import org.apache.nemo.common.exception.BlockFetchException;
+import org.apache.nemo.conf.JobConf;
+import org.apache.nemo.common.exception.BlockWriteException;
+import org.apache.nemo.runtime.executor.data.*;
+import org.apache.nemo.runtime.executor.data.block.Block;
+import org.apache.nemo.runtime.executor.data.block.CrailFileBlock;
+import org.apache.nemo.runtime.executor.data.metadata.CrailFileMetadata;
+import org.apache.nemo.runtime.executor.data.streamchainer.Serializer;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.inject.Inject;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Optional;
+
+/**
+ * Stores blocks in CrailStore.
+ * Since the data is stored in CrailStore and globally accessed by multiple 
nodes,
+ * each read or deletion of a file needs one instance of {@link 
CrailFileBlock}.
+ * When CrailFileBlock is created, its metadata is maintained in memory until 
the block is committed.
+ * After the block is committed, the metadata is stored in and read from a 
CrailStore.
+ */
+@ThreadSafe
+public final class CrailFileStore extends AbstractBlockStore implements 
RemoteFileStore {
+  private final String fileDirectory;
+  private CrailConfiguration conf;
+  private CrailStore fs;
+
+  /**
+   * Constructor.
+   *
+   * @param volumeDirectory   the CrailStore directory that contains the 
files.
+   * @param jobId             the job id.
+   * @param serializerManager the serializer manager.
+   * @throws Exception if any error occurs while trying to set Crail 
requirements.
+   */
+  @Inject
+  private CrailFileStore(@Parameter(JobConf.CrailVolumeDirectory.class) final 
String volumeDirectory,
+                         @Parameter(JobConf.JobId.class) final String jobId,
+                         final SerializerManager serializerManager) throws 
Exception {
+    super(serializerManager);
+    this.conf = CrailConfiguration.createConfigurationFromFile();
+    this.fs = CrailStore.newInstance(conf);
+    this.fileDirectory = volumeDirectory;
+  }
+
+  @Override
+  public Block createBlock(final String blockId) {
+    deleteBlock(blockId);
+    final Serializer serializer = getSerializerFromWorker(blockId);
+    final String filePath = DataUtil.blockIdToFilePath(blockId, fileDirectory);
+    final String metaPath = DataUtil.blockIdToMetaFilePath(blockId, 
fileDirectory);
+    final CrailFileMetadata metadata = CrailFileMetadata.create(metaPath, fs);
+    return new CrailFileBlock<>(blockId, serializer, filePath, metadata, fs);
+  }
+
+  /**
+   * Writes a committed block to this store.
+   *
+   * @param block the block to write.
+   * @throws BlockWriteException if fail to write.
+   */
+
+  @Override
+  public void writeBlock(final Block block) throws BlockWriteException {
+    if (!(block instanceof CrailFileBlock)) {
+      throw new BlockWriteException(new Throwable(
+        this.toString() + " only accept " + CrailFileBlock.class.getName()));
+    } else if (!block.isCommitted()) {
+      throw new BlockWriteException(new Throwable("The block " + block.getId() 
+ "is not committed yet."));
+    }
+    // Do nothing. The block has to be written to the CrailStore file during 
commit.
+  }
+
+  /**
+   * Reads a committed block from this store.
+   *
+   * @param blockId of the target partition.
+   * @return the target block (if it exists).
+   * @throws BlockFetchException if any error occurs while trying to fetch 
a block.
+   */
+
+  @Override
+
+  public Optional<Block> readBlock(final String blockId) throws 
BlockFetchException {
+    final String filePath = DataUtil.blockIdToFilePath(blockId, fileDirectory);
+    try {
+      if (fs.lookup(filePath).get() == null) {
+        return Optional.empty();
+      } else {
+        try {
+          final CrailFileBlock block = getBlockFromFile(blockId);
+          return Optional.of(block);
+        } catch (final IOException e) {
+          throw new BlockFetchException(e);
+        } catch (Exception e) {
 
 Review comment:
   Please use a more specific Exception class.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to