HBASE-15424 Add bulk load hfile-refs for replication in ZK after the event is appended in the WAL


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/25419d8b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/25419d8b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/25419d8b

Branch: refs/heads/hbase-12439
Commit: 25419d8b18dd8f35a102614cd31b274659f747ef
Parents: 5d79790
Author: Ashish Singhi <ashishsin...@apache.org>
Authored: Fri Apr 1 15:40:36 2016 +0530
Committer: Ashish Singhi <ashishsin...@apache.org>
Committed: Fri Apr 1 15:40:36 2016 +0530

----------------------------------------------------------------------
 .../hbase/regionserver/wal/AbstractFSWAL.java   |  4 +-
 .../hbase/regionserver/wal/MetricsWAL.java      |  7 ++-
 .../regionserver/wal/WALActionsListener.java    | 10 +++-
 .../replication/regionserver/Replication.java   | 50 ++++++++++++--------
 .../hadoop/hbase/wal/DisabledWALProvider.java   |  7 +--
 .../hbase/regionserver/wal/TestMetricsWAL.java  | 10 ++--
 .../hbase/wal/WALPerformanceEvaluation.java     |  3 +-
 7 files changed, 58 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/25419d8b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index f189ff1..b89488a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -840,14 +840,14 @@ public abstract class AbstractFSWAL<W> implements WAL {
     return true;
   }
 
-  private long postAppend(final Entry e, final long elapsedTime) {
+  private long postAppend(final Entry e, final long elapsedTime) throws IOException {
     long len = 0;
     if (!listeners.isEmpty()) {
       for (Cell cell : e.getEdit().getCells()) {
         len += CellUtil.estimatedSerializedSizeOf(cell);
       }
       for (WALActionsListener listener : listeners) {
-        listener.postAppend(len, elapsedTime);
+        listener.postAppend(len, elapsedTime, e.getKey(), e.getEdit());
       }
     }
     return len;

http://git-wip-us.apache.org/repos/asf/hbase/blob/25419d8b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java
index 99792e5..69a31cd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java
@@ -20,9 +20,13 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import com.google.common.annotations.VisibleForTesting;
+
+import java.io.IOException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.util.StringUtils;
 
@@ -51,7 +55,8 @@ public class MetricsWAL extends WALActionsListener.Base {
   }
 
   @Override
-  public void postAppend(final long size, final long time) {
+  public void postAppend(final long size, final long time, final WALKey logkey,
+      final WALEdit logEdit) throws IOException {
     source.incrementAppendCount();
     source.incrementAppendTime(time);
     source.incrementAppendSize(size);

http://git-wip-us.apache.org/repos/asf/hbase/blob/25419d8b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
index a6452e2..adcc6eb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
@@ -98,8 +98,12 @@ public interface WALActionsListener {
    * TODO: Combine this with above.
    * @param entryLen approx length of cells in this append.
    * @param elapsedTimeMillis elapsed time in milliseconds.
+   * @param logKey A WAL key
+   * @param logEdit A WAL edit containing list of cells.
+   * @throws IOException if any network or I/O error occurred
    */
-  void postAppend(final long entryLen, final long elapsedTimeMillis);
+  void postAppend(final long entryLen, final long elapsedTimeMillis, final WALKey logKey,
+      final WALEdit logEdit) throws IOException;
 
   /**
    * For notification post writer sync.  Used by metrics system at least.
@@ -136,7 +140,9 @@ public interface WALActionsListener {
     }
 
     @Override
-    public void postAppend(final long entryLen, final long elapsedTimeMillis) {}
+    public void postAppend(final long entryLen, final long elapsedTimeMillis, final WALKey logKey,
+        final WALEdit logEdit) throws IOException {
+    }
 
     @Override
     public void postSync(final long timeInNanos, final int handlerSyncs) {}

http://git-wip-us.apache.org/repos/asf/hbase/blob/25419d8b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index bb4a5a3..fa5e222 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -24,6 +24,7 @@ import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.NavigableMap;
 import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -256,6 +257,34 @@ public class Replication extends WALActionsListener.Base implements
     scopeWALEdits(logKey, logEdit, this.conf, this.getReplicationManager());
   }
 
+  @Override
+  public void postAppend(long entryLen, long elapsedTimeMillis, final WALKey logKey,
+      final WALEdit edit) throws IOException {
+    NavigableMap<byte[], Integer> scopes = logKey.getReplicationScopes();
+    if (this.replicationForBulkLoadData && scopes != null && !scopes.isEmpty()) {
+      TableName tableName = logKey.getTablename();
+      for (Cell c : edit.getCells()) {
+        // Only check for bulk load events
+        if (CellUtil.matchingQualifier(c, WALEdit.BULK_LOAD)) {
+          BulkLoadDescriptor bld = null;
+          try {
+            bld = WALEdit.getBulkLoadDescriptor(c);
+          } catch (IOException e) {
+            LOG.error("Failed to get bulk load events information from the wal file.", e);
+            throw e;
+          }
+
+          for (StoreDescriptor s : bld.getStoresList()) {
+            byte[] fam = s.getFamilyName().toByteArray();
+            if (scopes.containsKey(fam)) {
+              addHFileRefsToQueue(this.getReplicationManager(), tableName, fam, s);
+            }
+          }
+        }
+      }
+    }
+  }
+
   /**
   * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from
    * compaction WAL edits and if the scope is local.
@@ -268,26 +297,9 @@ public class Replication extends WALActionsListener.Base implements
       WALEdit logEdit, Configuration conf, ReplicationSourceManager replicationManager)
           throws IOException {
     boolean replicationForBulkLoadEnabled = isReplicationForBulkLoadDataEnabled(conf);
-    byte[] family;
     boolean foundOtherEdits = false;
     for (Cell cell : logEdit.getCells()) {
-      if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
-        if (replicationForBulkLoadEnabled && CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
-          try {
-            BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
-            for (StoreDescriptor s : bld.getStoresList()) {
-              family = s.getFamilyName().toByteArray();
-              addHFileRefsToQueue(replicationManager, logKey.getTablename(), family, s);
-            }
-          } catch (IOException e) {
-            LOG.error("Failed to get bulk load events information from the wal file.", e);
-            throw e;
-          }
-        } else {
-          // Skip the flush/compaction/region events
-          continue;
-        }
-      } else {
+      if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
         foundOtherEdits = true;
       }
     }
@@ -301,7 +313,7 @@ public class Replication extends WALActionsListener.Base implements
     try {
       replicationManager.addHFileRefs(tableName, family, s.getStoreFileList());
     } catch (ReplicationException e) {
-      LOG.error("Failed to create hfile references in ZK.", e);
+      LOG.error("Failed to add hfile references in the replication queue.", e);
       throw new IOException(e);
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/25419d8b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 028c60b..10fe04c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -153,16 +153,17 @@ class DisabledWALProvider implements WALProvider {
     }
 
     @Override
-    public long append(HRegionInfo info, WALKey key, WALEdit edits, boolean inMemstore) {
+    public long append(HRegionInfo info, WALKey key, WALEdit edits, boolean inMemstore)
+        throws IOException {
       if (!this.listeners.isEmpty()) {
         final long start = System.nanoTime();
         long len = 0;
         for (Cell cell : edits.getCells()) {
           len += CellUtil.estimatedSerializedSizeOf(cell);
         }
-        final long elapsed = (System.nanoTime() - start)/1000000l;
+        final long elapsed = (System.nanoTime() - start) / 1000000L;
         for (WALActionsListener listener : this.listeners) {
-          listener.postAppend(len, elapsed);
+          listener.postAppend(len, elapsed, key, edits);
         }
       }
       return -1;

http://git-wip-us.apache.org/repos/asf/hbase/blob/25419d8b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java
index 2e2aa08..feb6010 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java
@@ -60,10 +60,10 @@ public class TestMetricsWAL {
     MetricsWALSource source = new MetricsWALSourceImpl();
     MetricsWAL metricsWAL = new MetricsWAL(source);
     // One not so slow append (< 1000)
-    metricsWAL.postAppend(1, 900);
+    metricsWAL.postAppend(1, 900, null, null);
     // Two slow appends (> 1000)
-    metricsWAL.postAppend(1, 1010);
-    metricsWAL.postAppend(1, 2000);
+    metricsWAL.postAppend(1, 1010, null, null);
+    metricsWAL.postAppend(1, 2000, null, null);
     assertEquals(2, source.getSlowAppendCount());
   }
 
@@ -71,8 +71,8 @@ public class TestMetricsWAL {
   public void testWalWrittenInBytes() throws Exception {
     MetricsWALSource source = mock(MetricsWALSourceImpl.class);
     MetricsWAL metricsWAL = new MetricsWAL(source);
-    metricsWAL.postAppend(100, 900);
-    metricsWAL.postAppend(200, 2000);
+    metricsWAL.postAppend(100, 900, null, null);
+    metricsWAL.postAppend(200, 2000, null, null);
     verify(source, times(1)).incrementWrittenBytes(100);
     verify(source, times(1)).incrementWrittenBytes(200);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/25419d8b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
index 4a15d3c..7ce03b6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
@@ -525,7 +525,8 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
         }
 
         @Override
-        public void postAppend(final long size, final long elapsedTime) {
+        public void postAppend(final long size, final long elapsedTime, final WALKey logkey,
+            final WALEdit logEdit) {
           appendMeter.mark(size);
         }
       });

Reply via email to