HBASE-18135 Implement mechanism for RegionServers to report file archival for space quotas

This de-couples the snapshot size calculation from the
SnapshotQuotaObserverChore into another API which both the periodically
invoked Master chore and the Master service endpoint can invoke. This
allows snapshot sizes to be reported from the multiple sources we have
in HBase.

When a file is archived, snapshot sizes can be realized more quickly, and
the Master can still perform periodic computations of the total snapshot
size to account for any delayed/missing/lost file archival RPCs.
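
As a rough sketch of the new flow (hedged: `rsQuotaManager` and
`masterStub` are illustrative handles, not names from this patch), a
RegionServer builds the notification from the archived files' names and
sizes and sends it over the new ReportFileArchival RPC:

    // Illustrative only: report archived files for one table to the Master.
    Map<String, Long> archivedFiles = new HashMap<>();
    archivedFiles.put("0123abcd0123abcd0123abcd0123abcd", 1024L); // file name -> bytes
    FileArchiveNotificationRequest request =
        rsQuotaManager.buildFileArchiveRequest(tableName, archivedFiles.entrySet());
    masterStub.reportFileArchival(null, request); // RegionServerStatusService stub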

Signed-off-by: Ted Yu <yuzhih...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4a4c0120
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4a4c0120
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4a4c0120

Branch: refs/heads/HBASE-19064
Commit: 4a4c0120494757539d680c2d7d44fe6ab3d71d27
Parents: 2402f1f
Author: Josh Elser <els...@apache.org>
Authored: Tue Feb 27 19:36:45 2018 -0500
Committer: Josh Elser <els...@apache.org>
Committed: Mon Mar 5 17:32:42 2018 -0500

----------------------------------------------------------------------
 .../hadoop/hbase/quotas/QuotaTableUtil.java     |   8 +-
 .../src/main/protobuf/RegionServerStatus.proto  |  16 +
 .../org/apache/hadoop/hbase/master/HMaster.java |   4 +
 .../hadoop/hbase/master/MasterRpcServices.java  |  18 +
 .../hbase/quotas/FileArchiverNotifier.java      |  53 ++
 .../quotas/FileArchiverNotifierFactory.java     |  35 +
 .../quotas/FileArchiverNotifierFactoryImpl.java | 114 ++++
 .../hbase/quotas/FileArchiverNotifierImpl.java  | 634 +++++++++++++++++++
 .../hadoop/hbase/quotas/MasterQuotaManager.java |  31 +
 .../quotas/RegionServerSpaceQuotaManager.java   |  34 +
 .../quotas/SnapshotQuotaObserverChore.java      | 350 +---------
 .../hbase/regionserver/HRegionServer.java       |  35 +
 .../hadoop/hbase/regionserver/HStore.java       |  71 ++-
 .../regionserver/RegionServerServices.java      |  14 +
 .../hadoop/hbase/MockRegionServerServices.java  |   8 +
 .../hadoop/hbase/master/MockRegionServer.java   |   8 +
 .../quotas/TestFileArchiverNotifierImpl.java    | 312 +++++++++
 .../hbase/quotas/TestLowLatencySpaceQuotas.java |  81 ++-
 .../quotas/TestSnapshotQuotaObserverChore.java  | 225 ++++---
 19 files changed, 1624 insertions(+), 427 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/4a4c0120/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java
index 5c28407..7a0fdb0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java
@@ -74,13 +74,15 @@ import org.apache.hadoop.hbase.util.Bytes;
 /**
  * Helper class to interact with the quota table.
  * <table>
- *   <tr><th>ROW-KEY</th><th>FAM/QUAL</th><th>DATA</th></tr>
+ *   <tr><th>ROW-KEY</th><th>FAM/QUAL</th><th>DATA</th><th>DESC</th></tr>
 *   <tr><td>n.&lt;namespace&gt;</td><td>q:s</td><td>&lt;global-quotas&gt;</td></tr>
 *   <tr><td>n.&lt;namespace&gt;</td><td>u:p</td><td>&lt;namespace-quota policy&gt;</td></tr>
- *   <tr><td>n.&lt;namespace&gt;</td><td>u:s</td><td>&lt;SpaceQuotaSnapshot&gt;</td></tr>
+ *   <tr><td>n.&lt;namespace&gt;</td><td>u:s</td><td>&lt;SpaceQuotaSnapshot&gt;</td>
+ *      <td>The size of all snapshots against tables in the namespace</td></tr>
 *   <tr><td>t.&lt;table&gt;</td><td>q:s</td><td>&lt;global-quotas&gt;</td></tr>
 *   <tr><td>t.&lt;table&gt;</td><td>u:p</td><td>&lt;table-quota policy&gt;</td></tr>
- *   <tr><td>t.&lt;table&gt;</td><td>u:ss.&lt;snapshot name&gt;</td><td>&lt;SpaceQuotaSnapshot&gt;</td></tr>
+ *   <tr><td>t.&lt;table&gt;</td><td>u:ss.&lt;snapshot name&gt;</td>
+ *      <td>&lt;SpaceQuotaSnapshot&gt;</td><td>The size of a snapshot against a table</td></tr>
 *   <tr><td>u.&lt;user&gt;</td><td>q:s</td><td>&lt;global-quotas&gt;</td></tr>
 *   <tr><td>u.&lt;user&gt;</td><td>q:s.&lt;table&gt;</td><td>&lt;table-quotas&gt;</td></tr>
 *   <tr><td>u.&lt;user&gt;</td><td>q:s.&lt;ns&gt;</td><td>&lt;namespace-quotas&gt;</td></tr>

http://git-wip-us.apache.org/repos/asf/hbase/blob/4a4c0120/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto b/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto
index 3f836cd..002432a 100644
--- a/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto
@@ -162,6 +162,18 @@ message ReportProcedureDoneRequest {
 message ReportProcedureDoneResponse {
 }
 
+message FileArchiveNotificationRequest {
+  message FileWithSize {
+    optional TableName table_name = 1;
+    optional string name = 2;
+    optional uint64 size = 3;
+  }
+  repeated FileWithSize archived_files = 1;
+}
+
+message FileArchiveNotificationResponse {
+}
+
 service RegionServerStatusService {
   /** Called when a region server first starts. */
   rpc RegionServerStartup(RegionServerStartupRequest)
@@ -200,4 +212,8 @@ service RegionServerStatusService {
 
   rpc ReportProcedureDone(ReportProcedureDoneRequest)
     returns(ReportProcedureDoneResponse);
+
+  /** Reports files that were moved to the archive directory for space quotas */
+  rpc ReportFileArchival(FileArchiveNotificationRequest)
+    returns(FileArchiveNotificationResponse);
 }
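
For reference, a minimal sketch of building this message with the
generated (shaded) Java classes; the table and file values are made up:

    FileArchiveNotificationRequest.FileWithSize fws =
        FileArchiveNotificationRequest.FileWithSize.newBuilder()
            .setTableName(ProtobufUtil.toProtoTableName(TableName.valueOf("t1")))
            .setName("0123abcd0123abcd0123abcd0123abcd") // store file name
            .setSize(1024L)                              // size in bytes
            .build();
    FileArchiveNotificationRequest request =
        FileArchiveNotificationRequest.newBuilder().addArchivedFiles(fws).build();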

http://git-wip-us.apache.org/repos/asf/hbase/blob/4a4c0120/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index c33f555..1ae85fe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -3603,4 +3603,8 @@ public class HMaster extends HRegionServer implements MasterServices {
       }
     }
   }
+
+  public SnapshotQuotaObserverChore getSnapshotQuotaObserverChore() {
+    return this.snapshotQuotaChore;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4a4c0120/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 8f92041..def4640 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -258,6 +258,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaSta
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
@@ -2247,4 +2249,20 @@ public class MasterRpcServices extends RSRpcServices
     });
     return ReportProcedureDoneResponse.getDefaultInstance();
   }
+
+  @Override
+  public FileArchiveNotificationResponse reportFileArchival(RpcController controller,
+      FileArchiveNotificationRequest request) throws ServiceException {
+    try {
+      master.checkInitialized();
+      if (!QuotaUtil.isQuotaEnabled(master.getConfiguration())) {
+        return FileArchiveNotificationResponse.newBuilder().build();
+      }
+      master.getMasterQuotaManager().processFileArchivals(request, master.getConnection(),
+          master.getConfiguration(), master.getFileSystem());
+      return FileArchiveNotificationResponse.newBuilder().build();
+    } catch (Exception e) {
+      throw new ServiceException(e);
+    }
+  }
 }
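
Note the early-out above: the Master silently accepts but ignores these
reports unless space quotas are enabled. A minimal configuration sketch
(QuotaUtil.QUOTA_CONF_KEY is the existing "hbase.quota.enabled" key):

    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); // enable quota support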

http://git-wip-us.apache.org/repos/asf/hbase/blob/4a4c0120/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifier.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifier.java
new file mode 100644
index 0000000..7f1e47b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifier.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Interface allowing various implementations of tracking files that have recently been archived,
+ * so that the Master can notice changes to snapshot sizes for space quotas.
+ *
+ * This object needs to ensure that {@link #addArchivedFiles(Set)} and
+ * {@link #computeAndStoreSnapshotSizes(Collection)} are mutually exclusive. If a "full" computation
+ * is in progress, new changes being archived should be held.
+ */
+@InterfaceAudience.Private
+public interface FileArchiverNotifier {
+
+  /**
+   * Records files and their sizes in bytes as they are moved to the archive directory.
+   *
+   * @param fileSizes A collection of entries mapping each file name to its size in bytes
+   * @throws IOException If there was an IO-related error persisting the file size(s)
+   */
+  void addArchivedFiles(Set<Entry<String, Long>> fileSizes) throws IOException;
+
+  /**
+   * Computes the size of a table and all of its snapshots, recording new "full" sizes for each.
+   *
+   * @param currentSnapshots the current list of snapshots against this table
+   * @return The total size of all snapshots against this table.
+   * @throws IOException If there was an IO-related error computing or persisting the sizes.
+   */
+  long computeAndStoreSnapshotSizes(Collection<String> currentSnapshots) throws IOException;
+}
\ No newline at end of file
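
A hedged usage sketch of this interface (the `notifier` instance and file
names are hypothetical; plain JDK entry types are used for illustration):

    // Report two freshly archived store files and their sizes in bytes.
    Set<Map.Entry<String, Long>> archived = new HashSet<>();
    archived.add(new AbstractMap.SimpleImmutableEntry<>("storefile-a", 1024L));
    archived.add(new AbstractMap.SimpleImmutableEntry<>("storefile-b", 2048L));
    notifier.addArchivedFiles(archived);

    // Periodically, a "full" pass recomputes and persists authoritative sizes.
    // Per the contract above, the two calls are mutually exclusive.
    long totalSize = notifier.computeAndStoreSnapshotSizes(Arrays.asList("snap1", "snap2"));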

http://git-wip-us.apache.org/repos/asf/hbase/blob/4a4c0120/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifierFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifierFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifierFactory.java
new file mode 100644
index 0000000..98f188f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifierFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Factory class to create {@link FileArchiverNotifier} instances.
+ */
+@InterfaceAudience.Private
+public interface FileArchiverNotifierFactory {
+
+  /**
+   * Creates or obtains a {@link FileArchiverNotifier} instance for the given args.
+   */
+  FileArchiverNotifier get(Connection conn, Configuration conf, FileSystem fs, TableName tn);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/4a4c0120/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifierFactoryImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifierFactoryImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifierFactoryImpl.java
new file mode 100644
index 0000000..3d21518
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifierFactoryImpl.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A factory for getting instances of {@link FileArchiverNotifier}.
+ */
+@InterfaceAudience.Private
+public final class FileArchiverNotifierFactoryImpl implements FileArchiverNotifierFactory {
+  private static final FileArchiverNotifierFactoryImpl DEFAULT_INSTANCE =
+      new FileArchiverNotifierFactoryImpl();
+  private static volatile FileArchiverNotifierFactory CURRENT_INSTANCE = DEFAULT_INSTANCE;
+  private final ConcurrentHashMap<TableName,FileArchiverNotifier> CACHE;
+
+  private FileArchiverNotifierFactoryImpl() {
+    CACHE = new ConcurrentHashMap<>();
+  }
+
+  public static FileArchiverNotifierFactory getInstance() {
+    return CURRENT_INSTANCE;
+  }
+
+  @VisibleForTesting
+  static void setInstance(FileArchiverNotifierFactory inst) {
+    CURRENT_INSTANCE = Objects.requireNonNull(inst);
+  }
+
+  @VisibleForTesting
+  static void reset() {
+    CURRENT_INSTANCE = DEFAULT_INSTANCE;
+  }
+
+  /**
+   * Returns the {@link FileArchiverNotifier} instance for the given {@link TableName}.
+   *
+   * @param tn The table to obtain a notifier for
+   * @return The notifier for the given {@code tablename}.
+   */
+  public FileArchiverNotifier get(
+      Connection conn, Configuration conf, FileSystem fs, TableName tn) {
+    // Ensure that only one instance is exposed to callers
+    final FileArchiverNotifier newMapping = new FileArchiverNotifierImpl(conn, conf, fs, tn);
+    final FileArchiverNotifier previousMapping = CACHE.putIfAbsent(tn, newMapping);
+    if (previousMapping == null) {
+      return newMapping;
+    }
+    return previousMapping;
+  }
+
+  public int getCacheSize() {
+    return CACHE.size();
+  }
+
+  static class CacheKey {
+    final Connection conn;
+    final Configuration conf;
+    final FileSystem fs;
+    final TableName tn;
+
+    CacheKey(Connection conn, Configuration conf, FileSystem fs, TableName tn) {
+      this.conn = conn;
+      this.conf = conf;
+      this.fs = fs;
+      this.tn = tn;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof CacheKey)) {
+        return false;
+      }
+      CacheKey other = (CacheKey) o;
+      // TableName should be the only thing differing.
+      return tn.equals(other.tn) && conn.equals(other.conn) && conf.equals(other.conf)
+          && fs.equals(other.fs);
+    }
+
+    @Override
+    public int hashCode() {
+      return new HashCodeBuilder().append(conn).append(conf).append(fs).append(tn).toHashCode();
+    }
+
+    @Override
+    public String toString() {
+      return "CacheKey[TableName=" + tn + "]";
+    }
+  }
+}
\ No newline at end of file
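
A short usage sketch (hedged: `conn`, `conf`, and `fs` stand for whatever
Connection, Configuration, and FileSystem the caller already holds). Because
the factory caches one notifier per TableName, every caller shares the same
instance, and thus the same read/write lock, for a given table:

    FileArchiverNotifier notifier = FileArchiverNotifierFactoryImpl.getInstance()
        .get(conn, conf, fs, TableName.valueOf("my_table")); // cached per TableName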

http://git-wip-us.apache.org/repos/asf/hbase/blob/4a4c0120/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifierImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifierImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifierImpl.java
new file mode 100644
index 0000000..8cde9c1
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifierImpl.java
@@ -0,0 +1,634 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
+import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
+import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest.FamilyFiles;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest.StoreFile;
+
+/**
+ * Tracks file archiving and updates the hbase quota table.
+ */
+@InterfaceAudience.Private
+public class FileArchiverNotifierImpl implements FileArchiverNotifier {
+  private static final Log LOG = LogFactory.getLog(FileArchiverNotifierImpl.class);
+  private final Connection conn;
+  private final Configuration conf;
+  private final FileSystem fs;
+  private final TableName tn;
+  private final ReadLock readLock;
+  private final WriteLock writeLock;
+  private volatile long lastFullCompute = Long.MIN_VALUE;
+  private List<String> currentSnapshots = Collections.emptyList();
+  private static final Map<String,Object> NAMESPACE_LOCKS = new HashMap<>();
+
+  /**
+   * An Exception thrown when SnapshotSize updates to hbase:quota fail to be written.
+   */
+  @InterfaceAudience.Private
+  public static class QuotaSnapshotSizeSerializationException extends IOException {
+    private static final long serialVersionUID = 1L;
+
+    public QuotaSnapshotSizeSerializationException(String msg) {
+      super(msg);
+    }
+  }
+
+  public FileArchiverNotifierImpl(
+      Connection conn, Configuration conf, FileSystem fs, TableName tn) {
+    this.conn = conn;
+    this.conf = conf;
+    this.fs = fs;
+    this.tn = tn;
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    readLock = lock.readLock();
+    writeLock = lock.writeLock();
+  }
+
+  static synchronized Object getLockForNamespace(String namespace) {
+    return NAMESPACE_LOCKS.computeIfAbsent(namespace, (ns) -> new Object());
+  }
+
+  /**
+   * Returns a strictly-increasing measure of time extracted by {@link System#nanoTime()}.
+   */
+  long getLastFullCompute() {
+    return lastFullCompute;
+  }
+
+  @Override
+  public void addArchivedFiles(Set<Entry<String, Long>> fileSizes) throws IOException {
+    long start = System.nanoTime();
+    readLock.lock();
+    try {
+      // We want to catch the case where we got an archival request, but there was a full
+      // re-computation in progress that was blocking us. Most likely, the full computation is going
+      // to already include the changes we were going to make.
+      //
+      // Same as "start < lastFullCompute" but avoiding numeric overflow per the
+      // System.nanoTime() javadoc
+      if (lastFullCompute != Long.MIN_VALUE && start - lastFullCompute < 0) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("A full computation was performed after this request was received."
+              + " Ignoring requested updates: " + fileSizes);
+        }
+        return;
+      }
+
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("currentSnapshots: " + currentSnapshots + " fileSize: "+ fileSizes);
+      }
+
+      // Write increment to quota table for the correct snapshot. Only do this if we have snapshots
+      // and some files that were archived.
+      if (!currentSnapshots.isEmpty() && !fileSizes.isEmpty()) {
+        // We get back the files which no snapshot referenced (the files which will be deleted soon)
+        groupArchivedFiledBySnapshotAndRecordSize(currentSnapshots, fileSizes);
+      }
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * For each file in the map, this updates the first snapshot (lexicographic snapshot name) that
+   * references this file. The result of this computation is serialized to the quota table.
+   *
+   * @param snapshots A collection of HBase snapshots to group the files into
+   * @param fileSizes A map of file names to their sizes
+   */
+  void groupArchivedFiledBySnapshotAndRecordSize(
+      List<String> snapshots, Set<Entry<String, Long>> fileSizes) throws IOException {
+    // Make a copy as we'll modify it.
+    final Map<String,Long> filesToUpdate = new HashMap<>(fileSizes.size());
+    for (Entry<String,Long> entry : fileSizes) {
+      filesToUpdate.put(entry.getKey(), entry.getValue());
+    }
+    // Track the change in size to each snapshot
+    final Map<String,Long> snapshotSizeChanges = new HashMap<>();
+    for (String snapshot : snapshots) {
+      // For each file in `filesToUpdate`, check if `snapshot` refers to it.
+      // If `snapshot` does, remove it from `filesToUpdate` and add it to `snapshotSizeChanges`.
+      bucketFilesToSnapshot(snapshot, filesToUpdate, snapshotSizeChanges);
+      if (filesToUpdate.isEmpty()) {
+        // If we have no more files recently archived, we have nothing more to check
+        break;
+      }
+    }
+    // We have computed changes to the snapshot size, we need to record them.
+    if (!snapshotSizeChanges.isEmpty()) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Writing snapshot size changes for: " + snapshotSizeChanges);
+      }
+      persistSnapshotSizeChanges(snapshotSizeChanges);
+    }
+  }
+
+  /**
+   * For the given snapshot, find all files which this {@code snapshotName} references. After a file
+   * is found to be referenced by the snapshot, it is removed from {@code filesToUpdate} and
+   * {@code snapshotSizeChanges} is updated in concert.
+   *
+   * @param snapshotName The snapshot to check
+   * @param filesToUpdate A mapping of archived files to their size
+   * @param snapshotSizeChanges A mapping of snapshots and their change in size
+   */
+  void bucketFilesToSnapshot(
+      String snapshotName, Map<String,Long> filesToUpdate, Map<String,Long> snapshotSizeChanges)
+          throws IOException {
+    // A quick check to avoid doing work if the caller unnecessarily invoked this method.
+    if (filesToUpdate.isEmpty()) {
+      return;
+    }
+
+    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(
+        snapshotName, FSUtils.getRootDir(conf));
+    SnapshotDescription sd = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
+    SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, sd);
+    // For each region referenced by the snapshot
+    for (SnapshotRegionManifest rm : manifest.getRegionManifests()) {
+      // For each column family in this region
+      for (FamilyFiles ff : rm.getFamilyFilesList()) {
+        // And each store file in that family
+        for (StoreFile sf : ff.getStoreFilesList()) {
+          Long valueOrNull = filesToUpdate.remove(sf.getName());
+          if (valueOrNull != null) {
+            // This storefile was recently archived, we should update this snapshot with its size
+            snapshotSizeChanges.merge(snapshotName, valueOrNull, Long::sum);
+          }
+          // Short-circuit, if we have no more files that were archived, we don't need to iterate
+          // over the rest of the snapshot.
+          if (filesToUpdate.isEmpty()) {
+            return;
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Reads the current size for each snapshot to update, generates a new update based on that value,
+   * and then writes the new update.
+   *
+   * @param snapshotSizeChanges A map of snapshot name to size change
+   */
+  void persistSnapshotSizeChanges(Map<String,Long> snapshotSizeChanges) throws IOException {
+    try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
+      // Create a list (with a more typical ordering implied)
+      final List<Entry<String,Long>> snapshotSizeEntries = new ArrayList<>(
+          snapshotSizeChanges.entrySet());
+      // Create the Gets for each snapshot we need to update
+      final List<Get> snapshotSizeGets = snapshotSizeEntries.stream()
+          .map((e) -> QuotaTableUtil.makeGetForSnapshotSize(tn, e.getKey()))
+          .collect(Collectors.toList());
+      final Iterator<Entry<String,Long>> iterator = snapshotSizeEntries.iterator();
+      // A List to store each Put we'll create from the Get's we retrieve
+      final List<Put> updates = new ArrayList<>(snapshotSizeEntries.size());
+
+      // TODO Push this down to the RegionServer with a coprocessor:
+      //
+      // We would really like to piggy-back on the row-lock already being grabbed
+      // to handle the update of the row in the quota table. However, because the value
+      // is a serialized protobuf, the standard Increment API doesn't work for us. With a CP, we
+      // can just send the size deltas to the RS and atomically update the serialized PB object
+      // while relying on the row-lock for synchronization.
+      //
+      // Synchronizing on the namespace string is a "minor smell" but passable as this is
+      // only invoked via a single caller (the active Master). Using the namespace name lets us
+      // have some parallelism without worry of one caller seeing stale data from the quota table.
+      synchronized (getLockForNamespace(tn.getNamespaceAsString())) {
+        final Result[] existingSnapshotSizes = quotaTable.get(snapshotSizeGets);
+        long totalSizeChange = 0;
+        // Read the current size values (if they exist) to generate the new value
+        for (Result result : existingSnapshotSizes) {
+          Entry<String,Long> entry = iterator.next();
+          String snapshot = entry.getKey();
+          Long size = entry.getValue();
+          // Track the total size change for the namespace this table belongs in
+          totalSizeChange += size;
+          // Get the size of the previous value (or zero)
+          long previousSize = getSnapshotSizeFromResult(result);
+          // Create an update. A file was archived from the table, so the table's size goes
+          // down, but the snapshot's size goes up.
+          updates.add(QuotaTableUtil.createPutForSnapshotSize(tn, snapshot, previousSize + size));
+        }
+
+        // Create an update for the summation of all snapshots in the namespace
+        if (totalSizeChange != 0) {
+          long previousSize = getPreviousNamespaceSnapshotSize(
+              quotaTable, tn.getNamespaceAsString());
+          updates.add(QuotaTableUtil.createPutForNamespaceSnapshotSize(
+              tn.getNamespaceAsString(), previousSize + totalSizeChange));
+        }
+
+        // Send all of the quota table updates in one batch.
+        List<Object> failures = new ArrayList<>();
+        final Object[] results = new Object[updates.size()];
+        quotaTable.batch(updates, results);
+        for (Object result : results) {
+          // A null result is an error condition (all RPC attempts failed)
+          if (!(result instanceof Result)) {
+            failures.add(result);
+          }
+        }
+        // Propagate a failure if any updates failed
+        if (!failures.isEmpty()) {
+          throw new QuotaSnapshotSizeSerializationException(
+              "Failed to write some snapshot size updates: " + failures);
+        }
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return;
+    }
+  }
+
+  /**
+   * Fetches the current size of all snapshots in the given {@code namespace}.
+   *
+   * @param quotaTable The HBase quota table
+   * @param namespace Namespace to fetch the sum of snapshot sizes for
+   * @return The size of all snapshot sizes for the namespace in bytes.
+   */
+  long getPreviousNamespaceSnapshotSize(Table quotaTable, String namespace) throws IOException {
+    // Update the size of each snapshot for all snapshots in a namespace.
+    Result r = quotaTable.get(
+        QuotaTableUtil.createGetNamespaceSnapshotSize(namespace));
+    return getSnapshotSizeFromResult(r);
+  }
+
+  /**
+   * Extracts the size component from a serialized {@link SpaceQuotaSnapshot} protobuf.
+   *
+   * @param r A Result containing one cell with a SpaceQuotaSnapshot protobuf
+   * @return The size in bytes of the snapshot.
+   */
+  long getSnapshotSizeFromResult(Result r) throws InvalidProtocolBufferException {
+    // Per javadoc, Result should only be null if an exception was thrown. So, if we're here,
+    // we should be non-null. If we can't advance to the first cell, same as "no cell".
+    if (!r.isEmpty() && r.advance()) {
+      return QuotaTableUtil.parseSnapshotSize(r.current());
+    }
+    return 0L;
+  }
+
+  @Override
+  public long computeAndStoreSnapshotSizes(
+      Collection<String> currentSnapshots) throws IOException {
+    // Record what the current snapshots are
+    this.currentSnapshots = new ArrayList<>(currentSnapshots);
+    Collections.sort(this.currentSnapshots);
+
+    // compute new size for table + snapshots for that table
+    List<SnapshotWithSize> snapshotSizes = computeSnapshotSizes(this.currentSnapshots);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Computed snapshot sizes for " + tn + " of " + snapshotSizes);
+    }
+
+    // Compute the total size of all snapshots against our table
+    final long totalSnapshotSize = snapshotSizes.stream().mapToLong((sws) -> sws.getSize()).sum();
+
+    writeLock.lock();
+    try {
+      // Persist the size of each snapshot
+      try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
+        persistSnapshotSizes(quotaTable, snapshotSizes);
+      }
+
+      // Report the last time we did a recomputation
+      lastFullCompute = System.nanoTime();
+
+      return totalSnapshotSize;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(getClass().getSimpleName()).append("[");
+    sb.append("tableName=").append(tn).append(", currentSnapshots=");
+    sb.append(currentSnapshots).append(", lastFullCompute=").append(lastFullCompute);
+    return sb.append("]").toString();
+  }
+
+  /**
+   * Computes the size of each snapshot against the table referenced by {@code this}.
+   *
+   * @param snapshots A sorted list of snapshots against {@code tn}.
+   * @return A list of the size for each snapshot against {@code tn}.
+   */
+  List<SnapshotWithSize> computeSnapshotSizes(List<String> snapshots) throws IOException {
+    final List<SnapshotWithSize> snapshotSizes = new ArrayList<>(snapshots.size());
+    final Path rootDir = FSUtils.getRootDir(conf);
+
+    // Get the map of store file names to store file path for this table
+    final Set<String> tableReferencedStoreFiles;
+    try {
+      tableReferencedStoreFiles = FSUtils.getTableStoreFilePathMap(fs, rootDir).keySet();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return null;
+    }
+
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Paths for " + tn + ": " + tableReferencedStoreFiles);
+    }
+
+    // For each snapshot on this table, get the files which the snapshot references which
+    // the table does not.
+    Set<String> snapshotReferencedFiles = new HashSet<>();
+    for (String snapshotName : snapshots) {
+      Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
+      SnapshotDescription sd = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
+      SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, sd);
+
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Files referenced by other snapshots: " + snapshotReferencedFiles);
+      }
+
+      // Get the set of files from the manifest that this snapshot references which are not also
+      // referenced by the originating table.
+      Set<StoreFileReference> unreferencedStoreFileNames = getStoreFilesFromSnapshot(
+          manifest, (sfn) -> !tableReferencedStoreFiles.contains(sfn)
+              && !snapshotReferencedFiles.contains(sfn));
+
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Snapshot " + snapshotName + " solely references the files: "
+            + unreferencedStoreFileNames);
+      }
+
+      // Compute the size of the store files for this snapshot
+      long size = getSizeOfStoreFiles(tn, unreferencedStoreFileNames);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Computed size of " + snapshotName + " to be " + size);
+      }
+
+      // Persist this snapshot's size into the map
+      snapshotSizes.add(new SnapshotWithSize(snapshotName, size));
+
+      // Make sure that we don't double-count the same file
+      for (StoreFileReference ref : unreferencedStoreFileNames) {
+        for (String fileNames : ref.getFamilyToFilesMapping().values()) {
+          snapshotReferencedFiles.add(fileNames);
+        }
+      }
+    }
+
+    return snapshotSizes;
+  }
+
+  /**
+   * Computes the size of each store file in {@code storeFileNames}
+   */
+  long getSizeOfStoreFiles(TableName tn, Set<StoreFileReference> storeFileNames) {
+    return storeFileNames.stream()
+        .collect(Collectors.summingLong((sfr) -> getSizeOfStoreFile(tn, sfr)));
+  }
+
+  /**
+   * Computes the size of the store files for a single region.
+   */
+  long getSizeOfStoreFile(TableName tn, StoreFileReference storeFileName) {
+    String regionName = storeFileName.getRegionName();
+    return storeFileName.getFamilyToFilesMapping()
+        .entries().stream()
+        .collect(Collectors.summingLong((e) ->
+            getSizeOfStoreFile(tn, regionName, e.getKey(), e.getValue())));
+  }
+
+  /**
+   * Computes the size of the store file given its name, region and family name in
+   * the archive directory.
+   */
+  long getSizeOfStoreFile(
+      TableName tn, String regionName, String family, String storeFile) {
+    Path familyArchivePath;
+    try {
+      familyArchivePath = HFileArchiveUtil.getStoreArchivePath(conf, tn, regionName, family);
+    } catch (IOException e) {
+      LOG.warn("Could not compute path for the archive directory for the region", e);
+      return 0L;
+    }
+    Path fileArchivePath = new Path(familyArchivePath, storeFile);
+    try {
+      if (fs.exists(fileArchivePath)) {
+        FileStatus[] status = fs.listStatus(fileArchivePath);
+        if (1 != status.length) {
+          LOG.warn("Expected " + fileArchivePath +
+              " to be a file but was a directory, ignoring reference");
+          return 0L;
+        }
+        return status[0].getLen();
+      }
+    } catch (IOException e) {
+      LOG.warn("Could not obtain the status of " + fileArchivePath, e);
+      return 0L;
+    }
+    LOG.warn("Expected " + fileArchivePath + " to exist but does not, ignoring reference.");
+    return 0L;
+  }
+
+  /**
+   * Extracts the names of the store files referenced by this snapshot which satisfy the given
+   * predicate (the predicate returns {@code true}).
+   */
+  Set<StoreFileReference> getStoreFilesFromSnapshot(
+      SnapshotManifest manifest, Predicate<String> filter) {
+    Set<StoreFileReference> references = new HashSet<>();
+    // For each region referenced by the snapshot
+    for (SnapshotRegionManifest rm : manifest.getRegionManifests()) {
+      StoreFileReference regionReference = new StoreFileReference(
+          ProtobufUtil.toRegionInfo(rm.getRegionInfo()).getEncodedName());
+
+      // For each column family in this region
+      for (FamilyFiles ff : rm.getFamilyFilesList()) {
+        final String familyName = ff.getFamilyName().toStringUtf8();
+        // And each store file in that family
+        for (StoreFile sf : ff.getStoreFilesList()) {
+          String storeFileName = sf.getName();
+          // A snapshot only "inherits" a file's size if it uniquely refers to it (no table
+          // and no other snapshot references it).
+          if (filter.test(storeFileName)) {
+            regionReference.addFamilyStoreFile(familyName, storeFileName);
+          }
+        }
+      }
+      // Only add this Region reference if we retained any files.
+      if (!regionReference.getFamilyToFilesMapping().isEmpty()) {
+        references.add(regionReference);
+      }
+    }
+    return references;
+  }
+
+  /**
+   * Writes the snapshot sizes to the provided {@code table}.
+   */
+  void persistSnapshotSizes(
+      Table table, List<SnapshotWithSize> snapshotSizes) throws IOException {
+    // Convert each entry in the map to a Put and write them to the quota table
+    table.put(snapshotSizes
+        .stream()
+        .map(sws -> QuotaTableUtil.createPutForSnapshotSize(
+            tn, sws.getName(), sws.getSize()))
+        .collect(Collectors.toList()));
+  }
+
+  /**
+   * A struct encapsulating the name of a snapshot and its "size" on the filesystem. This size is
+   * defined as the amount of filesystem space taken by the files the snapshot refers to which
+   * the originating table no longer refers to.
+   */
+  static class SnapshotWithSize {
+    private final String name;
+    private final long size;
+
+    SnapshotWithSize(String name, long size) {
+      this.name = Objects.requireNonNull(name);
+      this.size = size;
+    }
+
+    String getName() {
+      return name;
+    }
+
+    long getSize() {
+      return size;
+    }
+
+    @Override
+    public int hashCode() {
+      return new HashCodeBuilder().append(name).append(size).toHashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+
+      if (!(o instanceof SnapshotWithSize)) {
+        return false;
+      }
+
+      SnapshotWithSize other = (SnapshotWithSize) o;
+      return name.equals(other.name) && size == other.size;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder(32);
+      return sb.append("SnapshotWithSize:[").append(name).append(" ")
+          .append(StringUtils.byteDesc(size)).append("]").toString();
+    }
+  }
+
+  /**
+   * A reference to a collection of files in the archive directory for a 
single region.
+   */
+  static class StoreFileReference {
+    private final String regionName;
+    private final Multimap<String,String> familyToFiles;
+
+    StoreFileReference(String regionName) {
+      this.regionName = Objects.requireNonNull(regionName);
+      familyToFiles = HashMultimap.create();
+    }
+
+    String getRegionName() {
+      return regionName;
+    }
+
+    Multimap<String,String> getFamilyToFilesMapping() {
+      return familyToFiles;
+    }
+
+    void addFamilyStoreFile(String family, String storeFileName) {
+      familyToFiles.put(family, storeFileName);
+    }
+
+    @Override
+    public int hashCode() {
+      return new HashCodeBuilder().append(regionName).append(familyToFiles).toHashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof StoreFileReference)) {
+        return false;
+      }
+      StoreFileReference other = (StoreFileReference) o;
+      return regionName.equals(other.regionName) && familyToFiles.equals(other.familyToFiles);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      return sb.append("StoreFileReference[region=").append(regionName).append(", files=")
+          .append(familyToFiles).append("]").toString();
+    }
+  }
+}
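
One subtlety in addArchivedFiles() above is the overflow-safe timestamp
comparison: per the System.nanoTime() javadoc, raw values are only
meaningful as differences, so orderings must be checked by subtraction.
A minimal illustration:

    long t0 = System.nanoTime();
    long t1 = System.nanoTime();
    // Overflow-safe equivalent of "t0 < t1" for nanoTime values:
    boolean t0BeforeT1 = (t0 - t1) < 0;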

http://git-wip-us.apache.org/repos/asf/hbase/blob/4a4c0120/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
index 6783e7d..bdeab80 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
@@ -25,14 +25,18 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.RegionStateListener;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.namespace.NamespaceAuditor;
@@ -41,11 +45,17 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
 import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize;
 
 /**
  * Master Quota Manager.
@@ -566,5 +576,26 @@ public class MasterQuotaManager implements RegionStateListener {
     }
     return numEntriesRemoved;
   }
+
+  public void processFileArchivals(FileArchiveNotificationRequest request, Connection conn,
+      Configuration conf, FileSystem fs) throws IOException {
+    final HashMultimap<TableName,Entry<String,Long>> archivedFilesByTable = HashMultimap.create();
+    // Group the archived files by table
+    for (FileWithSize fileWithSize : request.getArchivedFilesList()) {
+      TableName tn = ProtobufUtil.toTableName(fileWithSize.getTableName());
+      archivedFilesByTable.put(
+          tn, Maps.immutableEntry(fileWithSize.getName(), fileWithSize.getSize()));
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Grouped archived files by table: " + archivedFilesByTable);
+    }
+    // Report each set of files to the appropriate object
+    for (TableName tn : archivedFilesByTable.keySet()) {
+      final Set<Entry<String,Long>> filesWithSize = archivedFilesByTable.get(tn);
+      final FileArchiverNotifier notifier = FileArchiverNotifierFactoryImpl.getInstance().get(
+          conn, conf, fs, tn);
+      notifier.addArchivedFiles(filesWithSize);
+    }
+  }
 }
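
For intuition, the grouping in processFileArchivals() turns the flat list of
(table, file, size) entries from the request into per-table sets; a hedged
sketch with made-up values (using the same shaded Guava helpers):

    HashMultimap<TableName, Map.Entry<String, Long>> byTable = HashMultimap.create();
    byTable.put(TableName.valueOf("t1"), Maps.immutableEntry("f1", 10L));
    byTable.put(TableName.valueOf("t1"), Maps.immutableEntry("f2", 20L));
    // byTable.get(TableName.valueOf("t1")) now holds both entries for t1.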
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/4a4c0120/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java
index bbc6df8..3972700 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java
@@ -17,6 +17,7 @@
 package org.apache.hadoop.hbase.quotas;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -33,6 +34,11 @@ import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
 
 /**
  * A manager for filesystem space quotas in the RegionServer.
@@ -234,6 +240,34 @@ public class RegionServerSpaceQuotaManager {
   }
 
   /**
+   * Builds the protobuf message to inform the Master of files being archived.
+   *
+   * @param tn The table the files previously belonged to.
+   * @param archivedFiles The files and their size in bytes that were archived.
+   * @return The protobuf representation
+   */
+  public RegionServerStatusProtos.FileArchiveNotificationRequest buildFileArchiveRequest(
+      TableName tn, Collection<Entry<String,Long>> archivedFiles) {
+    RegionServerStatusProtos.FileArchiveNotificationRequest.Builder builder =
+        RegionServerStatusProtos.FileArchiveNotificationRequest.newBuilder();
+    HBaseProtos.TableName protoTn = ProtobufUtil.toProtoTableName(tn);
+    for (Entry<String,Long> archivedFile : archivedFiles) {
+      RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize fws =
+          RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize.newBuilder()
+              .setName(archivedFile.getKey())
+              .setSize(archivedFile.getValue())
+              .setTableName(protoTn)
+              .build();
+      builder.addArchivedFiles(fws);
+    }
+    final RegionServerStatusProtos.FileArchiveNotificationRequest request = builder.build();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Reporting file archival to Master: " + TextFormat.shortDebugString(request));
+    }
+    return request;
+  }
+
+  /**
   * Returns the collection of tables which have quota violation policies enforced on
    * this RegionServer.
    */
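
A hedged usage sketch of buildFileArchiveRequest() (the `quotaManager`
instance and file data are illustrative):

    Map<String, Long> archivedFiles = new HashMap<>();
    archivedFiles.put("0123abcd0123abcd0123abcd0123abcd", 1024L); // name -> bytes
    RegionServerStatusProtos.FileArchiveNotificationRequest request =
        quotaManager.buildFileArchiveRequest(TableName.valueOf("t1"), archivedFiles.entrySet());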

http://git-wip-us.apache.org/repos/asf/hbase/blob/4a4c0120/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SnapshotQuotaObserverChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SnapshotQuotaObserverChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SnapshotQuotaObserverChore.java
index d90d1b3..9dd2ac0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SnapshotQuotaObserverChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SnapshotQuotaObserverChore.java
@@ -17,26 +17,18 @@
 package org.apache.hadoop.hbase.quotas;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
-import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ScheduledChore;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
@@ -48,15 +40,6 @@ import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.MetricsMaster;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest.FamilyFiles;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest.StoreFile;
-import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
-import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.HFileArchiveUtil;
-import org.apache.hadoop.util.StringUtils;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
 import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
@@ -128,11 +111,11 @@ public class SnapshotQuotaObserverChore extends ScheduledChore {
     }
 
     // For each table, compute the size of each snapshot
-    Multimap<TableName,SnapshotWithSize> snapshotsWithSize = computeSnapshotSizes(
-        snapshotsToComputeSize);
+    Map<String,Long> namespaceSnapshotSizes = computeSnapshotSizes(snapshotsToComputeSize);
 
-    // Write the size data to the quota table.
-    persistSnapshotSizes(snapshotsWithSize);
+    // Write the size data by namespaces to the quota table.
+    // We need to do this "globally" since each FileArchiverNotifier is limited to its own Table.
+    persistSnapshotSizesForNamespaces(namespaceSnapshotSizes);
   }
 
   /**
@@ -188,321 +171,50 @@ public class SnapshotQuotaObserverChore extends ScheduledChore {
    * @param snapshotsToComputeSize The snapshots to compute the size of
   * @return A mapping of table to snapshot created from that table and the snapshot's size.
    */
-  Multimap<TableName,SnapshotWithSize> computeSnapshotSizes(
+  Map<String,Long> computeSnapshotSizes(
       Multimap<TableName,String> snapshotsToComputeSize) throws IOException {
-    Multimap<TableName,SnapshotWithSize> snapshotSizes = HashMultimap.create();
+    final Map<String,Long> snapshotSizesByNamespace = new HashMap<>();
+    final long start = System.nanoTime();
     for (Entry<TableName,Collection<String>> entry : snapshotsToComputeSize.asMap().entrySet()) {
       final TableName tn = entry.getKey();
-      final List<String> snapshotNames = new ArrayList<>(entry.getValue());
-      // Sort the snapshots so we process them in lexicographic order. This ensures that multiple
-      // invocations of this Chore do not more the size ownership of some files between snapshots
-      // that reference the file (prevents size ownership from moving between snapshots).
-      Collections.sort(snapshotNames);
-      final Path rootDir = FSUtils.getRootDir(conf);
-      // Get the map of store file names to store file path for this table
-      // TODO is the store-file name unique enough? Does this need to be region+family+storefile?
-      final Set<String> tableReferencedStoreFiles;
-      try {
-        tableReferencedStoreFiles = FSUtils.getTableStoreFilePathMap(fs, rootDir).keySet();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        return null;
-      }
-
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Paths for " + tn + ": " + tableReferencedStoreFiles);
-      }
-
-      // For each snapshot on this table, get the files which the snapshot references which
-      // the table does not.
-      Set<String> snapshotReferencedFiles = new HashSet<>();
-      for (String snapshotName : snapshotNames) {
-        final long start = System.nanoTime();
-        Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
-        SnapshotDescription sd = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
-        SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, sd);
-
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Files referenced by other snapshots: " + 
snapshotReferencedFiles);
-        }
-
-        // Get the set of files from the manifest that this snapshot references which are not also
-        // referenced by the originating table.
-        Set<StoreFileReference> unreferencedStoreFileNames = getStoreFilesFromSnapshot(
-            manifest, (sfn) -> !tableReferencedStoreFiles.contains(sfn)
-                && !snapshotReferencedFiles.contains(sfn));
-
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Snapshot " + snapshotName + " solely references the 
files: "
-              + unreferencedStoreFileNames);
-        }
-
-        // Compute the size of the store files for this snapshot
-        long size = getSizeOfStoreFiles(tn, unreferencedStoreFileNames);
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Computed size of " + snapshotName + " to be " + size);
-        }
+      final Collection<String> snapshotNames = entry.getValue();
 
-        // Persist this snapshot's size into the map
-        snapshotSizes.put(tn, new SnapshotWithSize(snapshotName, size));
+      // Get our notifier instance; it tracks archivals that happen out-of-band of this chore
+      FileArchiverNotifier notifier = getNotifierForTable(tn);
 
-        // Make sure that we don't double-count the same file
-        for (StoreFileReference ref : unreferencedStoreFileNames) {
-          for (String fileName : ref.getFamilyToFilesMapping().values()) {
-            snapshotReferencedFiles.add(fileName);
-          }
-        }
-        // Update the amount of time it took to compute the snapshot's size
-        if (null != metrics) {
-          metrics.incrementSnapshotSizeComputationTime((System.nanoTime() - start) / 1_000_000);
-        }
-      }
+      // The total size consumed by all snapshots against this table
+      long totalSnapshotSize = notifier.computeAndStoreSnapshotSizes(snapshotNames);
+      // Bucket that size into the appropriate namespace
+      snapshotSizesByNamespace.merge(tn.getNamespaceAsString(), totalSnapshotSize, Long::sum);
     }
-    return snapshotSizes;
-  }
-
-  /**
-   * Extracts the names of the store files referenced by this snapshot which satisfy the given
-   * predicate (the predicate returns {@code true}).
-   */
-  Set<StoreFileReference> getStoreFilesFromSnapshot(
-      SnapshotManifest manifest, Predicate<String> filter) {
-    Set<StoreFileReference> references = new HashSet<>();
-    // For each region referenced by the snapshot
-    for (SnapshotRegionManifest rm : manifest.getRegionManifests()) {
-      StoreFileReference regionReference = new StoreFileReference(
-          HRegionInfo.convert(rm.getRegionInfo()).getEncodedName());
 
-      // For each column family in this region
-      for (FamilyFiles ff : rm.getFamilyFilesList()) {
-        final String familyName = ff.getFamilyName().toStringUtf8();
-        // And each store file in that family
-        for (StoreFile sf : ff.getStoreFilesList()) {
-          String storeFileName = sf.getName();
-          // A snapshot only "inherits" a file's size if it uniquely refers to it (no table
-          // and no other snapshot references it).
-          if (filter.test(storeFileName)) {
-            regionReference.addFamilyStoreFile(familyName, storeFileName);
-          }
-        }
-      }
-      // Only add this Region reference if we retained any files.
-      if (!regionReference.getFamilyToFilesMapping().isEmpty()) {
-        references.add(regionReference);
-      }
+    // Update the amount of time it took to compute the sizes of the snapshots across all tables
+    if (metrics != null) {
+      metrics.incrementSnapshotSizeComputationTime((System.nanoTime() - start) / 1_000_000);
     }
-    return references;
-  }
 
-  /**
-   * Calculates the directory in HDFS for a table based on the configuration.
-   */
-  Path getTableDir(TableName tn) throws IOException {
-    Path rootDir = FSUtils.getRootDir(conf);
-    return FSUtils.getTableDir(rootDir, tn);
-  }
-
-  /**
-   * Computes the size of each store file in {@code storeFileNames}
-   */
-  long getSizeOfStoreFiles(TableName tn, Set<StoreFileReference> storeFileNames) {
-    return storeFileNames.stream()
-        .collect(Collectors.summingLong((sfr) -> getSizeOfStoreFile(tn, sfr)));
-  }
-
-  /**
-   * Computes the size of the store files for a single region.
-   */
-  long getSizeOfStoreFile(TableName tn, StoreFileReference storeFileName) {
-    String regionName = storeFileName.getRegionName();
-    return storeFileName.getFamilyToFilesMapping()
-        .entries().stream()
-        .collect(Collectors.summingLong((e) ->
-            getSizeOfStoreFile(tn, regionName, e.getKey(), e.getValue())));
+    return snapshotSizesByNamespace;
   }
 
   /**
-   * Computes the size of the store file given its name, region and family name in
-   * the archive directory.
-   */
-  long getSizeOfStoreFile(
-      TableName tn, String regionName, String family, String storeFile) {
-    Path familyArchivePath;
-    try {
-      familyArchivePath = HFileArchiveUtil.getStoreArchivePath(conf, tn, regionName, family);
-    } catch (IOException e) {
-      LOG.warn("Could not compute path for the archive directory for the region", e);
-      return 0L;
-    }
-    Path fileArchivePath = new Path(familyArchivePath, storeFile);
-    try {
-      if (fs.exists(fileArchivePath)) {
-        FileStatus[] status = fs.listStatus(fileArchivePath);
-        if (1 != status.length) {
-          LOG.warn("Expected " + fileArchivePath +
-              " to be a file but was a directory, ignoring reference");
-          return 0L;
-        }
-        return status[0].getLen();
-      }
-    } catch (IOException e) {
-      LOG.warn("Could not obtain the status of " + fileArchivePath, e);
-      return 0L;
-    }
-    LOG.warn("Expected " + fileArchivePath + " to exist but does not, ignoring 
reference.");
-    return 0L;
-  }
-
-  /**
-   * Writes the snapshot sizes to the {@code hbase:quota} table.
+   * Returns the correct instance of {@link FileArchiverNotifier} for the given table name.
    *
-   * @param snapshotsWithSize The snapshot sizes to write.
+   * @param tn The table name
+   * @return A {@link FileArchiverNotifier} instance
    */
-  void persistSnapshotSizes(
-      Multimap<TableName,SnapshotWithSize> snapshotsWithSize) throws IOException {
-    try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
-      // Write each snapshot size for the table
-      persistSnapshotSizes(quotaTable, snapshotsWithSize);
-      // Write a size entry for all snapshots in a namespace
-      persistSnapshotSizesByNS(quotaTable, snapshotsWithSize);
-    }
+  FileArchiverNotifier getNotifierForTable(TableName tn) {
+    return FileArchiverNotifierFactoryImpl.getInstance().get(conn, conf, fs, tn);
   }
 
   /**
-   * Writes the snapshot sizes to the provided {@code table}.
+   * Writes the size used by snapshots for each namespace to the quota table.
    */
-  void persistSnapshotSizes(
-      Table table, Multimap<TableName,SnapshotWithSize> snapshotsWithSize) throws IOException {
-    // Convert each entry in the map to a Put and write them to the quota table
-    table.put(snapshotsWithSize.entries()
-        .stream()
-        .map(e -> QuotaTableUtil.createPutForSnapshotSize(
-            e.getKey(), e.getValue().getName(), e.getValue().getSize()))
-        .collect(Collectors.toList()));
-  }
-
-  /**
-   * Rolls up the snapshot sizes by namespace and writes a single record for each namespace
-   * which is the size of all snapshots in that namespace.
-   */
-  void persistSnapshotSizesByNS(
-      Table quotaTable, Multimap<TableName,SnapshotWithSize> snapshotsWithSize) throws IOException {
-    Map<String,Long> namespaceSnapshotSizes = groupSnapshotSizesByNamespace(snapshotsWithSize);
-    quotaTable.put(namespaceSnapshotSizes.entrySet().stream()
-        .map(e -> QuotaTableUtil.createPutForNamespaceSnapshotSize(
-            e.getKey(), e.getValue()))
-        .collect(Collectors.toList()));
-  }
-
-  /**
-   * Sums the snapshot sizes for each namespace.
-   */
-  Map<String,Long> groupSnapshotSizesByNamespace(
-      Multimap<TableName,SnapshotWithSize> snapshotsWithSize) {
-    return snapshotsWithSize.entries().stream()
-        .collect(Collectors.groupingBy(
-            // Convert TableName into the namespace string
-            (e) -> e.getKey().getNamespaceAsString(),
-            // Sum the values for namespace
-            Collectors.mapping(
-                Map.Entry::getValue, Collectors.summingLong((sws) -> sws.getSize()))));
-  }
-
-  /**
-   * A struct encapsulating the name of a snapshot and its "size" on the filesystem. This size is
-   * defined as the amount of filesystem space taken by the files the snapshot refers to which
-   * the originating table no longer refers to.
-   */
-  static class SnapshotWithSize {
-    private final String name;
-    private final long size;
-
-    SnapshotWithSize(String name, long size) {
-      this.name = Objects.requireNonNull(name);
-      this.size = size;
-    }
-
-    String getName() {
-      return name;
-    }
-
-    long getSize() {
-      return size;
-    }
-
-    @Override
-    public int hashCode() {
-      return new HashCodeBuilder().append(name).append(size).toHashCode();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-
-      if (!(o instanceof SnapshotWithSize)) {
-        return false;
-      }
-
-      SnapshotWithSize other = (SnapshotWithSize) o;
-      return name.equals(other.name) && size == other.size;
-    }
-
-    @Override
-    public String toString() {
-      StringBuilder sb = new StringBuilder(32);
-      return sb.append("SnapshotWithSize:[").append(name).append(" ")
-          .append(StringUtils.byteDesc(size)).append("]").toString();
-    }
-  }
-
-  /**
-   * A reference to a collection of files in the archive directory for a single region.
-   */
-  static class StoreFileReference {
-    private final String regionName;
-    private final Multimap<String,String> familyToFiles;
-
-    StoreFileReference(String regionName) {
-      this.regionName = Objects.requireNonNull(regionName);
-      familyToFiles = HashMultimap.create();
-    }
-
-    String getRegionName() {
-      return regionName;
-    }
-
-    Multimap<String,String> getFamilyToFilesMapping() {
-      return familyToFiles;
-    }
-
-    void addFamilyStoreFile(String family, String storeFileName) {
-      familyToFiles.put(family, storeFileName);
-    }
-
-    @Override
-    public int hashCode() {
-      return new HashCodeBuilder().append(regionName).append(familyToFiles).toHashCode();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (!(o instanceof StoreFileReference)) {
-        return false;
-      }
-      StoreFileReference other = (StoreFileReference) o;
-      return regionName.equals(other.regionName) && familyToFiles.equals(other.familyToFiles);
-    }
-
-    @Override
-    public String toString() {
-      StringBuilder sb = new StringBuilder();
-      return sb.append("StoreFileReference[region=").append(regionName).append(", files=")
-          .append(familyToFiles).append("]").toString();
+  void persistSnapshotSizesForNamespaces(
+      Map<String,Long> snapshotSizesByNamespace) throws IOException {
+    try (Table quotaTable = conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
+      quotaTable.put(snapshotSizesByNamespace.entrySet().stream()
+          .map(e -> QuotaTableUtil.createPutForNamespaceSnapshotSize(e.getKey(), e.getValue()))
+          .collect(Collectors.toList()));
     }
   }
 

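The interesting behavioral change in SnapshotQuotaObserverChore above is the rollup: the chore no longer walks snapshot manifests itself. Each table's FileArchiverNotifier computes and persists its own per-snapshot sizes, and the chore only aggregates the per-table totals into per-namespace totals via Map.merge. A self-contained sketch of that rollup follows; the namespaces and byte counts are hypothetical:

    import java.util.HashMap;
    import java.util.Map;

    // Minimal, standalone sketch of the Map.merge/Long::sum rollup performed by
    // the chore above; the namespaces and sizes are made up for illustration.
    public class NamespaceRollup {
      public static void main(String[] args) {
        Map<String, Long> snapshotSizesByNamespace = new HashMap<>();
        // Each merge() models one table's total snapshot size being bucketed into
        // its namespace; repeated keys are summed rather than overwritten.
        snapshotSizesByNamespace.merge("ns1", 1024L, Long::sum);    // ns1:table1
        snapshotSizesByNamespace.merge("ns1", 2048L, Long::sum);    // ns1:table2
        snapshotSizesByNamespace.merge("default", 512L, Long::sum); // default:table3
        // Prints {default=512, ns1=3072} (HashMap iteration order is unspecified)
        System.out.println(snapshotSizesByNamespace);
      }
    }
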
http://git-wip-us.apache.org/repos/asf/hbase/blob/4a4c0120/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 7415b77..75ec46b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -202,6 +202,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServe
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
@@ -3688,6 +3689,40 @@ public class HRegionServer extends HasThread implements
     return this.rsSpaceQuotaManager;
   }
 
+  @Override
+  public boolean reportFileArchivalForQuotas(TableName tableName,
+      Collection<Entry<String,Long>> archivedFiles) {
+    RegionServerStatusService.BlockingInterface rss = rssStub;
+    if (rss == null) {
+      // the current server could be stopping.
+      LOG.trace("Skipping file archival reporting to HMaster as stub is null");
+      return false;
+    }
+    try {
+      RegionServerStatusProtos.FileArchiveNotificationRequest request =
+          rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles);
+      rss.reportFileArchival(null, request);
+    } catch (ServiceException se) {
+      IOException ioe = ProtobufUtil.getRemoteException(se);
+      if (ioe instanceof PleaseHoldException) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Failed to report file archival(s) to Master because it is 
initializing."
+              + " This will be retried.", ioe);
+        }
+        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
+        return false;
+      }
+      if (rssStub == rss) {
+        rssStub = null;
+      }
+      // re-create the stub if we failed to report the archival
+      createRegionServerStatusStub(true);
+      LOG.debug("Failed to report file archival(s) to Master. This will be 
retried.", ioe);
+      return false;
+    }
+    return true;
+  }
+
   public NettyEventLoopGroupConfig getEventLoopGroupConfig() {
     return eventLoopGroupConfig;
   }

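Note the return-value contract of reportFileArchivalForQuotas above: false does not mean the data is lost, it means the Master could not take the report right now (for example, a PleaseHoldException during Master initialization) and the caller should retry later with the same entries. A toy, standalone model of that contract; the Reporter interface and names below are illustrative, not HBase API:

    import java.util.AbstractMap.SimpleImmutableEntry;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    import java.util.Map.Entry;

    // Toy model of the report-and-retry contract; a false return means "retry
    // later", so pending entries must not be discarded.
    public class ArchivalReportContract {
      interface Reporter {
        boolean reportFileArchival(Collection<Entry<String, Long>> files);
      }

      public static void main(String[] args) {
        List<Entry<String, Long>> pending = new ArrayList<>();
        pending.add(new SimpleImmutableEntry<>("hfile-abc123", 4096L));

        // Simulate a Master that is still initializing and rejects the report.
        Reporter initializingMaster = files -> false;
        if (initializingMaster.reportFileArchival(pending)) {
          pending.clear(); // only drop entries once the Master has accepted them
        }
        System.out.println(pending.size() + " file(s) still pending for retry");
      }
    }
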
http://git-wip-us.apache.org/repos/asf/hbase/blob/4a4c0120/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 951f97c..78e2bdb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -29,6 +29,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.NavigableSet;
 import java.util.Optional;
 import java.util.OptionalDouble;
@@ -109,6 +110,7 @@ import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection;
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
 import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
@@ -2537,27 +2539,36 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
   /**
    * Archives and removes the compacted files
   * @param compactedfiles The compacted files in this store that are not active in reads
-   * @throws IOException
    */
   private void removeCompactedfiles(Collection<HStoreFile> compactedfiles)
       throws IOException {
    final List<HStoreFile> filesToRemove = new ArrayList<>(compactedfiles.size());
+    final List<Long> storeFileSizes = new ArrayList<>(compactedfiles.size());
     for (final HStoreFile file : compactedfiles) {
       synchronized (file) {
         try {
           StoreFileReader r = file.getReader();
           if (r == null) {
             LOG.debug("The file {} was closed but still not archived", file);
+            // HACK: Temporarily re-open the reader so we can get the size of the file. Ideally,
+            // we should know the size of an HStoreFile without having to ask the HStoreFileReader
+            // for that.
+            long length = getStoreFileSize(file);
             filesToRemove.add(file);
+            storeFileSizes.add(length);
             continue;
           }
           if (file.isCompactedAway() && !file.isReferencedInReads()) {
            // Even if deleting fails we need not bother as any new scanners won't be
            // able to use the compacted file as the status is already compactedAway
             LOG.trace("Closing and archiving the file {}", file);
+            // Copy the file size before closing the reader
+            final long length = r.length();
             r.close(true);
             // Just close and return
             filesToRemove.add(file);
+            // Only add the length if we successfully added the file to `filesToRemove`
+            storeFileSizes.add(length);
           }
         } catch (Exception e) {
           LOG.error("Exception while trying to close the compacted store file 
{}",
@@ -2580,9 +2591,12 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
          // FileNotFoundException when we attempt to re-archive them in the next go around.
           Collection<Path> failedFiles = fae.getFailedFiles();
           Iterator<HStoreFile> iter = filesToRemove.iterator();
+          Iterator<Long> sizeIter = storeFileSizes.iterator();
           while (iter.hasNext()) {
+            sizeIter.next();
             if (failedFiles.contains(iter.next().getPath())) {
               iter.remove();
+              sizeIter.remove();
             }
           }
           if (!filesToRemove.isEmpty()) {
@@ -2595,7 +2609,34 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
     if (!filesToRemove.isEmpty()) {
       // Clear the compactedfiles from the store file manager
       clearCompactedfiles(filesToRemove);
+      // Try to report this archival to the Master so quota usage is updated faster
+      reportArchivedFilesForQuota(filesToRemove, storeFileSizes);
+    }
+  }
+
+  /**
+   * Computes the length of a store file without succumbing to any errors along the way. If an
+   * error is encountered, the implementation returns {@code 0} instead of the actual size.
+   *
+   * @param file The file to compute the size of.
+   * @return The size in bytes of the provided {@code file}.
+   */
+  long getStoreFileSize(HStoreFile file) {
+    long length = 0;
+    try {
+      file.initReader();
+      length = file.getReader().length();
+    } catch (IOException e) {
+      LOG.trace("Failed to open reader when trying to compute store file size, 
ignoring", e);
+    } finally {
+      try {
+        file.closeStoreFile(
+            file.getCacheConf() != null ? file.getCacheConf().shouldEvictOnClose() : true);
+      } catch (IOException e) {
+        LOG.trace("Failed to close reader after computing store file size, 
ignoring", e);
+      }
     }
+    return length;
   }
 
   public Long preFlushSeqIDEstimation() {
@@ -2616,4 +2657,32 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
       lock.writeLock().unlock();
     }
   }
+
+  void reportArchivedFilesForQuota(List<? extends StoreFile> archivedFiles, List<Long> fileSizes) {
+    // Sanity check from the caller
+    if (archivedFiles.size() != fileSizes.size()) {
+      throw new RuntimeException("Coding error: should never see lists of varying size");
+    }
+    RegionServerServices rss = this.region.getRegionServerServices();
+    if (rss == null) {
+      return;
+    }
+    List<Entry<String,Long>> filesWithSizes = new ArrayList<>(archivedFiles.size());
+    Iterator<Long> fileSizeIter = fileSizes.iterator();
+    for (StoreFile storeFile : archivedFiles) {
+      final long fileSize = fileSizeIter.next();
+      if (storeFile.isHFile() && fileSize != 0) {
+        filesWithSizes.add(Maps.immutableEntry(storeFile.getPath().getName(), fileSize));
+      }
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Files archived: " + archivedFiles + ", reporting the 
following to the Master: "
+          + filesWithSizes);
+    }
+    boolean success = rss.reportFileArchivalForQuotas(getTableName(), filesWithSizes);
+    if (!success) {
+      LOG.warn("Failed to report archival of files: " + filesWithSizes);
+    }
+  }
+
 }

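removeCompactedfiles above keeps filesToRemove and storeFileSizes as parallel lists and, when some files fail to archive, prunes both in lockstep so the name/size pairing survives. That paired-iterator removal is easy to get wrong; here is a standalone sketch of it with hypothetical file names and sizes:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    // Standalone sketch of pruning two parallel lists in lockstep, mirroring the
    // failed-archive handling above; file names and sizes are made up.
    public class PairedListPruning {
      public static void main(String[] args) {
        List<String> files = new ArrayList<>(Arrays.asList("f1", "f2", "f3"));
        List<Long> sizes = new ArrayList<>(Arrays.asList(10L, 20L, 30L));
        List<String> failedFiles = Arrays.asList("f2"); // failed to archive

        Iterator<String> fileIter = files.iterator();
        Iterator<Long> sizeIter = sizes.iterator();
        while (fileIter.hasNext()) {
          sizeIter.next(); // advance in lockstep so remove() hits the paired element
          if (failedFiles.contains(fileIter.next())) {
            fileIter.remove();
            sizeIter.remove();
          }
        }
        System.out.println(files + " " + sizes); // [f1, f3] [10, 30]
      }
    }
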
http://git-wip-us.apache.org/repos/asf/hbase/blob/4a4c0120/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index da1f14f..dde1799 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -19,11 +19,14 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.locking.EntityLock;
 import org.apache.hadoop.hbase.executor.ExecutorService;
@@ -243,4 +246,15 @@ public interface RegionServerServices extends Server, MutableOnlineRegions, Favo
   * @return {@code false} if reporting should be temporarily paused, {@code true} otherwise.
    */
   boolean reportRegionSizesForQuotas(RegionSizeStore sizeStore);
+
+  /**
+   * Reports that a collection of files, and their sizes, belonging to the given {@code tableName}
+   * were just moved to the archive directory.
+   *
+   * @param tableName The name of the table that files previously belonged to
+   * @param archivedFiles Files and their sizes that were moved to archive
+   * @return {@code true} if the files were successfully reported, {@code false} otherwise.
+   */
+  boolean reportFileArchivalForQuotas(
+      TableName tableName, Collection<Entry<String,Long>> archivedFiles);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4a4c0120/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index 910d334..0ef063c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -19,10 +19,12 @@ package org.apache.hadoop.hbase;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -340,4 +342,10 @@ public class MockRegionServerServices implements RegionServerServices {
   public boolean reportRegionSizesForQuotas(RegionSizeStore sizeStore) {
     return true;
   }
+
+  @Override
+  public boolean reportFileArchivalForQuotas(
+      TableName tableName, Collection<Entry<String,Long>> archivedFiles) {
+    return true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4a4c0120/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index c87d723..4641afb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -20,10 +20,12 @@ package org.apache.hadoop.hbase.master;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Random;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -684,4 +686,10 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
   public boolean reportRegionSizesForQuotas(RegionSizeStore sizeStore) {
     return true;
   }
+
+  @Override
+  public boolean reportFileArchivalForQuotas(
+      TableName tableName, Collection<Entry<String, Long>> archivedFiles) {
+    return false;
+  }
 }
