HDFS-12773. RBF: Improve State Store FS implementation. Contributed by Inigo Goiri.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/76be6cbf
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/76be6cbf
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/76be6cbf

Branch: refs/heads/HDFS-7240
Commit: 76be6cbf6c33f866794f27ca2560ca7c7b2fa0e7
Parents: 427fd02
Author: Yiqun Lin <yq...@apache.org>
Authored: Wed Mar 14 11:20:59 2018 +0800
Committer: Yiqun Lin <yq...@apache.org>
Committed: Wed Mar 14 11:20:59 2018 +0800

----------------------------------------------------------------------
 .../federation/metrics/StateStoreMetrics.java   |   5 +
 .../driver/StateStoreRecordOperations.java      |  15 -
 .../driver/impl/StateStoreFileBaseImpl.java     | 433 ++++++++++---------
 .../store/driver/impl/StateStoreFileImpl.java   | 109 ++---
 .../driver/impl/StateStoreFileSystemImpl.java   | 128 +++---
 .../driver/impl/StateStoreZooKeeperImpl.java    |   6 -
 .../store/driver/TestStateStoreDriverBase.java  |   9 +
 .../store/driver/TestStateStoreFile.java        |  12 +
 .../store/driver/TestStateStoreFileBase.java    |  47 ++
 .../store/driver/TestStateStoreFileSystem.java  |  14 +-
 10 files changed, 428 insertions(+), 350 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/76be6cbf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMetrics.java
index 40dcd40..09253a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMetrics.java
@@ -140,5 +140,10 @@ public final class StateStoreMetrics implements StateStoreMBean {
     writes.resetMinMax();
     removes.resetMinMax();
     failures.resetMinMax();
+
+    reads.lastStat().reset();
+    writes.lastStat().reset();
+    removes.lastStat().reset();
+    failures.lastStat().reset();
   }
 }

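The added lines matter because resetMinMax() only clears the recorded
extremes of each rate; the running interval sample returned by lastStat()
keeps its counts, which leaks state between tests. The new calls clear that
sample too, and the @After hook added to TestStateStoreDriverBase (further
below) relies on this. A minimal sketch of the two-step reset, assuming the
metrics2 MutableRate/MutableStat API:

    import org.apache.hadoop.metrics2.lib.MetricsRegistry;
    import org.apache.hadoop.metrics2.lib.MutableRate;

    public class MetricsResetSketch {
      public static void main(String[] args) {
        MetricsRegistry registry = new MetricsRegistry("StateStore");
        MutableRate reads = registry.newRate("reads");
        reads.add(10L);
        // Clears only the recorded min/max of the rate...
        reads.resetMinMax();
        // ...the interval sample must be cleared explicitly.
        reads.lastStat().reset();
      }
    }
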
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76be6cbf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java
index e76a733..443d46e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java
@@ -49,21 +49,6 @@ public interface StateStoreRecordOperations {
   <T extends BaseRecord> QueryResult<T> get(Class<T> clazz) throws IOException;
 
   /**
-   * Get all records of the requested record class from the data store. To use
-   * the default implementations in this class, getAll must return new instances
-   * of the records on each call. It is recommended to override the default
-   * implementations for better performance.
-   *
-   * @param clazz Class of record to fetch.
-   * @param sub Sub path.
-   * @return List of all records that match the clazz and the sub path.
-   * @throws IOException
-   */
-  @Idempotent
-  <T extends BaseRecord> QueryResult<T> get(Class<T> clazz, String sub)
-      throws IOException;
-
-  /**
    * Get a single record from the store that matches the query.
    *
    * @param clazz Class of record to fetch.

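With the sub-path overload removed from the interface, drivers implement the
single-argument get() only. A sketch of a caller after this change
(GetUsageSketch is illustrative, not part of the commit):

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
    import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
    import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;

    public class GetUsageSketch {
      // Fetch every record of one class; no sub-path argument remains.
      static List<MembershipState> fetch(StateStoreDriver driver)
          throws IOException {
        QueryResult<MembershipState> result = driver.get(MembershipState.class);
        return result.getRecords();
      }
    }
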
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76be6cbf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
index a0cd878f..6638d1c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
@@ -18,28 +18,39 @@
 package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
 
 import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple;
-import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.getRecordClass;
+import static org.apache.hadoop.util.Time.monotonicNow;
+import static org.apache.hadoop.util.Time.now;
 
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
+import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
 import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
 import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
 import org.apache.hadoop.hdfs.server.federation.store.records.Query;
 import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
+import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
- * {@link StateStoreDriver} implementation based on a local file.
+ * {@link StateStoreDriver} implementation based on files. Instead of writing
+ * to the final location, each write goes to a temporary file that is then
+ * renamed "atomically" to the final destination.
  */
 public abstract class StateStoreFileBaseImpl
     extends StateStoreSerializableImpl {
@@ -47,75 +58,76 @@ public abstract class StateStoreFileBaseImpl
   private static final Logger LOG =
       LoggerFactory.getLogger(StateStoreFileBaseImpl.class);
 
+  /** File extension for temporary files. */
+  private static final String TMP_MARK = ".tmp";
+  /** We remove temporary files older than 10 seconds. */
+  private static final long OLD_TMP_RECORD_MS = TimeUnit.SECONDS.toMillis(10);
+  /** File pattern for temporary records: file.XYZ.tmp. */
+  private static final Pattern OLD_TMP_RECORD_PATTERN =
+      Pattern.compile(".+\\.(\\d+)\\.tmp");
+
   /** If it is initialized. */
   private boolean initialized = false;
 
-  /** Name of the file containing the data. */
-  private static final String DATA_FILE_NAME = "records.data";
-
 
   /**
-   * Lock reading records.
+   * Get the reader of a record for the file system.
    *
-   * @param clazz Class of the record.
+   * @param path Path of the record to read.
+   * @return Reader for the record.
    */
-  protected abstract <T extends BaseRecord> void lockRecordRead(Class<T> clazz);
+  protected abstract <T extends BaseRecord> BufferedReader getReader(
+      String path);
 
   /**
-   * Unlock reading records.
+   * Get the writer of a record for the file system.
    *
-   * @param clazz Class of the record.
+   * @param path Path of the record to write.
+   * @return Writer for the record.
    */
-  protected abstract <T extends BaseRecord> void unlockRecordRead(
-      Class<T> clazz);
+  protected abstract <T extends BaseRecord> BufferedWriter getWriter(
+      String path);
 
   /**
-   * Lock writing records.
+   * Check if a path exists.
    *
-   * @param clazz Class of the record.
+   * @param path Path to check.
+   * @return If the path exists.
    */
-  protected abstract <T extends BaseRecord> void lockRecordWrite(
-      Class<T> clazz);
+  protected abstract boolean exists(String path);
 
   /**
-   * Unlock writing records.
+   * Make a directory.
    *
-   * @param clazz Class of the record.
+   * @param path Path of the directory to create.
+   * @return If the directory was created.
    */
-  protected abstract <T extends BaseRecord> void unlockRecordWrite(
-      Class<T> clazz);
+  protected abstract boolean mkdir(String path);
 
   /**
-   * Get the reader for the file system.
+   * Rename a file. This should be atomic.
    *
-   * @param clazz Class of the record.
+   * @param src Source name.
+   * @param dst Destination name.
+   * @return If the rename was successful.
    */
-  protected abstract <T extends BaseRecord> BufferedReader getReader(
-      Class<T> clazz, String sub);
+  protected abstract boolean rename(String src, String dst);
 
   /**
-   * Get the writer for the file system.
+   * Remove a file.
    *
-   * @param clazz Class of the record.
+   * @param path Path for the file to remove.
+   * @return If the file was removed.
    */
-  protected abstract <T extends BaseRecord> BufferedWriter getWriter(
-      Class<T> clazz, String sub);
+  protected abstract boolean remove(String path);
 
   /**
-   * Check if a path exists.
+   * Get the children for a path.
    *
    * @param path Path to check.
-   * @return If the path exists.
-   */
-  protected abstract boolean exists(String path);
-
-  /**
-   * Make a directory.
-   *
-   * @param path Path of the directory to create.
-   * @return If the directory was created.
+   * @return List of children.
    */
-  protected abstract boolean mkdir(String path);
+  protected abstract List<String> getChildren(String path);
 
   /**
    * Get root directory.
@@ -171,15 +183,6 @@ public abstract class StateStoreFileBaseImpl
           LOG.error("Cannot create data directory {}", dataDirPath);
           return false;
         }
-        String dataFilePath = dataDirPath + "/" + DATA_FILE_NAME;
-        if (!exists(dataFilePath)) {
-          // Create empty file
-          List<T> emtpyList = new ArrayList<>();
-          if(!writeAll(emtpyList, recordClass)) {
-            LOG.error("Cannot create data file {}", dataFilePath);
-            return false;
-          }
-        }
       }
     } catch (Exception ex) {
       LOG.error("Cannot create data directory {}", dataDirPath, ex);
@@ -188,138 +191,110 @@ public abstract class StateStoreFileBaseImpl
     return true;
   }
 
-  /**
-   * Read all lines from a file and deserialize into the desired record type.
-   *
-   * @param reader Open handle for the file.
-   * @param clazz Record class to create.
-   * @param includeDates True if dateModified/dateCreated are serialized.
-   * @return List of records.
-   * @throws IOException
-   */
-  private <T extends BaseRecord> List<T> getAllFile(
-      BufferedReader reader, Class<T> clazz, boolean includeDates)
-          throws IOException {
-
-    List<T> ret = new ArrayList<T>();
-    String line;
-    while ((line = reader.readLine()) != null) {
-      if (!line.startsWith("#") && line.length() > 0) {
-        try {
-          T record = newRecord(line, clazz, includeDates);
-          ret.add(record);
-        } catch (Exception ex) {
-          LOG.error("Cannot parse line in data source file: {}", line, ex);
-        }
-      }
-    }
-    return ret;
-  }
-
   @Override
   public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
       throws IOException {
-    return get(clazz, (String)null);
-  }
-
-  @Override
-  public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz, String sub)
-      throws IOException {
     verifyDriverReady();
-    BufferedReader reader = null;
-    lockRecordRead(clazz);
+    long start = monotonicNow();
+    StateStoreMetrics metrics = getMetrics();
+    List<T> ret = new ArrayList<>();
     try {
-      reader = getReader(clazz, sub);
-      List<T> data = getAllFile(reader, clazz, true);
-      return new QueryResult<T>(data, getTime());
-    } catch (Exception ex) {
-      LOG.error("Cannot fetch records {}", clazz.getSimpleName());
-      throw new IOException("Cannot read from data store " + ex.getMessage());
-    } finally {
-      if (reader != null) {
-        try {
-          reader.close();
-        } catch (IOException e) {
-          LOG.error("Failed closing file", e);
+      String path = getPathForClass(clazz);
+      List<String> children = getChildren(path);
+      for (String child : children) {
+        String pathRecord = path + "/" + child;
+        if (child.endsWith(TMP_MARK)) {
+          LOG.debug("There is a temporary file {} in {}", child, path);
+          if (isOldTempRecord(child)) {
+            LOG.warn("Removing {} as it's an old temporary record", child);
+            remove(pathRecord);
+          }
+        } else {
+          T record = getRecord(pathRecord, clazz);
+          ret.add(record);
         }
       }
-      unlockRecordRead(clazz);
+    } catch (Exception e) {
+      if (metrics != null) {
+        metrics.addFailure(monotonicNow() - start);
+      }
+      String msg = "Cannot fetch records for " + clazz.getSimpleName();
+      LOG.error(msg, e);
+      throw new IOException(msg, e);
+    }
+
+    if (metrics != null) {
+      metrics.addRead(monotonicNow() - start);
     }
+    return new QueryResult<T>(ret, getTime());
   }
 
   /**
-   * Overwrite the existing data with a new data set.
+   * Check if a record is temporary and old.
    *
-   * @param records List of records to write.
-   * @param writer BufferedWriter stream to write to.
-   * @return If the records were succesfully written.
+   * @param pathRecord Path for the record to check.
+   * @return If the record is temporary and old.
    */
-  private <T extends BaseRecord> boolean writeAllFile(
-      Collection<T> records, BufferedWriter writer) {
-
-    try {
-      for (BaseRecord record : records) {
-        try {
-          String data = serializeString(record);
-          writer.write(data);
-          writer.newLine();
-        } catch (IllegalArgumentException ex) {
-          LOG.error("Cannot write record {} to file", record, ex);
-        }
-      }
-      writer.flush();
-      return true;
-    } catch (IOException e) {
-      LOG.error("Cannot commit records to file", e);
+  @VisibleForTesting
+  public static boolean isOldTempRecord(final String pathRecord) {
+    if (!pathRecord.endsWith(TMP_MARK)) {
       return false;
     }
+    // Extract temporary record creation time
+    Matcher m = OLD_TMP_RECORD_PATTERN.matcher(pathRecord);
+    if (m.find()) {
+      long time = Long.parseLong(m.group(1));
+      return now() - time > OLD_TMP_RECORD_MS;
+    }
+    return false;
   }
 
   /**
-   * Overwrite the existing data with a new data set. Replaces all records in
-   * the data store for this record class. If all records in the data store are
-   * not successfully committed, this function must return false and leave the
-   * data store unchanged.
+   * Read a record from a file.
    *
-   * @param records List of records to write. All records must be of type
-   *                recordClass.
-   * @param recordClass Class of record to replace.
-   * @return true if all operations were successful, false otherwise.
-   * @throws StateStoreUnavailableException
+   * @param path Path to the file containing the record.
+   * @param clazz Class of the record.
+   * @return Record read from the file.
+   * @throws IOException If the file cannot be read.
    */
-  public <T extends BaseRecord> boolean writeAll(
-      Collection<T> records, Class<T> recordClass)
-          throws StateStoreUnavailableException {
-    verifyDriverReady();
-    lockRecordWrite(recordClass);
-    BufferedWriter writer = null;
+  private <T extends BaseRecord> T getRecord(
+      final String path, final Class<T> clazz) throws IOException {
+    BufferedReader reader = getReader(path);
     try {
-      writer = getWriter(recordClass, null);
-      return writeAllFile(records, writer);
-    } catch (Exception e) {
-      LOG.error(
-          "Cannot add records to file for {}", recordClass.getSimpleName(), e);
-      return false;
-    } finally {
-      if (writer != null) {
-        try {
-          writer.close();
-        } catch (IOException e) {
-          LOG.error(
-              "Cannot close writer for {}", recordClass.getSimpleName(), e);
+      String line;
+      while ((line = reader.readLine()) != null) {
+        if (!line.startsWith("#") && line.length() > 0) {
+          try {
+            T record = newRecord(line, clazz, false);
+            return record;
+          } catch (Exception ex) {
+            LOG.error("Cannot parse line {} in file {}", line, path, ex);
+          }
         }
       }
-      unlockRecordWrite(recordClass);
+    } finally {
+      if (reader != null) {
+        reader.close();
+      }
     }
+    throw new IOException("Cannot read " + path + " for record " +
+        clazz.getSimpleName());
   }
 
   /**
-   * Get the data file name.
-   *
-   * @return Data file name.
+   * Get the path for a record class.
+   * @param clazz Class of the record.
+   * @return Path for this record class.
    */
-  protected String getDataFileName() {
-    return DATA_FILE_NAME;
+  private <T extends BaseRecord> String getPathForClass(final Class<T> clazz) {
+    String className = StateStoreUtils.getRecordName(clazz);
+    StringBuilder sb = new StringBuilder();
+    sb.append(getRootDir());
+    if (sb.charAt(sb.length() - 1) != '/') {
+      sb.append("/");
+    }
+    sb.append(className);
+    return sb.toString();
   }
 
   @Override
@@ -332,56 +307,80 @@ public abstract class StateStoreFileBaseImpl
       List<T> records, boolean allowUpdate, boolean errorIfExists)
           throws StateStoreUnavailableException {
     verifyDriverReady();
-
     if (records.isEmpty()) {
       return true;
     }
 
-    @SuppressWarnings("unchecked")
-    Class<T> clazz = (Class<T>) getRecordClass(records.get(0).getClass());
-    QueryResult<T> result;
-    try {
-      result = get(clazz);
-    } catch (IOException e) {
-      return false;
-    }
-    Map<Object, T> writeList = new HashMap<>();
+    long start = monotonicNow();
+    StateStoreMetrics metrics = getMetrics();
 
-    // Write all of the existing records
-    for (T existingRecord : result.getRecords()) {
-      String key = existingRecord.getPrimaryKey();
-      writeList.put(key, existingRecord);
-    }
+    // Check if any record exists
+    Map<String, T> toWrite = new HashMap<>();
+    for (T record : records) {
+      Class<? extends BaseRecord> recordClass = record.getClass();
+      String path = getPathForClass(recordClass);
+      String primaryKey = getPrimaryKey(record);
+      String recordPath = path + "/" + primaryKey;
 
-    // Add inserts and updates, overwrite any existing values
-    for (T updatedRecord : records) {
-      try {
-        updatedRecord.validate();
-        String key = updatedRecord.getPrimaryKey();
-        if (writeList.containsKey(key) && allowUpdate) {
-          // Update
-          writeList.put(key, updatedRecord);
+      if (exists(recordPath)) {
+        if (allowUpdate) {
           // Update the mod time stamp. Many backends will use their
           // own timestamp for the mod time.
-          updatedRecord.setDateModified(this.getTime());
-        } else if (!writeList.containsKey(key)) {
-          // Insert
-          // Create/Mod timestamps are already initialized
-          writeList.put(key, updatedRecord);
+          record.setDateModified(this.getTime());
+          toWrite.put(recordPath, record);
         } else if (errorIfExists) {
           LOG.error("Attempt to insert record {} that already exists",
-              updatedRecord);
+              recordPath);
+          if (metrics != null) {
+            metrics.addFailure(monotonicNow() - start);
+          }
           return false;
+        } else  {
+          LOG.debug("Not updating {}", record);
         }
-      } catch (IllegalArgumentException ex) {
-        LOG.error("Cannot write invalid record to State Store", ex);
-        return false;
+      } else {
+        toWrite.put(recordPath, record);
       }
     }
 
-    // Write all
-    boolean status = writeAll(writeList.values(), clazz);
-    return status;
+    // Write the records
+    boolean success = true;
+    for (Entry<String, T> entry : toWrite.entrySet()) {
+      String recordPath = entry.getKey();
+      String recordPathTemp = recordPath + "." + now() + TMP_MARK;
+      BufferedWriter writer = getWriter(recordPathTemp);
+      try {
+        T record = entry.getValue();
+        String line = serializeString(record);
+        writer.write(line);
+      } catch (IOException e) {
+        LOG.error("Cannot write {}", recordPathTemp, e);
+        success = false;
+      } finally {
+        if (writer != null) {
+          try {
+            writer.close();
+          } catch (IOException e) {
+            LOG.error("Cannot close the writer for {}", recordPathTemp);
+          }
+        }
+      }
+      // Commit
+      if (!rename(recordPathTemp, recordPath)) {
+        LOG.error("Failed committing record into {}", recordPath);
+        success = false;
+      }
+    }
+
+    long end = monotonicNow();
+    if (metrics != null) {
+      if (success) {
+        metrics.addWrite(end - start);
+      } else {
+        metrics.addFailure(end - start);
+      }
+    }
+    return success;
   }
 
   @Override
@@ -393,6 +392,8 @@ public abstract class StateStoreFileBaseImpl
       return 0;
     }
 
+    long start = Time.monotonicNow();
+    StateStoreMetrics metrics = getMetrics();
     int removed = 0;
     // Get the current records
     try {
@@ -400,21 +401,34 @@ public abstract class StateStoreFileBaseImpl
       final List<T> existingRecords = result.getRecords();
       // Write all of the existing records except those to be removed
       final List<T> recordsToRemove = filterMultiple(query, existingRecords);
-      removed = recordsToRemove.size();
-      final List<T> newRecords = new LinkedList<>();
-      for (T record : existingRecords) {
-        if (!recordsToRemove.contains(record)) {
-          newRecords.add(record);
+      boolean success = true;
+      for (T recordToRemove : recordsToRemove) {
+        String path = getPathForClass(clazz);
+        String primaryKey = getPrimaryKey(recordToRemove);
+        String recordToRemovePath = path + "/" + primaryKey;
+        if (remove(recordToRemovePath)) {
+          removed++;
+        } else {
+          LOG.error("Cannot remove record {}", recordToRemovePath);
+          success = false;
         }
       }
-      if (!writeAll(newRecords, clazz)) {
-        throw new IOException(
-            "Cannot remove record " + clazz + " query " + query);
+      if (!success) {
+        LOG.error("Cannot remove records {} query {}", clazz, query);
+        if (metrics != null) {
+          metrics.addFailure(monotonicNow() - start);
+        }
       }
     } catch (IOException e) {
       LOG.error("Cannot remove records {} query {}", clazz, query, e);
+      if (metrics != null) {
+        metrics.addFailure(monotonicNow() - start);
+      }
     }
 
+    if (removed > 0 && metrics != null) {
+      metrics.addRemove(monotonicNow() - start);
+    }
     return removed;
   }
 
@@ -422,8 +436,27 @@ public abstract class StateStoreFileBaseImpl
   public <T extends BaseRecord> boolean removeAll(Class<T> clazz)
       throws StateStoreUnavailableException {
     verifyDriverReady();
-    List<T> emptyList = new ArrayList<>();
-    boolean status = writeAll(emptyList, clazz);
-    return status;
+    long start = Time.monotonicNow();
+    StateStoreMetrics metrics = getMetrics();
+
+    boolean success = true;
+    String path = getPathForClass(clazz);
+    List<String> children = getChildren(path);
+    for (String child : children) {
+      String pathRecord = path + "/" + child;
+      if (!remove(pathRecord)) {
+        success = false;
+      }
+    }
+
+    if (metrics != null) {
+      long time = Time.monotonicNow() - start;
+      if (success) {
+        metrics.addRemove(time);
+      } else {
+        metrics.addFailure(time);
+      }
+    }
+    return success;
   }
 }

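The rewritten base driver replaces the old whole-file rewrite with one file
per record: each write goes to file.<epoch-millis>.tmp beside its final path
and is committed with a rename, while readers skip *.tmp files and delete
those older than 10 seconds. A self-contained sketch of the commit step,
assuming POSIX rename semantics (paths and class name are illustrative):

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.StandardCopyOption;

    public class TmpRenameSketch {

      /** Write a record beside its final path and commit it via rename. */
      static boolean writeRecord(String recordPath, String line) {
        // Same naming scheme as the driver: file.<epoch-millis>.tmp
        Path tmp = Paths.get(
            recordPath + "." + System.currentTimeMillis() + ".tmp");
        try {
          Files.write(tmp, line.getBytes(StandardCharsets.UTF_8));
          // POSIX rename() replaces the destination in one step, so a reader
          // sees either the old record or the new one, never a partial file.
          Files.move(tmp, Paths.get(recordPath),
              StandardCopyOption.ATOMIC_MOVE);
          return true;
        } catch (IOException e) {
          return false;
        }
      }

      public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("statestore");
        writeRecord(dir.resolve("ROUTER-example").toString(),
            "serialized-record");
      }
    }
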
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76be6cbf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java
index 24e9660..c585a23 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java
@@ -26,11 +26,10 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.nio.charset.StandardCharsets;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.LinkedList;
+import java.util.List;
 
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
 import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,10 +48,6 @@ public class StateStoreFileImpl extends 
StateStoreFileBaseImpl {
   public static final String FEDERATION_STORE_FILE_DIRECTORY =
       DFSConfigKeys.FEDERATION_STORE_PREFIX + "driver.file.directory";
 
-  /** Synchronization. */
-  private static final ReadWriteLock READ_WRITE_LOCK =
-      new ReentrantReadWriteLock();
-
   /** Root directory for the state store. */
   private String rootDirectory;
 
@@ -70,12 +65,30 @@ public class StateStoreFileImpl extends StateStoreFileBaseImpl {
   }
 
   @Override
+  protected boolean rename(String src, String dst) {
+    try {
+      Files.move(new File(src), new File(dst));
+      return true;
+    } catch (IOException e) {
+      LOG.error("Cannot rename {} to {}", src, dst, e);
+      return false;
+    }
+  }
+
+  @Override
+  protected boolean remove(String path) {
+    File file = new File(path);
+    return file.delete();
+  }
+
+  @Override
   protected String getRootDir() {
     if (this.rootDirectory == null) {
       String dir = getConf().get(FEDERATION_STORE_FILE_DIRECTORY);
       if (dir == null) {
         File tempDir = Files.createTempDir();
         dir = tempDir.getAbsolutePath();
+        LOG.warn("The root directory is not available, using {}", dir);
       }
       this.rootDirectory = dir;
     }
@@ -83,79 +96,53 @@ public class StateStoreFileImpl extends StateStoreFileBaseImpl {
   }
 
   @Override
-  protected <T extends BaseRecord> void lockRecordWrite(Class<T> recordClass) {
-    // TODO - Synchronize via FS
-    READ_WRITE_LOCK.writeLock().lock();
-  }
-
-  @Override
-  protected <T extends BaseRecord> void unlockRecordWrite(
-      Class<T> recordClass) {
-    // TODO - Synchronize via FS
-    READ_WRITE_LOCK.writeLock().unlock();
-  }
-
-  @Override
-  protected <T extends BaseRecord> void lockRecordRead(Class<T> recordClass) {
-    // TODO - Synchronize via FS
-    READ_WRITE_LOCK.readLock().lock();
-  }
-
-  @Override
-  protected <T extends BaseRecord> void unlockRecordRead(Class<T> recordClass) {
-    // TODO - Synchronize via FS
-    READ_WRITE_LOCK.readLock().unlock();
-  }
-
-  @Override
-  protected <T extends BaseRecord> BufferedReader getReader(
-      Class<T> clazz, String sub) {
-    String filename = StateStoreUtils.getRecordName(clazz);
-    if (sub != null && sub.length() > 0) {
-      filename += "/" + sub;
-    }
-    filename += "/" + getDataFileName();
-
+  protected <T extends BaseRecord> BufferedReader getReader(String filename) {
+    BufferedReader reader = null;
     try {
       LOG.debug("Loading file: {}", filename);
-      File file = new File(getRootDir(), filename);
+      File file = new File(filename);
       FileInputStream fis = new FileInputStream(file);
       InputStreamReader isr =
           new InputStreamReader(fis, StandardCharsets.UTF_8);
-      BufferedReader reader = new BufferedReader(isr);
-      return reader;
+      reader = new BufferedReader(isr);
     } catch (Exception ex) {
-      LOG.error(
-          "Cannot open read stream for record {}", clazz.getSimpleName(), ex);
-      return null;
+      LOG.error("Cannot open read stream for record {}", filename, ex);
     }
+    return reader;
   }
 
   @Override
-  protected <T extends BaseRecord> BufferedWriter getWriter(
-      Class<T> clazz, String sub) {
-    String filename = StateStoreUtils.getRecordName(clazz);
-    if (sub != null && sub.length() > 0) {
-      filename += "/" + sub;
-    }
-    filename += "/" + getDataFileName();
-
+  protected <T extends BaseRecord> BufferedWriter getWriter(String filename) {
+    BufferedWriter writer = null;
     try {
-      File file = new File(getRootDir(), filename);
+      LOG.debug("Writing file: {}", filename);
+      File file = new File(filename);
       FileOutputStream fos = new FileOutputStream(file, false);
       OutputStreamWriter osw =
           new OutputStreamWriter(fos, StandardCharsets.UTF_8);
-      BufferedWriter writer = new BufferedWriter(osw);
-      return writer;
-    } catch (IOException ex) {
-      LOG.error(
-          "Cannot open read stream for record {}", clazz.getSimpleName(), ex);
-      return null;
+      writer = new BufferedWriter(osw);
+    } catch (IOException e) {
+      LOG.error("Cannot open write stream for record {}", filename, e);
     }
+    return writer;
   }
 
   @Override
   public void close() throws Exception {
     setInitialized(false);
   }
+
+  @Override
+  protected List<String> getChildren(String path) {
+    List<String> ret = new LinkedList<>();
+    File dir = new File(path);
+    File[] files = dir.listFiles();
+    if (files != null) {
+      for (File file : files) {
+        String filename = file.getName();
+        ret.add(filename);
+      }
+    }
+    return ret;
+  }
 }
\ No newline at end of file

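On a local disk, Guava's Files.move() is a plain rename, so the same
write-then-rename commit applies. When no directory is configured the driver
now warns and falls back to a temporary one; a hypothetical wiring snippet
that pins the root directory instead (the path is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl;

    public class FileDriverConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Any local directory writable by the Router would do.
        conf.set(StateStoreFileImpl.FEDERATION_STORE_FILE_DIRECTORY,
            "/var/lib/hadoop/router-state-store");
      }
    }
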
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76be6cbf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java
index d9ef280..8d6c626 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java
@@ -24,13 +24,17 @@ import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
+import java.util.LinkedList;
+import java.util.List;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,6 +77,36 @@ public class StateStoreFileSystemImpl extends StateStoreFileBaseImpl {
   }
 
   @Override
+  protected boolean rename(String src, String dst) {
+    try {
+      if (fs instanceof DistributedFileSystem) {
+        DistributedFileSystem dfs = (DistributedFileSystem)fs;
+        dfs.rename(new Path(src), new Path(dst), Options.Rename.OVERWRITE);
+        return true;
+      } else {
+        // An atomic overwriting rename is not available in the generic API
+        if (fs.exists(new Path(dst))) {
+          fs.delete(new Path(dst), true);
+        }
+        return fs.rename(new Path(src), new Path(dst));
+      }
+    } catch (Exception e) {
+      LOG.error("Cannot rename {} to {}", src, dst, e);
+      return false;
+    }
+  }
+
+  @Override
+  protected boolean remove(String path) {
+    try {
+      return fs.delete(new Path(path), true);
+    } catch (Exception e) {
+      LOG.error("Cannot remove {}", path, e);
+      return false;
+    }
+  }
+
+  @Override
   protected String getRootDir() {
     if (this.workPath == null) {
       String rootPath = getConf().get(FEDERATION_STORE_FS_PATH);
@@ -95,84 +129,50 @@ public class StateStoreFileSystemImpl extends StateStoreFileBaseImpl {
     }
   }
 
-  /**
-   * Get the folder path for the record class' data.
-   *
-   * @param clazz Data record class.
-   * @return Path of the folder containing the record class' data files.
-   */
-  private Path getPathForClass(Class<? extends BaseRecord> clazz) {
-    if (clazz == null) {
-      return null;
-    }
-    // TODO extract table name from class: entry.getTableName()
-    String className = StateStoreUtils.getRecordName(clazz);
-    return new Path(workPath, className);
-  }
-
-  @Override
-  protected <T extends BaseRecord> void lockRecordRead(Class<T> clazz) {
-    // Not required, synced with HDFS leasing
-  }
-
-  @Override
-  protected <T extends BaseRecord> void unlockRecordRead(Class<T> clazz) {
-    // Not required, synced with HDFS leasing
-  }
-
-  @Override
-  protected <T extends BaseRecord> void lockRecordWrite(Class<T> clazz) {
-    // TODO -> wait for lease to be available
-  }
-
   @Override
-  protected <T extends BaseRecord> void unlockRecordWrite(Class<T> clazz) {
-    // TODO -> ensure lease is closed for the file
-  }
-
-  @Override
-  protected <T extends BaseRecord> BufferedReader getReader(
-      Class<T> clazz, String sub) {
-
-    Path path = getPathForClass(clazz);
-    if (sub != null && sub.length() > 0) {
-      path = Path.mergePaths(path, new Path("/" + sub));
-    }
-    path = Path.mergePaths(path, new Path("/" + getDataFileName()));
-
+  protected <T extends BaseRecord> BufferedReader getReader(String pathName) {
+    BufferedReader reader = null;
+    Path path = new Path(pathName);
     try {
       FSDataInputStream fdis = fs.open(path);
       InputStreamReader isr =
           new InputStreamReader(fdis, StandardCharsets.UTF_8);
-      BufferedReader reader = new BufferedReader(isr);
-      return reader;
+      reader = new BufferedReader(isr);
     } catch (IOException ex) {
-      LOG.error("Cannot open write stream for {}  to {}",
-          clazz.getSimpleName(), path);
-      return null;
+      LOG.error("Cannot open read stream for {}", path);
     }
+    return reader;
   }
 
   @Override
-  protected <T extends BaseRecord> BufferedWriter getWriter(
-      Class<T> clazz, String sub) {
-
-    Path path = getPathForClass(clazz);
-    if (sub != null && sub.length() > 0) {
-      path = Path.mergePaths(path, new Path("/" + sub));
-    }
-    path = Path.mergePaths(path, new Path("/" + getDataFileName()));
-
+  protected <T extends BaseRecord> BufferedWriter getWriter(String pathName) {
+    BufferedWriter writer = null;
+    Path path = new Path(pathName);
     try {
       FSDataOutputStream fdos = fs.create(path, true);
       OutputStreamWriter osw =
           new OutputStreamWriter(fdos, StandardCharsets.UTF_8);
-      BufferedWriter writer = new BufferedWriter(osw);
-      return writer;
+      writer = new BufferedWriter(osw);
     } catch (IOException ex) {
-      LOG.error("Cannot open write stream for {} to {}",
-          clazz.getSimpleName(), path);
-      return null;
+      LOG.error("Cannot open write stream for {}", path);
+    }
+    return writer;
+  }
+
+  @Override
+  protected List<String> getChildren(String pathName) {
+    List<String> ret = new LinkedList<>();
+    Path path = new Path(workPath, pathName);
+    try {
+      FileStatus[] files = fs.listStatus(path);
+      for (FileStatus file : files) {
+        Path filePath = file.getPath();
+        String fileName = filePath.getName();
+        ret.add(fileName);
+      }
+    } catch (Exception e) {
+      LOG.error("Cannot get children for {}", pathName, e);
     }
+    return ret;
   }
 }

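For the HDFS-backed driver, DistributedFileSystem offers an overwriting
rename performed as a single NameNode operation, so the commit step stays
atomic; any other FileSystem falls back to delete-plus-rename, which leaves
a window where the destination is missing. The two strategies extracted into
a standalone helper (a sketch of the committed logic):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Options;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class OverwriteRenameSketch {
      /** Rename src over dst, atomically where the file system allows. */
      static boolean overwriteRename(FileSystem fs, Path src, Path dst)
          throws IOException {
        if (fs instanceof DistributedFileSystem) {
          // HDFS exposes an atomic overwriting rename.
          DistributedFileSystem dfs = (DistributedFileSystem) fs;
          dfs.rename(src, dst, Options.Rename.OVERWRITE);
          return true;
        }
        // Generic fallback: delete then rename, which is NOT atomic.
        if (fs.exists(dst)) {
          fs.delete(dst, true);
        }
        return fs.rename(src, dst);
      }
    }
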
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76be6cbf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
index 1c3f756..69b9b98 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
@@ -117,12 +117,6 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
   @Override
   public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
       throws IOException {
-    return get(clazz, (String)null);
-  }
-
-  @Override
-  public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz, String sub)
-      throws IOException {
     verifyDriverReady();
     long start = monotonicNow();
     List<T> ret = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/76be6cbf/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
index 1091c21..fd29e37 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.server.federation.store.records.Query;
 import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
 import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
 import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,6 +75,14 @@ public class TestStateStoreDriverBase {
     return stateStore.getDriver();
   }
 
+  @After
+  public void cleanMetrics() {
+    if (stateStore != null) {
+      StateStoreMetrics metrics = stateStore.getMetrics();
+      metrics.reset();
+    }
+  }
+
   @AfterClass
   public static void tearDownCluster() {
     if (stateStore != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/76be6cbf/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFile.java
index 920e280..a8a9020 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFile.java
@@ -61,4 +61,16 @@ public class TestStateStoreFile extends TestStateStoreDriverBase {
       throws IllegalArgumentException, IllegalAccessException, IOException {
     testRemove(getStateStoreDriver());
   }
+
+  @Test
+  public void testFetchErrors()
+      throws IllegalArgumentException, IllegalAccessException, IOException {
+    testFetchErrors(getStateStoreDriver());
+  }
+
+  @Test
+  public void testMetrics()
+      throws IllegalArgumentException, IllegalAccessException, IOException {
+    testMetrics(getStateStoreDriver());
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/76be6cbf/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileBase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileBase.java
new file mode 100644
index 0000000..9adfe33
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileBase.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver;
+
+import static org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileBaseImpl.isOldTempRecord;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.util.Time;
+import org.junit.Test;
+
+/**
+ * Tests for the State Store file based implementation.
+ */
+public class TestStateStoreFileBase {
+
+  @Test
+  public void testTempOld() {
+    assertFalse(isOldTempRecord("test.txt"));
+    assertFalse(isOldTempRecord("testfolder/test.txt"));
+
+    long tnow = Time.now();
+    String tmpFile1 = "test." + tnow + ".tmp";
+    assertFalse(isOldTempRecord(tmpFile1));
+
+    long told = Time.now() - TimeUnit.MINUTES.toMillis(1);
+    String tmpFile2 = "test." + told + ".tmp";
+    assertTrue(isOldTempRecord(tmpFile2));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/76be6cbf/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java
index da2e51d..8c4b188 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java
@@ -69,15 +69,15 @@ public class TestStateStoreFileSystem extends TestStateStoreDriverBase {
   }
 
   @Test
-  public void testUpdate()
-      throws IllegalArgumentException, IllegalAccessException, IOException {
-    testInsert(getStateStoreDriver());
+  public void testUpdate() throws IllegalArgumentException, IOException,
+      SecurityException, ReflectiveOperationException {
+    testPut(getStateStoreDriver());
   }
 
   @Test
   public void testDelete()
       throws IllegalArgumentException, IllegalAccessException, IOException {
-    testInsert(getStateStoreDriver());
+    testRemove(getStateStoreDriver());
   }
 
   @Test
@@ -85,4 +85,10 @@ public class TestStateStoreFileSystem extends TestStateStoreDriverBase {
       throws IllegalArgumentException, IllegalAccessException, IOException {
     testFetchErrors(getStateStoreDriver());
   }
+
+  @Test
+  public void testMetrics()
+      throws IllegalArgumentException, IllegalAccessException, IOException {
+    testMetrics(getStateStoreDriver());
+  }
 }
\ No newline at end of file

