HADOOP-13065. Add a new interface for retrieving FS and FC Statistics (Mingliang Liu via cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/687233f2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/687233f2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/687233f2

Branch: refs/heads/HDFS-1312
Commit: 687233f20d24c29041929dd0a99d963cec54b6df
Parents: acb509b
Author: Colin Patrick Mccabe <cmcc...@cloudera.com>
Authored: Wed May 11 13:45:39 2016 -0700
Committer: Colin Patrick Mccabe <cmcc...@cloudera.com>
Committed: Wed May 11 13:45:39 2016 -0700

----------------------------------------------------------------------
 .../hadoop/fs/EmptyStorageStatistics.java       |  43 +++++
 .../java/org/apache/hadoop/fs/FileSystem.java   |  47 ++++-
 .../hadoop/fs/FileSystemStorageStatistics.java  | 136 +++++++++++++++
 .../hadoop/fs/GlobalStorageStatistics.java      | 127 ++++++++++++++
 .../org/apache/hadoop/fs/StorageStatistics.java |  93 ++++++++++
 .../hadoop/fs/UnionStorageStatistics.java       | 113 ++++++++++++
 .../apache/hadoop/fs/TestFilterFileSystem.java  |   1 +
 .../org/apache/hadoop/fs/TestHarFileSystem.java |   1 +
 .../hadoop/hdfs/DFSOpsCountStatistics.java      | 167 ++++++++++++++++++
 .../hadoop/hdfs/DistributedFileSystem.java      |  81 ++++++++-
 .../hadoop/hdfs/web/WebHdfsFileSystem.java      |  51 ++++++
 .../hadoop/hdfs/TestDistributedFileSystem.java  | 170 +++++++++++++++++--
 12 files changed, 1012 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
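
As a quick orientation: a minimal sketch of how a caller could consume the
new API once this change is in. The classes and methods are the ones added
by the diff below; the wrapper class PrintAllStorageStatistics is
illustrative only, not part of the patch.

    import java.util.Iterator;

    import org.apache.hadoop.fs.GlobalStorageStatistics;
    import org.apache.hadoop.fs.StorageStatistics;
    import org.apache.hadoop.fs.StorageStatistics.LongStatistic;

    public class PrintAllStorageStatistics {
      public static void main(String[] args) {
        // Walk every registered global StorageStatistics object and print
        // each tracked long-valued statistic.
        Iterator<StorageStatistics> allStats =
            GlobalStorageStatistics.INSTANCE.iterator();
        while (allStats.hasNext()) {
          StorageStatistics stats = allStats.next();
          Iterator<LongStatistic> longs = stats.getLongStatistics();
          while (longs.hasNext()) {
            LongStatistic stat = longs.next();
            System.out.println(stats.getName() + "." + stat.getName()
                + " = " + stat.getValue());
          }
        }
      }
    }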


http://git-wip-us.apache.org/repos/asf/hadoop/blob/687233f2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/EmptyStorageStatistics.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/EmptyStorageStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/EmptyStorageStatistics.java
new file mode 100644
index 0000000..1bcfe23
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/EmptyStorageStatistics.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.util.Collections;
+import java.util.Iterator;
+
+/**
+ * EmptyStorageStatistics is a StorageStatistics implementation which has no
+ * data.
+ */
+class EmptyStorageStatistics extends StorageStatistics {
+  EmptyStorageStatistics(String name) {
+    super(name);
+  }
+
+  public Iterator<LongStatistic> getLongStatistics() {
+    return Collections.emptyIterator();
+  }
+
+  public Long getLong(String key) {
+    return null;
+  }
+
+  public boolean isTracked(String key) {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/687233f2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index b376763..574027e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.AclEntry;
@@ -3560,7 +3561,7 @@ public abstract class FileSystem extends Configured implements Closeable {
   /**
    * Get the Map of Statistics object indexed by URI Scheme.
    * @return a Map having a key as URI scheme and value as Statistics object
-   * @deprecated use {@link #getAllStatistics} instead
+   * @deprecated use {@link #getGlobalStorageStatistics()}
    */
   @Deprecated
   public static synchronized Map<String, Statistics> getStatistics() {
@@ -3572,8 +3573,10 @@ public abstract class FileSystem extends Configured implements Closeable {
   }
 
   /**
-   * Return the FileSystem classes that have Statistics
+   * Return the FileSystem classes that have Statistics.
+   * @deprecated use {@link #getGlobalStorageStatistics()}
    */
+  @Deprecated
   public static synchronized List<Statistics> getAllStatistics() {
     return new ArrayList<Statistics>(statisticsTable.values());
   }
@@ -3582,13 +3585,23 @@ public abstract class FileSystem extends Configured implements Closeable {
    * Get the statistics for a particular file system
    * @param cls the class to lookup
    * @return a statistics object
+   * @deprecated use {@link #getGlobalStorageStatistics()}
    */
-  public static synchronized 
-  Statistics getStatistics(String scheme, Class<? extends FileSystem> cls) {
+  @Deprecated
+  public static synchronized Statistics getStatistics(final String scheme,
+      Class<? extends FileSystem> cls) {
     Statistics result = statisticsTable.get(cls);
     if (result == null) {
-      result = new Statistics(scheme);
-      statisticsTable.put(cls, result);
+      final Statistics newStats = new Statistics(scheme);
+      statisticsTable.put(cls, newStats);
+      result = newStats;
+      GlobalStorageStatistics.INSTANCE.put(scheme,
+          new StorageStatisticsProvider() {
+            @Override
+            public StorageStatistics provide() {
+              return new FileSystemStorageStatistics(scheme, newStats);
+            }
+          });
     }
     return result;
   }
@@ -3628,4 +3641,26 @@ public abstract class FileSystem extends Configured implements Closeable {
   public static void enableSymlinks() {
     symlinksEnabled = true;
   }
+
+  /**
+   * Get the StorageStatistics for this FileSystem object.  These statistics are
+   * per-instance.  They are not shared with any other FileSystem object.
+   *
+   * <p>This is a default method which is intended to be overridden by
+   * subclasses. The default implementation returns an empty storage statistics
+   * object.</p>
+   *
+   * @return    The StorageStatistics for this FileSystem instance.
+   *            Will never be null.
+   */
+  public StorageStatistics getStorageStatistics() {
+    return new EmptyStorageStatistics(getUri().toString());
+  }
+
+  /**
+   * Get the global storage statistics.
+   */
+  public static GlobalStorageStatistics getGlobalStorageStatistics() {
+    return GlobalStorageStatistics.INSTANCE;
+  }
 }
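
A hedged usage sketch for the two accessors added above: entries created
through FileSystem#getStatistics are registered in the global map under
their URI scheme, so an "hdfs" entry only exists once such a FileSystem has
been instantiated in the JVM. The wrapper class LookupSchemeStatistics is
illustrative only.

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.GlobalStorageStatistics;
    import org.apache.hadoop.fs.StorageStatistics;

    public class LookupSchemeStatistics {
      public static void main(String[] args) {
        GlobalStorageStatistics global =
            FileSystem.getGlobalStorageStatistics();
        // Entries registered by getStatistics() are keyed by URI scheme.
        StorageStatistics stats = global.get("hdfs");
        System.out.println(stats == null
            ? "no hdfs statistics registered yet"
            : "hdfs readOps = " + stats.getLong("readOps"));
      }
    }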

http://git-wip-us.apache.org/repos/asf/hadoop/blob/687233f2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemStorageStatistics.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemStorageStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemStorageStatistics.java
new file mode 100644
index 0000000..14f7cdd
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemStorageStatistics.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileSystem.Statistics.StatisticsData;
+
+/**
+ * A basic StorageStatistics instance which simply returns data from
+ * FileSystem#Statistics.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class FileSystemStorageStatistics extends StorageStatistics {
+  /**
+   * The per-class FileSystem statistics.
+   */
+  private final FileSystem.Statistics stats;
+
+  private static final String[] KEYS = {
+      "bytesRead",
+      "bytesWritten",
+      "readOps",
+      "largeReadOps",
+      "writeOps",
+      "bytesReadLocalHost",
+      "bytesReadDistanceOfOneOrTwo",
+      "bytesReadDistanceOfThreeOrFour",
+      "bytesReadDistanceOfFiveOrLarger"
+  };
+
+  private static class LongStatisticIterator
+      implements Iterator<LongStatistic> {
+    private final StatisticsData data;
+
+    private int keyIdx;
+
+    LongStatisticIterator(StatisticsData data) {
+      this.data = data;
+      this.keyIdx = 0;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return (this.keyIdx < KEYS.length);
+    }
+
+    @Override
+    public LongStatistic next() {
+      if (this.keyIdx >= KEYS.length) {
+        throw new NoSuchElementException();
+      }
+      String key = KEYS[this.keyIdx++];
+      Long val = fetch(data, key);
+      return new LongStatistic(key, val.longValue());
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  private static Long fetch(StatisticsData data, String key) {
+    switch (key) {
+    case "bytesRead":
+      return data.getBytesRead();
+    case "bytesWritten":
+      return data.getBytesWritten();
+    case "readOps":
+      return Long.valueOf(data.getReadOps());
+    case "largeReadOps":
+      return Long.valueOf(data.getLargeReadOps());
+    case "writeOps":
+      return Long.valueOf(data.getWriteOps());
+    case "bytesReadLocalHost":
+      return data.getBytesReadLocalHost();
+    case "bytesReadDistanceOfOneOrTwo":
+      return data.getBytesReadDistanceOfOneOrTwo();
+    case "bytesReadDistanceOfThreeOrFour":
+      return data.getBytesReadDistanceOfThreeOrFour();
+    case "bytesReadDistanceOfFiveOrLarger":
+      return data.getBytesReadDistanceOfFiveOrLarger();
+    default:
+      return null;
+    }
+  }
+
+  FileSystemStorageStatistics(String name, FileSystem.Statistics stats) {
+    super(name);
+    this.stats = stats;
+  }
+
+  @Override
+  public Iterator<LongStatistic> getLongStatistics() {
+    return new LongStatisticIterator(stats.getData());
+  }
+
+  @Override
+  public Long getLong(String key) {
+    return fetch(stats.getData(), key);
+  }
+
+  /**
+   * Return true if a statistic is being tracked.
+   *
+   * @return         True only if the statistic is being tracked.
+   */
+  public boolean isTracked(String key) {
+    for (String k: KEYS) {
+      if (k.equals(key)) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
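
The KEYS above map one-to-one onto FileSystem.Statistics getters. A small
sketch of reading them through the new per-instance hook; note that in this
patch the FileSystem base class still hands back an EmptyStorageStatistics,
so live values require a subclass override. PerInstanceStatistics is
illustrative only.

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.StorageStatistics;

    public class PerInstanceStatistics {
      public static void main(String[] args) throws Exception {
        FileSystem fs =
            FileSystem.get(URI.create("file:///"), new Configuration());
        StorageStatistics perFs = fs.getStorageStatistics();
        // LocalFileSystem does not override getStorageStatistics(), so
        // this is an EmptyStorageStatistics: nothing is tracked and
        // getLong() returns null.
        System.out.println(perFs.getName() + " tracks bytesRead: "
            + perFs.isTracked("bytesRead"));
      }
    }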

http://git-wip-us.apache.org/repos/asf/hadoop/blob/687233f2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/GlobalStorageStatistics.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/GlobalStorageStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/GlobalStorageStatistics.java
new file mode 100644
index 0000000..f22e78c
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/GlobalStorageStatistics.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.NoSuchElementException;
+import java.util.TreeMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Stores global storage statistics objects.
+ */
+@InterfaceAudience.Public
+public enum GlobalStorageStatistics {
+  /**
+   * The GlobalStorageStatistics singleton.
+   */
+  INSTANCE;
+
+  /**
+   * A map of all global StorageStatistics objects, indexed by name.
+   */
+  private final NavigableMap<String, StorageStatistics> map = new TreeMap<>();
+
+  /**
+   * A callback API for creating new StorageStatistics instances.
+   */
+  public interface StorageStatisticsProvider {
+    StorageStatistics provide();
+  }
+
+  /**
+   * Get the StorageStatistics object with the given name.
+   *
+   * @param name        The storage statistics object name.
+   * @return            The StorageStatistics object with the given name, or
+   *                      null if there is none.
+   */
+  public synchronized StorageStatistics get(String name) {
+    return map.get(name);
+  }
+
+  /**
+   * Create or return the StorageStatistics object with the given name.
+   *
+   * @param name        The storage statistics object name.
+   * @param provider    An object which can create a new StorageStatistics
+   *                      object if needed.
+   * @return            The StorageStatistics object with the given name.
+   * @throws RuntimeException  If the StorageStatisticsProvider provides a new
+   *                           StorageStatistics object with the wrong name.
+   */
+  public synchronized StorageStatistics put(String name,
+      StorageStatisticsProvider provider) {
+    StorageStatistics stats = map.get(name);
+    if (stats != null) {
+      return stats;
+    }
+    stats = provider.provide();
+    if (!stats.getName().equals(name)) {
+      throw new RuntimeException("StorageStatisticsProvider for " + name +
+          " provided a StorageStatistics object for " + stats.getName() +
+          " instead.");
+    }
+    map.put(name, stats);
+    return stats;
+  }
+
+  /**
+   * Get an iterator that we can use to iterate through all the global storage
+   * statistics objects.
+   */
+  public synchronized Iterator<StorageStatistics> iterator() {
+    Entry<String, StorageStatistics> first = map.firstEntry();
+    return new StorageIterator((first == null) ? null : first.getValue());
+  }
+
+  private class StorageIterator implements Iterator<StorageStatistics> {
+    private StorageStatistics next = null;
+
+    StorageIterator(StorageStatistics first) {
+      this.next = first;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return (next != null);
+    }
+
+    @Override
+    public StorageStatistics next() {
+      if (next == null) {
+        throw new NoSuchElementException();
+      }
+      synchronized (GlobalStorageStatistics.this) {
+        StorageStatistics cur = next;
+        Entry<String, StorageStatistics> nextEntry =
+            map.higherEntry(cur.getName());
+        next = (nextEntry == null) ? null : nextEntry.getValue();
+        return cur;
+      }
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+}
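
A sketch of the put() contract: the provider is only invoked when the name
is absent, so repeated registrations return the same instance, and a
provider that returns a differently-named object triggers the
RuntimeException above. NoopStatistics and RegisterOnce are illustrative
only, not part of the patch.

    import java.util.Collections;
    import java.util.Iterator;

    import org.apache.hadoop.fs.GlobalStorageStatistics;
    import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
    import org.apache.hadoop.fs.StorageStatistics;

    public class RegisterOnce {
      /** A trivial StorageStatistics holding no data. */
      private static class NoopStatistics extends StorageStatistics {
        NoopStatistics(String name) {
          super(name);
        }
        @Override
        public Iterator<LongStatistic> getLongStatistics() {
          return Collections.emptyIterator();
        }
        @Override
        public Long getLong(String key) {
          return null;
        }
        @Override
        public boolean isTracked(String key) {
          return false;
        }
      }

      public static void main(String[] args) {
        StorageStatisticsProvider provider =
            new StorageStatisticsProvider() {
              @Override
              public StorageStatistics provide() {
                return new NoopStatistics("demo");
              }
            };
        StorageStatistics first =
            GlobalStorageStatistics.INSTANCE.put("demo", provider);
        // The second put() finds "demo" already registered and skips the
        // provider entirely.
        StorageStatistics second =
            GlobalStorageStatistics.INSTANCE.put("demo", provider);
        System.out.println("same instance: " + (first == second)); // true
      }
    }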

http://git-wip-us.apache.org/repos/asf/hadoop/blob/687233f2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageStatistics.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageStatistics.java
new file mode 100644
index 0000000..4bdef80
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageStatistics.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.util.Iterator;
+
+/**
+ * StorageStatistics contains statistics data for a FileSystem or FileContext
+ * instance.
+ */
+@InterfaceAudience.Public
+public abstract class StorageStatistics {
+  /**
+   * A 64-bit storage statistic.
+   */
+  public static class LongStatistic {
+    private final String name;
+    private final long value;
+
+    public LongStatistic(String name, long value) {
+      this.name = name;
+      this.value = value;
+    }
+
+    /**
+     * @return    The name of this statistic.
+     */
+    public String getName() {
+      return name;
+    }
+
+    /**
+     * @return    The value of this statistic.
+     */
+    public long getValue() {
+      return value;
+    }
+  }
+
+  private final String name;
+
+  public StorageStatistics(String name) {
+    this.name = name;
+  }
+
+  /**
+   * Get the name of this StorageStatistics object.
+   */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * Get an iterator over all the currently tracked long statistics.
+   *
+   * The values returned will depend on the type of FileSystem or FileContext
+   * object.  The values do not necessarily reflect a snapshot in time.
+   */
+  public abstract Iterator<LongStatistic> getLongStatistics();
+
+  /**
+   * Get the value of a statistic.
+   *
+   * @return         The value of the statistic, or null if the statistic is
+   *                     not being tracked or is not a long-valued
+   *                     statistic.
+   */
+  public abstract Long getLong(String key);
+
+  /**
+   * Return true if a statistic is being tracked.
+   *
+   * @return         True only if the statistic is being tracked.
+   */
+  public abstract boolean isTracked(String key);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/687233f2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/UnionStorageStatistics.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/UnionStorageStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/UnionStorageStatistics.java
new file mode 100644
index 0000000..d9783e6
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/UnionStorageStatistics.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A StorageStatistics instance which combines the outputs of several other
+ * StorageStatistics instances.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class UnionStorageStatistics extends StorageStatistics {
+  /**
+   * The underlying StorageStatistics.
+   */
+  private final StorageStatistics[] stats;
+
+  private class LongStatisticIterator implements Iterator<LongStatistic> {
+    private int statIdx;
+
+    private Iterator<LongStatistic> cur;
+
+    LongStatisticIterator() {
+      this.statIdx = 0;
+      this.cur = null;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return (getIter() != null);
+    }
+
+    private Iterator<LongStatistic> getIter() {
+      while ((cur == null) || (!cur.hasNext())) {
+      if (statIdx >= stats.length) {
+          return null;
+        }
+        cur = stats[statIdx++].getLongStatistics();
+      }
+      return cur;
+    }
+
+    @Override
+    public LongStatistic next() {
+      Iterator<LongStatistic> iter = getIter();
+      if (iter == null) {
+        throw new NoSuchElementException();
+      }
+      return iter.next();
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  public UnionStorageStatistics(String name, StorageStatistics[] stats) {
+    super(name);
+    this.stats = stats;
+  }
+
+  @Override
+  public Iterator<LongStatistic> getLongStatistics() {
+    return new LongStatisticIterator();
+  }
+
+  @Override
+  public Long getLong(String key) {
+    for (int i = 0; i < stats.length; i++) {
+      Long val = stats[i].getLong(key);
+      if (val != null) {
+        return val;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Return true if a statistic is being tracked.
+   *
+   * @return         True only if the statistic is being tracked.
+   */
+  @Override
+  public boolean isTracked(String key) {
+    for (int i = 0; i < stats.length; i++) {
+      if (stats[i].isTracked(key)) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
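
For illustration, one way this class could be used: wrap several sources
under one name, with earlier members shadowing later ones for duplicate
keys. UnionDemo and the name "union" are assumptions, not part of the
patch.

    import org.apache.hadoop.fs.StorageStatistics;
    import org.apache.hadoop.fs.UnionStorageStatistics;

    public class UnionDemo {
      /**
       * Combine two statistics sources.  getLong() asks each member in
       * order and returns the first non-null value.
       */
      static StorageStatistics combine(StorageStatistics a,
          StorageStatistics b) {
        return new UnionStorageStatistics("union",
            new StorageStatistics[] {a, b});
      }
    }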

http://git-wip-us.apache.org/repos/asf/hadoop/blob/687233f2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
index 3ecce8f..e1312bc 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
@@ -132,6 +132,7 @@ public class TestFilterFileSystem {
     public Path fixRelativePart(Path p);
     public ContentSummary getContentSummary(Path f);
     public QuotaUsage getQuotaUsage(Path f);
+    StorageStatistics getStorageStatistics();
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hadoop/blob/687233f2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
index a8795cc..d2020b9 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
@@ -221,6 +221,7 @@ public class TestHarFileSystem {
     public Path getTrashRoot(Path path) throws IOException;
 
     public Collection<FileStatus> getTrashRoots(boolean allUsers) throws IOException;
+    StorageStatistics getStorageStatistics();
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hadoop/blob/687233f2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOpsCountStatistics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOpsCountStatistics.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOpsCountStatistics.java
new file mode 100644
index 0000000..d58a59f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOpsCountStatistics.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.fs.StorageStatistics;
+
+import java.util.EnumMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This storage statistics tracks how many times each DFS operation was issued.
+ *
+ * For each tracked DFS operation, there is a respective entry in the enum
+ * {@link OpType}. To use, increment the respective counter in
+ * {@link DistributedFileSystem} and
+ * {@link org.apache.hadoop.hdfs.web.WebHdfsFileSystem}.
+ *
+ * This class is thread safe, and is generally shared by multiple threads.
+ */
+public class DFSOpsCountStatistics extends StorageStatistics {
+
+  /** This is for counting file system operations. */
+  public enum OpType {
+    ALLOW_SNAPSHOT("allowSnapshot"),
+    APPEND("append"),
+    CONCAT("concat"),
+    COPY_FROM_LOCAL_FILE("copyFromLocalFile"),
+    CREATE("create"),
+    CREATE_NON_RECURSIVE("createNonRecursive"),
+    CREATE_SNAPSHOT("createSnapshot"),
+    CREATE_SYM_LINK("createSymlink"),
+    DELETE("delete"),
+    DELETE_SNAPSHOT("deleteSnapshot"),
+    DISALLOW_SNAPSHOT("disallowSnapshot"),
+    EXISTS("exists"),
+    GET_BYTES_WITH_FUTURE_GS("getBytesWithFutureGenerationStamps"),
+    GET_CONTENT_SUMMARY("getContentSummary"),
+    GET_FILE_BLOCK_LOCATIONS("getFileBlockLocations"),
+    GET_FILE_CHECKSUM("getFileChecksum"),
+    GET_FILE_LINK_STATUS("getFileLinkStatus"),
+    GET_FILE_STATUS("getFileStatus"),
+    GET_LINK_TARGET("getLinkTarget"),
+    GET_QUOTA_USAGE("getQuotaUsage"),
+    GET_STATUS("getStatus"),
+    GET_STORAGE_POLICIES("getStoragePolicies"),
+    GET_STORAGE_POLICY("getStoragePolicy"),
+    GET_XATTR("getXAttr"),
+    LIST_LOCATED_STATUS("listLocatedStatus"),
+    LIST_STATUS("listStatus"),
+    MKDIRS("mkdirs"),
+    MODIFY_ACL_ENTRIES("modifyAclEntries"),
+    OPEN("open"),
+    PRIMITIVE_CREATE("primitiveCreate"),
+    PRIMITIVE_MKDIR("primitiveMkdir"),
+    REMOVE_ACL("removeAcl"),
+    REMOVE_ACL_ENTRIES("removeAclEntries"),
+    REMOVE_DEFAULT_ACL("removeDefaultAcl"),
+    REMOVE_XATTR("removeXAttr"),
+    RENAME("rename"),
+    RENAME_SNAPSHOT("renameSnapshot"),
+    RESOLVE_LINK("resolveLink"),
+    SET_ACL("setAcl"),
+    SET_OWNER("setOwner"),
+    SET_PERMISSION("setPermission"),
+    SET_REPLICATION("setReplication"),
+    SET_STORAGE_POLICY("setStoragePolicy"),
+    SET_TIMES("setTimes"),
+    SET_XATTR("setXAttr"),
+    TRUNCATE("truncate"),
+    UNSET_STORAGE_POLICY("unsetStoragePolicy");
+
+    private final String symbol;
+
+    OpType(String symbol) {
+      this.symbol = symbol;
+    }
+
+    public String getSymbol() {
+      return symbol;
+    }
+
+    public static OpType fromSymbol(String symbol) {
+      if (symbol != null) {
+        for (OpType opType : values()) {
+          if (opType.getSymbol().equals(symbol)) {
+            return opType;
+          }
+        }
+      }
+      return null;
+    }
+  }
+
+  public static final String NAME = "DFSOpsCountStatistics";
+
+  private final Map<OpType, AtomicLong> opsCount = new EnumMap<>(OpType.class);
+
+  public DFSOpsCountStatistics() {
+    super(NAME);
+    for (OpType opType : OpType.values()) {
+      opsCount.put(opType, new AtomicLong(0));
+    }
+  }
+
+  public void incrementOpCounter(OpType op) {
+    opsCount.get(op).addAndGet(1);
+  }
+
+  private class LongIterator implements Iterator<LongStatistic> {
+    private Iterator<Entry<OpType, AtomicLong>> iterator =
+        opsCount.entrySet().iterator();
+
+    @Override
+    public boolean hasNext() {
+      return iterator.hasNext();
+    }
+
+    @Override
+    public LongStatistic next() {
+      if (!iterator.hasNext()) {
+        throw new NoSuchElementException();
+      }
+      final Entry<OpType, AtomicLong> entry = iterator.next();
+      return new LongStatistic(entry.getKey().getSymbol(), entry.getValue().get());
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  @Override
+  public Iterator<LongStatistic> getLongStatistics() {
+    return new LongIterator();
+  }
+
+  @Override
+  public Long getLong(String key) {
+    final OpType type = OpType.fromSymbol(key);
+    return type == null ? null : opsCount.get(type).get();
+  }
+
+  @Override
+  public boolean isTracked(String key) {
+    return OpType.fromSymbol(key) != null;
+  }
+
+}
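
A standalone sketch of the counter API above; OpCountDemo is illustrative
only. Note that getLong() keys on the OpType symbol (e.g. "mkdirs"), not
the enum constant name.

    import org.apache.hadoop.hdfs.DFSOpsCountStatistics;
    import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;

    public class OpCountDemo {
      public static void main(String[] args) {
        DFSOpsCountStatistics stats = new DFSOpsCountStatistics();
        stats.incrementOpCounter(OpType.MKDIRS);
        stats.incrementOpCounter(OpType.MKDIRS);
        // Look up the counter by its symbol, "mkdirs".
        System.out.println("mkdirs = "
            + stats.getLong(OpType.MKDIRS.getSymbol())); // prints 2
      }
    }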

http://git-wip-us.apache.org/repos/asf/hadoop/blob/687233f2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 7a56265..0ae4d70 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -47,8 +47,11 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemLinkResolver;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.GlobalStorageStatistics;
+import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.StorageStatistics;
 import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.Path;
@@ -65,6 +68,7 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.client.impl.CorruptFileBlockIterator;
+import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@@ -95,7 +99,6 @@ import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
-
 /****************************************************************
  * Implementation of the abstract FileSystem for the DFS system.
  * This object is the way end-user code interacts with a Hadoop
@@ -113,6 +116,8 @@ public class DistributedFileSystem extends FileSystem {
   DFSClient dfs;
   private boolean verifyChecksum = true;
 
+  private DFSOpsCountStatistics storageStatistics;
+
   static{
     HdfsConfiguration.init();
   }
@@ -150,6 +155,15 @@ public class DistributedFileSystem extends FileSystem {
     this.dfs = new DFSClient(uri, conf, statistics);
     this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
     this.workingDir = getHomeDirectory();
+
+    storageStatistics = (DFSOpsCountStatistics) GlobalStorageStatistics.INSTANCE
+        .put(DFSOpsCountStatistics.NAME,
+          new StorageStatisticsProvider() {
+            @Override
+            public StorageStatistics provide() {
+              return new DFSOpsCountStatistics();
+            }
+          });
   }
 
   @Override
@@ -214,6 +228,7 @@ public class DistributedFileSystem extends FileSystem {
   public BlockLocation[] getFileBlockLocations(Path p,
       final long start, final long len) throws IOException {
     statistics.incrementReadOps(1);
+    storageStatistics.incrementOpCounter(OpType.GET_FILE_BLOCK_LOCATIONS);
     final Path absF = fixRelativePart(p);
     return new FileSystemLinkResolver<BlockLocation[]>() {
       @Override
@@ -264,6 +279,7 @@ public class DistributedFileSystem extends FileSystem {
   public FSDataInputStream open(Path f, final int bufferSize)
       throws IOException {
     statistics.incrementReadOps(1);
+    storageStatistics.incrementOpCounter(OpType.OPEN);
     Path absF = fixRelativePart(f);
     return new FileSystemLinkResolver<FSDataInputStream>() {
       @Override
@@ -300,6 +316,7 @@ public class DistributedFileSystem extends FileSystem {
   public FSDataOutputStream append(Path f, final EnumSet<CreateFlag> flag,
       final int bufferSize, final Progressable progress) throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.APPEND);
     Path absF = fixRelativePart(f);
     return new FileSystemLinkResolver<FSDataOutputStream>() {
       @Override
@@ -332,6 +349,7 @@ public class DistributedFileSystem extends FileSystem {
       final int bufferSize, final Progressable progress,
       final InetSocketAddress[] favoredNodes) throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.APPEND);
     Path absF = fixRelativePart(f);
     return new FileSystemLinkResolver<FSDataOutputStream>() {
       @Override
@@ -375,6 +393,7 @@ public class DistributedFileSystem extends FileSystem {
       final Progressable progress, final InetSocketAddress[] favoredNodes)
       throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.CREATE);
     Path absF = fixRelativePart(f);
     return new FileSystemLinkResolver<HdfsDataOutputStream>() {
       @Override
@@ -408,6 +427,7 @@ public class DistributedFileSystem extends FileSystem {
       final Progressable progress, final ChecksumOpt checksumOpt)
       throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.CREATE);
     Path absF = fixRelativePart(f);
     return new FileSystemLinkResolver<FSDataOutputStream>() {
       @Override
@@ -432,6 +452,7 @@ public class DistributedFileSystem extends FileSystem {
       short replication, long blockSize, Progressable progress,
       ChecksumOpt checksumOpt) throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.PRIMITIVE_CREATE);
     final DFSOutputStream dfsos = dfs.primitiveCreate(
         getPathName(fixRelativePart(f)),
         absolutePermission, flag, true, replication, blockSize,
@@ -448,6 +469,7 @@ public class DistributedFileSystem extends FileSystem {
       final int bufferSize, final short replication, final long blockSize,
       final Progressable progress) throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.CREATE_NON_RECURSIVE);
     if (flag.contains(CreateFlag.OVERWRITE)) {
       flag.add(CreateFlag.CREATE);
     }
@@ -473,6 +495,7 @@ public class DistributedFileSystem extends FileSystem {
   public boolean setReplication(Path src, final short replication)
       throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.SET_REPLICATION);
     Path absF = fixRelativePart(src);
     return new FileSystemLinkResolver<Boolean>() {
       @Override
@@ -497,6 +520,7 @@ public class DistributedFileSystem extends FileSystem {
   public void setStoragePolicy(final Path src, final String policyName)
       throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.SET_STORAGE_POLICY);
     Path absF = fixRelativePart(src);
     new FileSystemLinkResolver<Void>() {
       @Override
@@ -517,6 +541,7 @@ public class DistributedFileSystem extends FileSystem {
   public void unsetStoragePolicy(final Path src)
       throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.UNSET_STORAGE_POLICY);
     Path absF = fixRelativePart(src);
     new FileSystemLinkResolver<Void>() {
       @Override
@@ -541,6 +566,7 @@ public class DistributedFileSystem extends FileSystem {
   @Override
   public BlockStoragePolicySpi getStoragePolicy(Path path) throws IOException {
     statistics.incrementReadOps(1);
+    storageStatistics.incrementOpCounter(OpType.GET_STORAGE_POLICY);
     Path absF = fixRelativePart(path);
 
     return new FileSystemLinkResolver<BlockStoragePolicySpi>() {
@@ -571,6 +597,7 @@ public class DistributedFileSystem extends FileSystem {
    */
   public long getBytesWithFutureGenerationStamps() throws IOException {
     statistics.incrementReadOps(1);
+    storageStatistics.incrementOpCounter(OpType.GET_BYTES_WITH_FUTURE_GS);
     return dfs.getBytesInFutureBlocks();
   }
 
@@ -581,6 +608,7 @@ public class DistributedFileSystem extends FileSystem {
   @Deprecated
   public BlockStoragePolicy[] getStoragePolicies() throws IOException {
     statistics.incrementReadOps(1);
+    storageStatistics.incrementOpCounter(OpType.GET_STORAGE_POLICIES);
     return dfs.getStoragePolicies();
   }
 
@@ -595,6 +623,7 @@ public class DistributedFileSystem extends FileSystem {
   @Override
   public void concat(Path trg, Path [] psrcs) throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.CONCAT);
     // Make target absolute
     Path absF = fixRelativePart(trg);
     // Make all srcs absolute
@@ -639,6 +668,7 @@ public class DistributedFileSystem extends FileSystem {
   @Override
   public boolean rename(Path src, Path dst) throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.RENAME);
 
     final Path absSrc = fixRelativePart(src);
     final Path absDst = fixRelativePart(dst);
@@ -673,6 +703,7 @@ public class DistributedFileSystem extends FileSystem {
   public void rename(Path src, Path dst, final Options.Rename... options)
       throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.RENAME);
     final Path absSrc = fixRelativePart(src);
     final Path absDst = fixRelativePart(dst);
     // Try the rename without resolving first
@@ -701,6 +732,7 @@ public class DistributedFileSystem extends FileSystem {
   @Override
   public boolean truncate(Path f, final long newLength) throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.TRUNCATE);
     Path absF = fixRelativePart(f);
     return new FileSystemLinkResolver<Boolean>() {
       @Override
@@ -718,6 +750,7 @@ public class DistributedFileSystem extends FileSystem {
   @Override
   public boolean delete(Path f, final boolean recursive) throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.DELETE);
     Path absF = fixRelativePart(f);
     return new FileSystemLinkResolver<Boolean>() {
       @Override
@@ -735,6 +768,7 @@ public class DistributedFileSystem extends FileSystem {
   @Override
   public ContentSummary getContentSummary(Path f) throws IOException {
     statistics.incrementReadOps(1);
+    storageStatistics.incrementOpCounter(OpType.GET_CONTENT_SUMMARY);
     Path absF = fixRelativePart(f);
     return new FileSystemLinkResolver<ContentSummary>() {
       @Override
@@ -752,6 +786,7 @@ public class DistributedFileSystem extends FileSystem {
   @Override
   public QuotaUsage getQuotaUsage(Path f) throws IOException {
     statistics.incrementReadOps(1);
+    storageStatistics.incrementOpCounter(OpType.GET_QUOTA_USAGE);
     Path absF = fixRelativePart(f);
     return new FileSystemLinkResolver<QuotaUsage>() {
       @Override
@@ -836,6 +871,7 @@ public class DistributedFileSystem extends FileSystem {
         stats[i] = partialListing[i].makeQualified(getUri(), p);
       }
       statistics.incrementReadOps(1);
+      storageStatistics.incrementOpCounter(OpType.LIST_STATUS);
       return stats;
     }
 
@@ -850,6 +886,7 @@ public class DistributedFileSystem extends FileSystem {
       listing.add(fileStatus.makeQualified(getUri(), p));
     }
     statistics.incrementLargeReadOps(1);
+    storageStatistics.incrementOpCounter(OpType.LIST_STATUS);
 
     // now fetch more entries
     do {
@@ -864,6 +901,7 @@ public class DistributedFileSystem extends FileSystem {
         listing.add(fileStatus.makeQualified(getUri(), p));
       }
       statistics.incrementLargeReadOps(1);
+      storageStatistics.incrementOpCounter(OpType.LIST_STATUS);
     } while (thisListing.hasMore());
 
     return listing.toArray(new FileStatus[listing.size()]);
@@ -977,6 +1015,7 @@ public class DistributedFileSystem extends FileSystem {
       thisListing = dfs.listPaths(src, HdfsFileStatus.EMPTY_NAME,
           needLocation);
       statistics.incrementReadOps(1);
+      storageStatistics.incrementOpCounter(OpType.LIST_LOCATED_STATUS);
       if (thisListing == null) { // the directory does not exist
         throw new FileNotFoundException("File " + p + " does not exist.");
       }
@@ -1072,6 +1111,7 @@ public class DistributedFileSystem extends FileSystem {
   private boolean mkdirsInternal(Path f, final FsPermission permission,
       final boolean createParent) throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.MKDIRS);
     Path absF = fixRelativePart(f);
     return new FileSystemLinkResolver<Boolean>() {
       @Override
@@ -1098,6 +1138,7 @@ public class DistributedFileSystem extends FileSystem {
   protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
       throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.PRIMITIVE_MKDIR);
     return dfs.primitiveMkdir(getPathName(f), absolutePermission);
   }
 
@@ -1126,6 +1167,7 @@ public class DistributedFileSystem extends FileSystem {
   @Override
   public FsStatus getStatus(Path p) throws IOException {
     statistics.incrementReadOps(1);
+    storageStatistics.incrementOpCounter(OpType.GET_STATUS);
     return dfs.getDiskStatus();
   }
 
@@ -1317,6 +1359,7 @@ public class DistributedFileSystem extends FileSystem {
   @Override
   public FileStatus getFileStatus(Path f) throws IOException {
     statistics.incrementReadOps(1);
+    storageStatistics.incrementOpCounter(OpType.GET_FILE_STATUS);
     Path absF = fixRelativePart(f);
     return new FileSystemLinkResolver<FileStatus>() {
       @Override
@@ -1344,6 +1387,7 @@ public class DistributedFileSystem extends FileSystem {
       throw new UnsupportedOperationException("Symlinks not supported");
     }
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.CREATE_SYM_LINK);
     final Path absF = fixRelativePart(link);
     new FileSystemLinkResolver<Void>() {
       @Override
@@ -1367,6 +1411,7 @@ public class DistributedFileSystem extends FileSystem {
   @Override
   public FileStatus getFileLinkStatus(final Path f) throws IOException {
     statistics.incrementReadOps(1);
+    storageStatistics.incrementOpCounter(OpType.GET_FILE_LINK_STATUS);
     final Path absF = fixRelativePart(f);
     FileStatus status = new FileSystemLinkResolver<FileStatus>() {
       @Override
@@ -1396,6 +1441,7 @@ public class DistributedFileSystem extends FileSystem {
   @Override
   public Path getLinkTarget(final Path f) throws IOException {
     statistics.incrementReadOps(1);
+    storageStatistics.incrementOpCounter(OpType.GET_LINK_TARGET);
     final Path absF = fixRelativePart(f);
     return new FileSystemLinkResolver<Path>() {
       @Override
@@ -1417,6 +1463,7 @@ public class DistributedFileSystem extends FileSystem {
   @Override
   protected Path resolveLink(Path f) throws IOException {
     statistics.incrementReadOps(1);
+    storageStatistics.incrementOpCounter(OpType.RESOLVE_LINK);
     String target = dfs.getLinkTarget(getPathName(fixRelativePart(f)));
     if (target == null) {
       throw new FileNotFoundException("File does not exist: " + f.toString());
@@ -1427,6 +1474,7 @@ public class DistributedFileSystem extends FileSystem {
   @Override
   public FileChecksum getFileChecksum(Path f) throws IOException {
     statistics.incrementReadOps(1);
+    storageStatistics.incrementOpCounter(OpType.GET_FILE_CHECKSUM);
     Path absF = fixRelativePart(f);
     return new FileSystemLinkResolver<FileChecksum>() {
       @Override
@@ -1446,6 +1494,7 @@ public class DistributedFileSystem extends FileSystem {
   public FileChecksum getFileChecksum(Path f, final long length)
       throws IOException {
     statistics.incrementReadOps(1);
+    storageStatistics.incrementOpCounter(OpType.GET_FILE_CHECKSUM);
     Path absF = fixRelativePart(f);
     return new FileSystemLinkResolver<FileChecksum>() {
       @Override
@@ -1471,6 +1520,7 @@ public class DistributedFileSystem extends FileSystem {
   public void setPermission(Path p, final FsPermission permission
   ) throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.SET_PERMISSION);
     Path absF = fixRelativePart(p);
     new FileSystemLinkResolver<Void>() {
       @Override
@@ -1495,6 +1545,7 @@ public class DistributedFileSystem extends FileSystem {
       throw new IOException("username == null && groupname == null");
     }
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.SET_OWNER);
     Path absF = fixRelativePart(p);
     new FileSystemLinkResolver<Void>() {
       @Override
@@ -1516,6 +1567,7 @@ public class DistributedFileSystem extends FileSystem {
   public void setTimes(Path p, final long mtime, final long atime)
       throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.SET_TIMES);
     Path absF = fixRelativePart(p);
     new FileSystemLinkResolver<Void>() {
       @Override
@@ -1593,6 +1645,8 @@ public class DistributedFileSystem extends FileSystem {
 
   /** @see HdfsAdmin#allowSnapshot(Path) */
   public void allowSnapshot(final Path path) throws IOException {
+    statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.ALLOW_SNAPSHOT);
     Path absF = fixRelativePart(path);
     new FileSystemLinkResolver<Void>() {
       @Override
@@ -1619,6 +1673,8 @@ public class DistributedFileSystem extends FileSystem {
 
   /** @see HdfsAdmin#disallowSnapshot(Path) */
   public void disallowSnapshot(final Path path) throws IOException {
+    statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.DISALLOW_SNAPSHOT);
     Path absF = fixRelativePart(path);
     new FileSystemLinkResolver<Void>() {
       @Override
@@ -1646,6 +1702,8 @@ public class DistributedFileSystem extends FileSystem {
   @Override
   public Path createSnapshot(final Path path, final String snapshotName)
       throws IOException {
+    statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.CREATE_SNAPSHOT);
     Path absF = fixRelativePart(path);
     return new FileSystemLinkResolver<Path>() {
       @Override
@@ -1671,6 +1729,8 @@ public class DistributedFileSystem extends FileSystem {
   @Override
   public void renameSnapshot(final Path path, final String snapshotOldName,
       final String snapshotNewName) throws IOException {
+    statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.RENAME_SNAPSHOT);
     Path absF = fixRelativePart(path);
     new FileSystemLinkResolver<Void>() {
       @Override
@@ -1707,6 +1767,8 @@ public class DistributedFileSystem extends FileSystem {
   @Override
   public void deleteSnapshot(final Path snapshotDir, final String snapshotName)
       throws IOException {
+    statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.DELETE_SNAPSHOT);
     Path absF = fixRelativePart(snapshotDir);
     new FileSystemLinkResolver<Void>() {
       @Override
@@ -1954,6 +2016,8 @@ public class DistributedFileSystem extends FileSystem {
   @Override
   public void modifyAclEntries(Path path, final List<AclEntry> aclSpec)
       throws IOException {
+    statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.MODIFY_ACL_ENTRIES);
     Path absF = fixRelativePart(path);
     new FileSystemLinkResolver<Void>() {
       @Override
@@ -1976,6 +2040,8 @@ public class DistributedFileSystem extends FileSystem {
   @Override
   public void removeAclEntries(Path path, final List<AclEntry> aclSpec)
       throws IOException {
+    statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.REMOVE_ACL_ENTRIES);
     Path absF = fixRelativePart(path);
     new FileSystemLinkResolver<Void>() {
       @Override
@@ -1997,6 +2063,8 @@ public class DistributedFileSystem extends FileSystem {
    */
   @Override
   public void removeDefaultAcl(Path path) throws IOException {
+    statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.REMOVE_DEFAULT_ACL);
     final Path absF = fixRelativePart(path);
     new FileSystemLinkResolver<Void>() {
       @Override
@@ -2017,6 +2085,8 @@ public class DistributedFileSystem extends FileSystem {
    */
   @Override
   public void removeAcl(Path path) throws IOException {
+    statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.REMOVE_ACL);
     final Path absF = fixRelativePart(path);
     new FileSystemLinkResolver<Void>() {
       @Override
@@ -2038,6 +2108,8 @@ public class DistributedFileSystem extends FileSystem {
   @Override
   public void setAcl(Path path, final List<AclEntry> aclSpec)
       throws IOException {
+    statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.SET_ACL);
     Path absF = fixRelativePart(path);
     new FileSystemLinkResolver<Void>() {
       @Override
@@ -2136,6 +2208,8 @@ public class DistributedFileSystem extends FileSystem {
   @Override
   public void setXAttr(Path path, final String name, final byte[] value,
       final EnumSet<XAttrSetFlag> flag) throws IOException {
+    statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.SET_XATTR);
     Path absF = fixRelativePart(path);
     new FileSystemLinkResolver<Void>() {
 
@@ -2155,6 +2229,8 @@ public class DistributedFileSystem extends FileSystem {
 
   @Override
   public byte[] getXAttr(Path path, final String name) throws IOException {
+    statistics.incrementReadOps(1);
+    storageStatistics.incrementOpCounter(OpType.GET_XATTR);
     final Path absF = fixRelativePart(path);
     return new FileSystemLinkResolver<byte[]>() {
       @Override
@@ -2220,6 +2296,8 @@ public class DistributedFileSystem extends FileSystem {
 
   @Override
   public void removeXAttr(Path path, final String name) throws IOException {
+    statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.REMOVE_XATTR);
     Path absF = fixRelativePart(path);
     new FileSystemLinkResolver<Void>() {
       @Override
@@ -2450,4 +2528,5 @@ public class DistributedFileSystem extends FileSystem {
   Statistics getFsStatistics() {
     return statistics;
   }
+
 }
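
Since initialize() above registers the shared DFSOpsCountStatistics under
DFSOpsCountStatistics.NAME, the per-operation counters become observable
from anywhere in the JVM; a hedged sketch, with DfsOpCounters illustrative
only:

    import org.apache.hadoop.fs.GlobalStorageStatistics;
    import org.apache.hadoop.fs.StorageStatistics;
    import org.apache.hadoop.hdfs.DFSOpsCountStatistics;

    public class DfsOpCounters {
      public static void main(String[] args) {
        // Present only after a DistributedFileSystem or WebHdfsFileSystem
        // has been initialized in this JVM.
        StorageStatistics dfsOps = GlobalStorageStatistics.INSTANCE
            .get(DFSOpsCountStatistics.NAME);
        if (dfsOps != null) {
          System.out.println("mkdirs so far = "
              + dfsOps.getLong("mkdirs"));
        }
      }
    }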

http://git-wip-us.apache.org/repos/asf/hadoop/blob/687233f2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
index ab4e0d0..9e42b24 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
@@ -62,6 +62,11 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.GlobalStorageStatistics;
+import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
+import org.apache.hadoop.fs.StorageStatistics;
+import org.apache.hadoop.hdfs.DFSOpsCountStatistics;
+import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
@@ -143,6 +148,8 @@ public class WebHdfsFileSystem extends FileSystem
   private static final ObjectReader READER =
       new ObjectMapper().reader(Map.class);
 
+  private DFSOpsCountStatistics storageStatistics;
+
   /**
    * Return the protocol scheme for the FileSystem.
    * <p/>
@@ -240,6 +247,15 @@ public class WebHdfsFileSystem extends FileSystem
         CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
     this.initializeRestCsrf(conf);
     this.delegationToken = null;
+
+    storageStatistics = (DFSOpsCountStatistics) GlobalStorageStatistics.INSTANCE
+        .put(DFSOpsCountStatistics.NAME,
+            new StorageStatisticsProvider() {
+              @Override
+              public StorageStatistics provide() {
+                return new DFSOpsCountStatistics();
+              }
+            });
   }
 
   /**
@@ -974,6 +990,7 @@ public class WebHdfsFileSystem extends FileSystem
   @Override
   public FileStatus getFileStatus(Path f) throws IOException {
     statistics.incrementReadOps(1);
+    storageStatistics.incrementOpCounter(OpType.GET_FILE_STATUS);
     return makeQualified(getHdfsFileStatus(f), f);
   }
 
@@ -1003,6 +1020,7 @@ public class WebHdfsFileSystem extends FileSystem
   @Override
   public boolean mkdirs(Path f, FsPermission permission) throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.MKDIRS);
     final HttpOpParam.Op op = PutOpParam.Op.MKDIRS;
     return new FsPathBooleanRunner(op, f,
         new PermissionParam(applyUMask(permission))
@@ -1015,6 +1033,7 @@ public class WebHdfsFileSystem extends FileSystem
   public void createSymlink(Path destination, Path f, boolean createParent
   ) throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.CREATE_SYM_LINK);
     final HttpOpParam.Op op = PutOpParam.Op.CREATESYMLINK;
     new FsPathRunner(op, f,
         new DestinationParam(makeQualified(destination).toUri().getPath()),
@@ -1025,6 +1044,7 @@ public class WebHdfsFileSystem extends FileSystem
   @Override
   public boolean rename(final Path src, final Path dst) throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.RENAME);
     final HttpOpParam.Op op = PutOpParam.Op.RENAME;
     return new FsPathBooleanRunner(op, src,
         new DestinationParam(makeQualified(dst).toUri().getPath())
@@ -1036,6 +1056,7 @@ public class WebHdfsFileSystem extends FileSystem
   public void rename(final Path src, final Path dst,
       final Options.Rename... options) throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.RENAME);
     final HttpOpParam.Op op = PutOpParam.Op.RENAME;
     new FsPathRunner(op, src,
         new DestinationParam(makeQualified(dst).toUri().getPath()),
@@ -1047,6 +1068,7 @@ public class WebHdfsFileSystem extends FileSystem
   public void setXAttr(Path p, String name, byte[] value,
       EnumSet<XAttrSetFlag> flag) throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.SET_XATTR);
     final HttpOpParam.Op op = PutOpParam.Op.SETXATTR;
     if (value != null) {
       new FsPathRunner(op, p, new XAttrNameParam(name), new XAttrValueParam(
@@ -1060,6 +1082,8 @@ public class WebHdfsFileSystem extends FileSystem
 
   @Override
   public byte[] getXAttr(Path p, final String name) throws IOException {
+    statistics.incrementReadOps(1);
+    storageStatistics.incrementOpCounter(OpType.GET_XATTR);
     final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS;
     return new FsPathResponseRunner<byte[]>(op, p, new XAttrNameParam(name),
         new XAttrEncodingParam(XAttrCodec.HEX)) {
@@ -1116,6 +1140,7 @@ public class WebHdfsFileSystem extends FileSystem
   @Override
   public void removeXAttr(Path p, String name) throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.REMOVE_XATTR);
     final HttpOpParam.Op op = PutOpParam.Op.REMOVEXATTR;
     new FsPathRunner(op, p, new XAttrNameParam(name)).run();
   }
@@ -1128,6 +1153,7 @@ public class WebHdfsFileSystem extends FileSystem
     }
 
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.SET_OWNER);
     final HttpOpParam.Op op = PutOpParam.Op.SETOWNER;
     new FsPathRunner(op, p,
         new OwnerParam(owner), new GroupParam(group)
@@ -1138,6 +1164,7 @@ public class WebHdfsFileSystem extends FileSystem
   public void setPermission(final Path p, final FsPermission permission
   ) throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.SET_PERMISSION);
     final HttpOpParam.Op op = PutOpParam.Op.SETPERMISSION;
     new FsPathRunner(op, p,new PermissionParam(permission)).run();
   }
@@ -1146,6 +1173,7 @@ public class WebHdfsFileSystem extends FileSystem
   public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
       throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.MODIFY_ACL_ENTRIES);
     final HttpOpParam.Op op = PutOpParam.Op.MODIFYACLENTRIES;
     new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run();
   }
@@ -1154,6 +1182,7 @@ public class WebHdfsFileSystem extends FileSystem
   public void removeAclEntries(Path path, List<AclEntry> aclSpec)
       throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.REMOVE_ACL_ENTRIES);
     final HttpOpParam.Op op = PutOpParam.Op.REMOVEACLENTRIES;
     new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run();
   }
@@ -1161,6 +1190,7 @@ public class WebHdfsFileSystem extends FileSystem
   @Override
   public void removeDefaultAcl(Path path) throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.REMOVE_DEFAULT_ACL);
     final HttpOpParam.Op op = PutOpParam.Op.REMOVEDEFAULTACL;
     new FsPathRunner(op, path).run();
   }
@@ -1168,6 +1198,7 @@ public class WebHdfsFileSystem extends FileSystem
   @Override
   public void removeAcl(Path path) throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.REMOVE_ACL);
     final HttpOpParam.Op op = PutOpParam.Op.REMOVEACL;
     new FsPathRunner(op, path).run();
   }
@@ -1176,12 +1207,14 @@ public class WebHdfsFileSystem extends FileSystem
   public void setAcl(final Path p, final List<AclEntry> aclSpec)
       throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.SET_ACL);
     final HttpOpParam.Op op = PutOpParam.Op.SETACL;
     new FsPathRunner(op, p, new AclPermissionParam(aclSpec)).run();
   }
 
   public void allowSnapshot(final Path p) throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.ALLOW_SNAPSHOT);
     final HttpOpParam.Op op = PutOpParam.Op.ALLOWSNAPSHOT;
     new FsPathRunner(op, p).run();
   }
@@ -1190,6 +1223,7 @@ public class WebHdfsFileSystem extends FileSystem
   public Path createSnapshot(final Path path, final String snapshotName)
       throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.CREATE_SNAPSHOT);
     final HttpOpParam.Op op = PutOpParam.Op.CREATESNAPSHOT;
     return new FsPathResponseRunner<Path>(op, path,
         new SnapshotNameParam(snapshotName)) {
@@ -1202,6 +1236,7 @@ public class WebHdfsFileSystem extends FileSystem
 
   public void disallowSnapshot(final Path p) throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.DISALLOW_SNAPSHOT);
     final HttpOpParam.Op op = PutOpParam.Op.DISALLOWSNAPSHOT;
     new FsPathRunner(op, p).run();
   }
@@ -1210,6 +1245,7 @@ public class WebHdfsFileSystem extends FileSystem
   public void deleteSnapshot(final Path path, final String snapshotName)
       throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.DELETE_SNAPSHOT);
     final HttpOpParam.Op op = DeleteOpParam.Op.DELETESNAPSHOT;
     new FsPathRunner(op, path, new SnapshotNameParam(snapshotName)).run();
   }
@@ -1218,6 +1254,7 @@ public class WebHdfsFileSystem extends FileSystem
   public void renameSnapshot(final Path path, final String snapshotOldName,
       final String snapshotNewName) throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.RENAME_SNAPSHOT);
     final HttpOpParam.Op op = PutOpParam.Op.RENAMESNAPSHOT;
     new FsPathRunner(op, path, new OldSnapshotNameParam(snapshotOldName),
         new SnapshotNameParam(snapshotNewName)).run();
@@ -1227,6 +1264,7 @@ public class WebHdfsFileSystem extends FileSystem
   public boolean setReplication(final Path p, final short replication
   ) throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.SET_REPLICATION);
     final HttpOpParam.Op op = PutOpParam.Op.SETREPLICATION;
     return new FsPathBooleanRunner(op, p,
         new ReplicationParam(replication)
@@ -1237,6 +1275,7 @@ public class WebHdfsFileSystem extends FileSystem
   public void setTimes(final Path p, final long mtime, final long atime
   ) throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.SET_TIMES);
     final HttpOpParam.Op op = PutOpParam.Op.SETTIMES;
     new FsPathRunner(op, p,
         new ModificationTimeParam(mtime),
@@ -1259,6 +1298,7 @@ public class WebHdfsFileSystem extends FileSystem
   @Override
   public void concat(final Path trg, final Path [] srcs) throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.CONCAT);
     final HttpOpParam.Op op = PostOpParam.Op.CONCAT;
     new FsPathRunner(op, trg, new ConcatSourcesParam(srcs)).run();
   }
@@ -1268,6 +1308,7 @@ public class WebHdfsFileSystem extends FileSystem
       final boolean overwrite, final int bufferSize, final short replication,
       final long blockSize, final Progressable progress) throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.CREATE);
 
     final HttpOpParam.Op op = PutOpParam.Op.CREATE;
     return new FsPathOutputStreamRunner(op, f, bufferSize,
@@ -1285,6 +1326,7 @@ public class WebHdfsFileSystem extends FileSystem
       final int bufferSize, final short replication, final long blockSize,
       final Progressable progress) throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.CREATE_NON_RECURSIVE);
 
     final HttpOpParam.Op op = PutOpParam.Op.CREATE;
     return new FsPathOutputStreamRunner(op, f, bufferSize,
@@ -1301,6 +1343,7 @@ public class WebHdfsFileSystem extends FileSystem
   public FSDataOutputStream append(final Path f, final int bufferSize,
       final Progressable progress) throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.APPEND);
 
     final HttpOpParam.Op op = PostOpParam.Op.APPEND;
     return new FsPathOutputStreamRunner(op, f, bufferSize,
@@ -1311,6 +1354,7 @@ public class WebHdfsFileSystem extends FileSystem
   @Override
   public boolean truncate(Path f, long newLength) throws IOException {
     statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.TRUNCATE);
 
     final HttpOpParam.Op op = PostOpParam.Op.TRUNCATE;
     return new FsPathBooleanRunner(op, f, new NewLengthParam(newLength)).run();
@@ -1318,6 +1362,8 @@ public class WebHdfsFileSystem extends FileSystem
 
   @Override
   public boolean delete(Path f, boolean recursive) throws IOException {
+    statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.DELETE);
     final HttpOpParam.Op op = DeleteOpParam.Op.DELETE;
     return new FsPathBooleanRunner(op, f,
         new RecursiveParam(recursive)
@@ -1328,6 +1374,7 @@ public class WebHdfsFileSystem extends FileSystem
   public FSDataInputStream open(final Path f, final int bufferSize
   ) throws IOException {
     statistics.incrementReadOps(1);
+    storageStatistics.incrementOpCounter(OpType.OPEN);
     return new FSDataInputStream(new WebHdfsInputStream(f, bufferSize));
   }
 
@@ -1427,6 +1474,7 @@ public class WebHdfsFileSystem extends FileSystem
   @Override
   public FileStatus[] listStatus(final Path f) throws IOException {
     statistics.incrementReadOps(1);
+    storageStatistics.incrementOpCounter(OpType.LIST_STATUS);
 
     final HttpOpParam.Op op = GetOpParam.Op.LISTSTATUS;
     return new FsPathResponseRunner<FileStatus[]>(op, f) {
@@ -1522,6 +1570,7 @@ public class WebHdfsFileSystem extends FileSystem
   public BlockLocation[] getFileBlockLocations(final Path p,
       final long offset, final long length) throws IOException {
     statistics.incrementReadOps(1);
+    storageStatistics.incrementOpCounter(OpType.GET_FILE_BLOCK_LOCATIONS);
 
     final HttpOpParam.Op op = GetOpParam.Op.GET_BLOCK_LOCATIONS;
     return new FsPathResponseRunner<BlockLocation[]>(op, p,
@@ -1543,6 +1592,7 @@ public class WebHdfsFileSystem extends FileSystem
   @Override
   public ContentSummary getContentSummary(final Path p) throws IOException {
     statistics.incrementReadOps(1);
+    storageStatistics.incrementOpCounter(OpType.GET_CONTENT_SUMMARY);
 
     final HttpOpParam.Op op = GetOpParam.Op.GETCONTENTSUMMARY;
     return new FsPathResponseRunner<ContentSummary>(op, p) {
@@ -1557,6 +1607,7 @@ public class WebHdfsFileSystem extends FileSystem
   public MD5MD5CRC32FileChecksum getFileChecksum(final Path p
   ) throws IOException {
     statistics.incrementReadOps(1);
+    storageStatistics.incrementOpCounter(OpType.GET_FILE_CHECKSUM);
 
     final HttpOpParam.Op op = GetOpParam.Op.GETFILECHECKSUM;
     return new FsPathResponseRunner<MD5MD5CRC32FileChecksum>(op, p) {

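WebHdfsFileSystem registers under the same DFSOpsCountStatistics.NAME in initialize() above, so an HDFS client and a WebHDFS client in one JVM feed a single shared counter set. That only works if GlobalStorageStatistics.put() behaves as get-or-create. A minimal standalone sketch of that assumed contract — not the committed GlobalStorageStatistics, just the semantics the cast in initialize() relies on:

    import java.util.Map;
    import java.util.TreeMap;

    import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
    import org.apache.hadoop.fs.StorageStatistics;

    class GetOrCreateSketch {
      private final Map<String, StorageStatistics> map = new TreeMap<>();

      // The provider runs only for the first caller of a given name;
      // every later caller gets the same shared instance back, which is
      // why initialize() can cast the result to DFSOpsCountStatistics.
      synchronized StorageStatistics put(String name,
          StorageStatisticsProvider provider) {
        StorageStatistics stats = map.get(name);
        if (stats == null) {
          stats = provider.provide();
          map.put(name, stats);
        }
        return stats;
      }
    }
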
http://git-wip-us.apache.org/repos/asf/hadoop/blob/687233f2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
index 1db0da8..f3bf6b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.eq;
@@ -39,9 +40,13 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
@@ -49,18 +54,22 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem.Statistics.StatisticsData;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.GlobalStorageStatistics;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.StorageStatistics.LongStatistic;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
@@ -69,19 +78,27 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.Op;
 import org.apache.hadoop.hdfs.web.WebHdfsConstants;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.log4j.Level;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.InOrder;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TestDistributedFileSystem {
   private static final Random RAN = new Random();
+  private static final Logger LOG = LoggerFactory.getLogger(
+      TestDistributedFileSystem.class);
 
   static {
     GenericTestUtils.setLogLevel(DFSClient.LOG, Level.ALL);
@@ -554,54 +571,84 @@ public class TestDistributedFileSystem {
   }
   
   @Test
-  public void testStatistics() throws Exception {
+  public void testStatistics() throws IOException {
+    FileSystem.getStatistics(HdfsConstants.HDFS_URI_SCHEME,
+        DistributedFileSystem.class).reset();
+    @SuppressWarnings("unchecked")
+    ThreadLocal<StatisticsData> data = (ThreadLocal<StatisticsData>)
+        Whitebox.getInternalState(
+        FileSystem.getStatistics(HdfsConstants.HDFS_URI_SCHEME,
+        DistributedFileSystem.class), "threadData");
+    data.set(null);
+
     int lsLimit = 2;
     final Configuration conf = getTestConfiguration();
     conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, lsLimit);
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
     try {
+      cluster.waitActive();
       final FileSystem fs = cluster.getFileSystem();
       Path dir = new Path("/test");
       Path file = new Path(dir, "file");
-      
-      int readOps = DFSTestUtil.getStatistics(fs).getReadOps();
-      int writeOps = DFSTestUtil.getStatistics(fs).getWriteOps();
-      int largeReadOps = DFSTestUtil.getStatistics(fs).getLargeReadOps();
+
+      int readOps = 0;
+      int writeOps = 0;
+      int largeReadOps = 0;
+
+      long opCount = getOpStatistics(OpType.MKDIRS);
       fs.mkdirs(dir);
       checkStatistics(fs, readOps, ++writeOps, largeReadOps);
+      checkOpStatistics(OpType.MKDIRS, opCount + 1);
       
+      opCount = getOpStatistics(OpType.CREATE);
       FSDataOutputStream out = fs.create(file, (short)1);
       out.close();
       checkStatistics(fs, readOps, ++writeOps, largeReadOps);
-      
+      checkOpStatistics(OpType.CREATE, opCount + 1);
+
+      opCount = getOpStatistics(OpType.GET_FILE_STATUS);
       FileStatus status = fs.getFileStatus(file);
       checkStatistics(fs, ++readOps, writeOps, largeReadOps);
+      checkOpStatistics(OpType.GET_FILE_STATUS, opCount + 1);
       
+      opCount = getOpStatistics(OpType.GET_FILE_BLOCK_LOCATIONS);
       fs.getFileBlockLocations(file, 0, 0);
       checkStatistics(fs, ++readOps, writeOps, largeReadOps);
-      
+      checkOpStatistics(OpType.GET_FILE_BLOCK_LOCATIONS, opCount + 1);
       fs.getFileBlockLocations(status, 0, 0);
       checkStatistics(fs, ++readOps, writeOps, largeReadOps);
+      checkOpStatistics(OpType.GET_FILE_BLOCK_LOCATIONS, opCount + 2);
       
+      opCount = getOpStatistics(OpType.OPEN);
       FSDataInputStream in = fs.open(file);
       in.close();
       checkStatistics(fs, ++readOps, writeOps, largeReadOps);
+      checkOpStatistics(OpType.OPEN, opCount + 1);
       
+      opCount = getOpStatistics(OpType.SET_REPLICATION);
       fs.setReplication(file, (short)2);
       checkStatistics(fs, readOps, ++writeOps, largeReadOps);
+      checkOpStatistics(OpType.SET_REPLICATION, opCount + 1);
       
+      opCount = getOpStatistics(OpType.RENAME);
       Path file1 = new Path(dir, "file1");
       fs.rename(file, file1);
       checkStatistics(fs, readOps, ++writeOps, largeReadOps);
+      checkOpStatistics(OpType.RENAME, opCount + 1);
       
+      opCount = getOpStatistics(OpType.GET_CONTENT_SUMMARY);
       fs.getContentSummary(file1);
       checkStatistics(fs, ++readOps, writeOps, largeReadOps);
+      checkOpStatistics(OpType.GET_CONTENT_SUMMARY, opCount + 1);
       
       
       // Iterative ls test
+      long mkdirOp = getOpStatistics(OpType.MKDIRS);
+      long listStatusOp = getOpStatistics(OpType.LIST_STATUS);
       for (int i = 0; i < 10; i++) {
         Path p = new Path(dir, Integer.toString(i));
         fs.mkdirs(p);
+        mkdirOp++;
         FileStatus[] list = fs.listStatus(dir);
         if (list.length > lsLimit) {
           // if large directory, then count readOps and largeReadOps by 
@@ -609,41 +656,131 @@ public class TestDistributedFileSystem {
           int iterations = (int)Math.ceil((double)list.length/lsLimit);
           largeReadOps += iterations;
           readOps += iterations;
+          listStatusOp += iterations;
         } else {
           // Single iteration in listStatus - no large read operation done
           readOps++;
+          listStatusOp++;
         }
         
         // writeOps incremented by 1 for mkdirs
         // readOps and largeReadOps incremented by 1 or more
         checkStatistics(fs, readOps, ++writeOps, largeReadOps);
+        checkOpStatistics(OpType.MKDIRS, mkdirOp);
+        checkOpStatistics(OpType.LIST_STATUS, listStatusOp);
       }
       
+      opCount = getOpStatistics(OpType.GET_STATUS);
       fs.getStatus(file1);
       checkStatistics(fs, ++readOps, writeOps, largeReadOps);
-      
+      checkOpStatistics(OpType.GET_STATUS, opCount + 1);
+
+      opCount = getOpStatistics(OpType.GET_FILE_CHECKSUM);
       fs.getFileChecksum(file1);
       checkStatistics(fs, ++readOps, writeOps, largeReadOps);
+      checkOpStatistics(OpType.GET_FILE_CHECKSUM, opCount + 1);
       
+      opCount = getOpStatistics(OpType.SET_PERMISSION);
       fs.setPermission(file1, new FsPermission((short)0777));
       checkStatistics(fs, readOps, ++writeOps, largeReadOps);
+      checkOpStatistics(OpType.SET_PERMISSION, opCount + 1);
       
+      opCount = getOpStatistics(OpType.SET_TIMES);
       fs.setTimes(file1, 0L, 0L);
       checkStatistics(fs, readOps, ++writeOps, largeReadOps);
-      
+      checkOpStatistics(OpType.SET_TIMES, opCount + 1);
+
+      opCount = getOpStatistics(OpType.SET_OWNER);
       UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
       fs.setOwner(file1, ugi.getUserName(), ugi.getGroupNames()[0]);
+      checkOpStatistics(OpType.SET_OWNER, opCount + 1);
       checkStatistics(fs, readOps, ++writeOps, largeReadOps);
-      
+
+      opCount = getOpStatistics(OpType.DELETE);
       fs.delete(dir, true);
       checkStatistics(fs, readOps, ++writeOps, largeReadOps);
+      checkOpStatistics(OpType.DELETE, opCount + 1);
       
     } finally {
       if (cluster != null) cluster.shutdown();
     }
     
   }
-  
+
+  @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+  @Test (timeout = 180000)
+  public void testConcurrentStatistics()
+      throws IOException, InterruptedException {
+    FileSystem.getStatistics(HdfsConstants.HDFS_URI_SCHEME,
+        DistributedFileSystem.class).reset();
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(
+        new Configuration()).build();
+    cluster.waitActive();
+    final FileSystem fs = cluster.getFileSystem();
+    final int numThreads = 5;
+    final ExecutorService threadPool =
+        HadoopExecutors.newFixedThreadPool(numThreads);
+
+    try {
+      final CountDownLatch allExecutorThreadsReady =
+          new CountDownLatch(numThreads);
+      final CountDownLatch startBlocker = new CountDownLatch(1);
+      final CountDownLatch allDone = new CountDownLatch(numThreads);
+      final AtomicReference<Throwable> childError = new AtomicReference<>();
+
+      for (int i = 0; i < numThreads; i++) {
+        threadPool.submit(new Runnable() {
+          @Override
+          public void run() {
+            allExecutorThreadsReady.countDown();
+            try {
+              startBlocker.await();
+              final FileSystem fs = cluster.getFileSystem();
+              fs.mkdirs(new Path("/testStatisticsParallelChild"));
+            } catch (Throwable t) {
+              LOG.error("Child failed when calling mkdir", t);
+              childError.compareAndSet(null, t);
+            } finally {
+              allDone.countDown();
+            }
+          }
+        });
+      }
+
+      final long oldMkdirOpCount = getOpStatistics(OpType.MKDIRS);
+
+      // wait until all threads are ready
+      allExecutorThreadsReady.await();
+      // all threads start making directories
+      startBlocker.countDown();
+      // wait until all threads are done
+      allDone.await();
+
+      assertNull("Child failed with exception " + childError.get(),
+          childError.get());
+
+      checkStatistics(fs, 0, numThreads, 0);
+      // check the single operation count stat
+      checkOpStatistics(OpType.MKDIRS, numThreads + oldMkdirOpCount);
+      // iterate all the operation counts
+      for (Iterator<LongStatistic> opCountIter =
+           FileSystem.getGlobalStorageStatistics()
+               .get(DFSOpsCountStatistics.NAME).getLongStatistics();
+           opCountIter.hasNext();) {
+        final LongStatistic opCount = opCountIter.next();
+        if (OpType.MKDIRS.getSymbol().equals(opCount.getName())) {
+          assertEquals("Unexpected op count from iterator!",
+              numThreads + oldMkdirOpCount, opCount.getValue());
+        }
+        LOG.info(opCount.getName() + "\t" + opCount.getValue());
+      }
+    } finally {
+      threadPool.shutdownNow();
+      cluster.shutdown();
+    }
+  }
+
   /** Checks statistics. -1 indicates do not check for the operations */
   private void checkStatistics(FileSystem fs, int readOps, int writeOps,
       int largeReadOps) {
@@ -713,6 +850,17 @@ public class TestDistributedFileSystem {
     }
   }
 
+  private static void checkOpStatistics(OpType op, long count) {
+    assertEquals("Op " + op.getSymbol() + " has unexpected count!",
+        count, getOpStatistics(op));
+  }
+
+  private static long getOpStatistics(OpType op) {
+    return GlobalStorageStatistics.INSTANCE.get(
+        DFSOpsCountStatistics.NAME)
+        .getLong(op.getSymbol());
+  }
+
   @Test
   public void testFileChecksum() throws Exception {
     final long seed = RAN.nextLong();

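testConcurrentStatistics above asserts an exact MKDIRS count after five threads race on mkdirs(), which requires the per-op counters to be atomic under contention. A minimal standalone sketch of that property, assuming a map of AtomicLongs — names here are illustrative; the committed implementation is DFSOpsCountStatistics in the diffstat:

    import java.util.EnumMap;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicLong;

    class AtomicOpCounterSketch {
      enum Op { MKDIRS, DELETE }

      private final Map<Op, AtomicLong> counts = new EnumMap<>(Op.class);

      AtomicOpCounterSketch() {
        // Pre-populate every op so increment() never races on map insertion.
        for (Op op : Op.values()) {
          counts.put(op, new AtomicLong(0));
        }
      }

      void increment(Op op) {
        counts.get(op).incrementAndGet();  // lock-free; exact under contention
      }

      long get(Op op) {
        return counts.get(op).get();
      }
    }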
