HBASE-16447 Replication by namespaces config in peer (Guanghao Zhang)

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1a1003a4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1a1003a4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1a1003a4

Branch: refs/heads/hbase-12439
Commit: 1a1003a482d9bfb725fbe1097c794fdb043dcd81
Parents: 2cf8907
Author: Enis Soztutar <e...@apache.org>
Authored: Fri Sep 16 11:47:42 2016 -0700
Committer: Enis Soztutar <e...@apache.org>
Committed: Fri Sep 16 11:47:42 2016 -0700

----------------------------------------------------------------------
 .../client/replication/ReplicationAdmin.java    |  38 ++-
 .../replication/ReplicationSerDeHelper.java     |  30 ++-
 .../hbase/replication/ReplicationPeer.java      |   7 +
 .../replication/ReplicationPeerConfig.java      |  15 +-
 .../replication/ReplicationPeerZKImpl.java      |  10 +
 .../replication/ReplicationPeersZKImpl.java     |   4 +-
 .../hbase/zookeeper/ZooKeeperWatcher.java       |   2 +-
 .../ipc/protobuf/generated/TestProtos.java      |  10 +-
 .../protobuf/generated/ZooKeeperProtos.java     | 186 ++++++++++++--
 .../src/main/protobuf/ZooKeeper.proto           |   1 +
 .../apache/hadoop/hbase/ZKNamespaceManager.java |   2 +-
 .../replication/BaseReplicationEndpoint.java    |   6 +-
 .../NamespaceTableCfWALEntryFilter.java         | 126 ++++++++++
 .../replication/TableCfWALEntryFilter.java      | 101 --------
 .../replication/TestReplicationAdmin.java       |  84 +++++++
 .../replication/TestNamespaceReplication.java   | 248 +++++++++++++++++++
 .../TestReplicationWALEntryFilters.java         |  73 +++++-
 ...egionReplicaReplicationEndpointNoMaster.java |   3 +
 .../src/main/ruby/hbase/replication_admin.rb    |  36 +++
 hbase-shell/src/main/ruby/hbase_constants.rb    |   1 +
 hbase-shell/src/main/ruby/shell.rb              |   1 +
 .../src/main/ruby/shell/commands/add_peer.rb    |  16 +-
 .../src/main/ruby/shell/commands/list_peers.rb  |   7 +-
 .../ruby/shell/commands/set_peer_namespaces.rb  |  51 ++++
 .../ruby/shell/commands/set_peer_tableCFs.rb    |  10 +-
 .../test/ruby/hbase/replication_admin_test.rb   |  69 +++++-
 26 files changed, 993 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index de6cb7f..dc1a7ad 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -189,6 +189,8 @@ public class ReplicationAdmin implements Closeable {
    * @param peerConfig configuration for the replication slave cluster
    */
   public void addPeer(String id, ReplicationPeerConfig peerConfig) throws ReplicationException {
+    checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
+      peerConfig.getTableCFsMap());
     this.replicationPeers.registerPeer(id, peerConfig);
   }
 
@@ -202,8 +204,11 @@ public class ReplicationAdmin implements Closeable {
 
   public void updatePeerConfig(String id, ReplicationPeerConfig peerConfig)
       throws ReplicationException {
+    checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
+      peerConfig.getTableCFsMap());
     this.replicationPeers.updatePeerConfig(id, peerConfig);
   }
+
   /**
    * Removes a peer cluster and stops the replication to it.
    * @param id a short name that identifies the cluster
@@ -360,7 +365,6 @@ public class ReplicationAdmin implements Closeable {
         }
       } else {
         throw new ReplicationException("No table: " + table + " in table-cfs 
config of peer: " + id);
-
       }
     }
     setPeerTableCFs(id, preTableCfs);
@@ -376,6 +380,8 @@ public class ReplicationAdmin implements Closeable {
    */
   public void setPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
       throws ReplicationException {
+    checkNamespacesAndTableCfsConfigConflict(
+      this.replicationPeers.getReplicationPeerConfig(id).getNamespaces(), tableCfs);
     this.replicationPeers.setPeerTableCFsConfig(id, tableCfs);
   }
 
@@ -627,4 +633,34 @@ public class ReplicationAdmin implements Closeable {
     }
     return true;
   }
+
+  /**
+   * Setting a namespace in the peer config means that all tables in this namespace
+   * will be replicated to the peer cluster.
+   *
+   * 1. If a namespace has already been set in the peer config, then no table of that
+   *    namespace can be added to the peer config.
+   * 2. If a table has already been set in the peer config, then that table's
+   *    namespace cannot be added to the peer config.
+   *
+   * @param namespaces
+   * @param tableCfs
+   * @throws ReplicationException
+   */
+  private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
+      Map<TableName, ? extends Collection<String>> tableCfs) throws 
ReplicationException {
+    if (namespaces == null || namespaces.isEmpty()) {
+      return;
+    }
+    if (tableCfs == null || tableCfs.isEmpty()) {
+      return;
+    }
+    for (Map.Entry<TableName, ? extends Collection<String>> entry : 
tableCfs.entrySet()) {
+      TableName table = entry.getKey();
+      if (namespaces.contains(table.getNamespaceAsString())) {
+        throw new ReplicationException(
+            "Table-cfs config conflict with namespaces config in peer");
+      }
+    }
+  }
 }
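
For readers following the API change above, here is a minimal sketch of how a client could drive namespace-scoped replication through the updated ReplicationAdmin/ReplicationPeerConfig API. The peer id "1", the cluster key, and the class name NamespaceReplicationExample are placeholders, not part of this patch; the rejected update at the end is the case covered by checkNamespacesAndTableCfsConfigConflict.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;

public class NamespaceReplicationExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create(); // points at the source cluster
    try (ReplicationAdmin admin = new ReplicationAdmin(conf)) {
      // Replicate every table in namespace ns1 to peer "1" (placeholder id/key).
      ReplicationPeerConfig rpc = new ReplicationPeerConfig();
      rpc.setClusterKey("zk1,zk2,zk3:2181:/hbase");
      Set<String> namespaces = new HashSet<String>();
      namespaces.add("ns1");
      rpc.setNamespaces(namespaces);
      admin.addPeer("1", rpc);

      // Conflict rule from checkNamespacesAndTableCfsConfigConflict: ns1 is already
      // replicated as a whole, so listing a table of ns1 in table-cfs is rejected.
      Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
      tableCfs.put(TableName.valueOf("ns1:t1"), new ArrayList<String>());
      rpc.setTableCFsMap(tableCfs);
      try {
        admin.updatePeerConfig("1", rpc);
      } catch (ReplicationException expected) {
        // "Table-cfs config conflict with namespaces config in peer"
      }
    }
  }
}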

http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
index 9682f89..225e685 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hbase.client.replication;
 
 import com.google.protobuf.ByteString;
+
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.TableName;
@@ -34,10 +36,12 @@ import org.apache.hadoop.hbase.util.Strings;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.ArrayList;
+import java.util.Set;
 
 /**
  * Helper for TableCFs Operations.
@@ -50,6 +54,13 @@ public final class ReplicationSerDeHelper {
 
   private ReplicationSerDeHelper() {}
 
+  public static String convertToString(Set<String> namespaces) {
+    if (namespaces == null) {
+      return null;
+    }
+    return StringUtils.join(namespaces, ';');
+  }
+
   /** convert map to TableCFs Object */
   public static ZooKeeperProtos.TableCF[] convert(
       Map<TableName, ? extends Collection<String>> tableCfs) {
@@ -262,11 +273,21 @@ public final class ReplicationSerDeHelper {
     for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) {
       peerConfig.getConfiguration().put(pair.getName(), pair.getValue());
     }
+
     Map<TableName, ? extends Collection<String>> tableCFsMap = convert2Map(
       peer.getTableCfsList().toArray(new ZooKeeperProtos.TableCF[peer.getTableCfsCount()]));
     if (tableCFsMap != null) {
       peerConfig.setTableCFsMap(tableCFsMap);
     }
+
+    List<ByteString> namespacesList = peer.getNamespacesList();
+    if (namespacesList != null && namespacesList.size() != 0) {
+      Set<String> namespaces = new HashSet<String>();
+      for (ByteString namespace : namespacesList) {
+        namespaces.add(namespace.toStringUtf8());
+      }
+      peerConfig.setNamespaces(namespaces);
+    }
     return peerConfig;
   }
 
@@ -292,12 +313,20 @@ public final class ReplicationSerDeHelper {
           .setValue(entry.getValue())
           .build());
     }
+
     ZooKeeperProtos.TableCF[] tableCFs = convert(peerConfig.getTableCFsMap());
     if (tableCFs != null) {
       for (int i = 0; i < tableCFs.length; i++) {
         builder.addTableCfs(tableCFs[i]);
       }
     }
+    Set<String> namespaces = peerConfig.getNamespaces();
+    if (namespaces != null) {
+      for (String namespace : namespaces) {
+        builder.addNamespaces(ByteString.copyFromUtf8(namespace));
+      }
+    }
+
     return builder.build();
   }
 
@@ -311,5 +340,4 @@ public final class ReplicationSerDeHelper {
     byte[] bytes = convert(peerConfig).toByteArray();
     return ProtobufUtil.prependPBMagic(bytes);
   }
-
 }
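
On the string form used for namespaces: convertToString above simply joins the set with ';' using commons-lang. A tiny sketch under that assumption follows; the namespace names and the class name are made up for illustration.

import java.util.LinkedHashSet;
import java.util.Set;

import org.apache.commons.lang.StringUtils;

public class NamespacesToStringSketch {
  public static void main(String[] args) {
    // Hypothetical namespace names, purely for illustration.
    Set<String> namespaces = new LinkedHashSet<String>();
    namespaces.add("ns1");
    namespaces.add("ns2");

    // Same join the helper performs (a null set is returned as null by the helper).
    String asString = StringUtils.join(namespaces, ';');
    System.out.println(asString); // ns1;ns2
  }
}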

http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
index 3da01fe..bd2b700 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -71,6 +72,12 @@ public interface ReplicationPeer {
    */
   public Map<TableName, List<String>> getTableCFs();
 
+  /**
+   * Get replicable namespace set of this peer
+   * @return the replicable namespaces set
+   */
+  public Set<String> getNamespaces();
+
   void trackPeerConfigChanges(ReplicationPeerConfigListener listener);
 
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
index 1d2066c..1f0d085 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.hadoop.hbase.TableName;
@@ -42,7 +43,7 @@ public class ReplicationPeerConfig {
   private final Map<byte[], byte[]> peerData;
   private final Map<String, String> configuration;
   private Map<TableName, ? extends Collection<String>> tableCFsMap = null;
-
+  private Set<String> namespaces = null;
 
   public ReplicationPeerConfig() {
     this.peerData = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
@@ -93,10 +94,22 @@ public class ReplicationPeerConfig {
     return this;
   }
 
+  public Set<String> getNamespaces() {
+    return this.namespaces;
+  }
+
+  public ReplicationPeerConfig setNamespaces(Set<String> namespaces) {
+    this.namespaces = namespaces;
+    return this;
+  }
+
   @Override
   public String toString() {
     StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(",");
     builder.append("replicationEndpointImpl=").append(replicationEndpointImpl).append(",");
+    if (namespaces != null) {
+      builder.append("namespaces=").append(namespaces.toString()).append(",");
+    }
     if (tableCFsMap != null) {
       builder.append("tableCFs=").append(tableCFsMap.toString());
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
index a33690c..cfe543a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -163,6 +164,15 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
     return this.tableCFs;
   }
 
+  /**
+   * Get replicable namespace set of this peer
+   * @return the replicable namespaces set
+   */
+  @Override
+  public Set<String> getNamespaces() {
+    return this.peerConfig.getNamespaces();
+  }
+
   @Override
   public void trackPeerConfigChanges(ReplicationPeerConfigListener listener) {
     if (this.peerConfigTracker != null){

http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index 54c2dac..90b1347 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -343,7 +343,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
       throws ReplicationException {
     ReplicationPeer peer = getConnectedPeer(id);
     if (peer == null){
-      throw new ReplicationException("Could not find peer Id " + id);
+      throw new ReplicationException("Could not find peer Id " + id + " in 
connected peers");
     }
     ReplicationPeerConfig existingConfig = peer.getPeerConfig();
     if (newConfig.getClusterKey() != null && !newConfig.getClusterKey().isEmpty() &&
@@ -366,6 +366,8 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
     // or data that weren't explicitly changed
     existingConfig.getConfiguration().putAll(newConfig.getConfiguration());
     existingConfig.getPeerData().putAll(newConfig.getPeerData());
+    existingConfig.setTableCFsMap(newConfig.getTableCFsMap());
+    existingConfig.setNamespaces(newConfig.getNamespaces());
 
     try {
       ZKUtil.setData(this.zookeeper, getPeerNode(id),

http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
index 5ef7171..f7d7e26 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
@@ -122,7 +122,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
   // znode containing the state of recovering regions
   public String recoveringRegionsZNode;
   // znode containing namespace descriptors
-  public static String namespaceZNode = "namespace";
+  public String namespaceZNode = "namespace";
   // znode of indicating master maintenance mode
   public static String masterMaintZNode = "masterMaintenance";
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-protocol/src/main/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestProtos.java
index d28945c..58e248e 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestProtos.java
@@ -2091,7 +2091,7 @@ public final class TestProtos {
 
       public final boolean isInitialized() {
         if (!hasMs()) {
-
+          
           return false;
         }
         return true;
@@ -2291,7 +2291,7 @@ public final class TestProtos {
       if (ref instanceof java.lang.String) {
         return (java.lang.String) ref;
       } else {
-        com.google.protobuf.ByteString bs =
+        com.google.protobuf.ByteString bs = 
             (com.google.protobuf.ByteString) ref;
         java.lang.String s = bs.toStringUtf8();
         if (bs.isValidUtf8()) {
@@ -2307,7 +2307,7 @@ public final class TestProtos {
         getAddrBytes() {
       java.lang.Object ref = addr_;
       if (ref instanceof java.lang.String) {
-        com.google.protobuf.ByteString b =
+        com.google.protobuf.ByteString b = 
             com.google.protobuf.ByteString.copyFromUtf8(
                 (java.lang.String) ref);
         addr_ = b;
@@ -2567,7 +2567,7 @@ public final class TestProtos {
 
       public final boolean isInitialized() {
         if (!hasAddr()) {
-
+          
           return false;
         }
         return true;
@@ -2621,7 +2621,7 @@ public final class TestProtos {
           getAddrBytes() {
         java.lang.Object ref = addr_;
         if (ref instanceof String) {
-          com.google.protobuf.ByteString b =
+          com.google.protobuf.ByteString b = 
               com.google.protobuf.ByteString.copyFromUtf8(
                   (java.lang.String) ref);
           addr_ = b;

http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
index f64d0c1..d7de638 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
@@ -4782,6 +4782,20 @@ public final class ZooKeeperProtos {
      */
     
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCFOrBuilder 
getTableCfsOrBuilder(
         int index);
+
+    // repeated bytes namespaces = 6;
+    /**
+     * <code>repeated bytes namespaces = 6;</code>
+     */
+    java.util.List<com.google.protobuf.ByteString> getNamespacesList();
+    /**
+     * <code>repeated bytes namespaces = 6;</code>
+     */
+    int getNamespacesCount();
+    /**
+     * <code>repeated bytes namespaces = 6;</code>
+     */
+    com.google.protobuf.ByteString getNamespaces(int index);
   }
   /**
    * Protobuf type {@code hbase.pb.ReplicationPeer}
@@ -4873,6 +4887,14 @@ public final class ZooKeeperProtos {
               
tableCfs_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF.PARSER,
 extensionRegistry));
               break;
             }
+            case 50: {
+              if (!((mutable_bitField0_ & 0x00000020) == 0x00000020)) {
+                namespaces_ = new 
java.util.ArrayList<com.google.protobuf.ByteString>();
+                mutable_bitField0_ |= 0x00000020;
+              }
+              namespaces_.add(input.readBytes());
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -4890,6 +4912,9 @@ public final class ZooKeeperProtos {
         if (((mutable_bitField0_ & 0x00000010) == 0x00000010)) {
           tableCfs_ = java.util.Collections.unmodifiableList(tableCfs_);
         }
+        if (((mutable_bitField0_ & 0x00000020) == 0x00000020)) {
+          namespaces_ = java.util.Collections.unmodifiableList(namespaces_);
+        }
         this.unknownFields = unknownFields.build();
         makeExtensionsImmutable();
       }
@@ -5131,12 +5156,36 @@ public final class ZooKeeperProtos {
       return tableCfs_.get(index);
     }
 
+    // repeated bytes namespaces = 6;
+    public static final int NAMESPACES_FIELD_NUMBER = 6;
+    private java.util.List<com.google.protobuf.ByteString> namespaces_;
+    /**
+     * <code>repeated bytes namespaces = 6;</code>
+     */
+    public java.util.List<com.google.protobuf.ByteString>
+        getNamespacesList() {
+      return namespaces_;
+    }
+    /**
+     * <code>repeated bytes namespaces = 6;</code>
+     */
+    public int getNamespacesCount() {
+      return namespaces_.size();
+    }
+    /**
+     * <code>repeated bytes namespaces = 6;</code>
+     */
+    public com.google.protobuf.ByteString getNamespaces(int index) {
+      return namespaces_.get(index);
+    }
+
     private void initFields() {
       clusterkey_ = "";
       replicationEndpointImpl_ = "";
       data_ = java.util.Collections.emptyList();
       configuration_ = java.util.Collections.emptyList();
       tableCfs_ = java.util.Collections.emptyList();
+      namespaces_ = java.util.Collections.emptyList();
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -5187,6 +5236,9 @@ public final class ZooKeeperProtos {
       for (int i = 0; i < tableCfs_.size(); i++) {
         output.writeMessage(5, tableCfs_.get(i));
       }
+      for (int i = 0; i < namespaces_.size(); i++) {
+        output.writeBytes(6, namespaces_.get(i));
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -5216,6 +5268,15 @@ public final class ZooKeeperProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(5, tableCfs_.get(i));
       }
+      {
+        int dataSize = 0;
+        for (int i = 0; i < namespaces_.size(); i++) {
+          dataSize += com.google.protobuf.CodedOutputStream
+            .computeBytesSizeNoTag(namespaces_.get(i));
+        }
+        size += dataSize;
+        size += 1 * getNamespacesList().size();
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -5255,6 +5316,8 @@ public final class ZooKeeperProtos {
           .equals(other.getConfigurationList());
       result = result && getTableCfsList()
           .equals(other.getTableCfsList());
+      result = result && getNamespacesList()
+          .equals(other.getNamespacesList());
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -5288,6 +5351,10 @@ public final class ZooKeeperProtos {
         hash = (37 * hash) + TABLE_CFS_FIELD_NUMBER;
         hash = (53 * hash) + getTableCfsList().hashCode();
       }
+      if (getNamespacesCount() > 0) {
+        hash = (37 * hash) + NAMESPACES_FIELD_NUMBER;
+        hash = (53 * hash) + getNamespacesList().hashCode();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -5427,6 +5494,8 @@ public final class ZooKeeperProtos {
         } else {
           tableCfsBuilder_.clear();
         }
+        namespaces_ = java.util.Collections.emptyList();
+        bitField0_ = (bitField0_ & ~0x00000020);
         return this;
       }
 
@@ -5490,6 +5559,11 @@ public final class ZooKeeperProtos {
         } else {
           result.tableCfs_ = tableCfsBuilder_.build();
         }
+        if (((bitField0_ & 0x00000020) == 0x00000020)) {
+          namespaces_ = java.util.Collections.unmodifiableList(namespaces_);
+          bitField0_ = (bitField0_ & ~0x00000020);
+        }
+        result.namespaces_ = namespaces_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -5594,6 +5668,16 @@ public final class ZooKeeperProtos {
             }
           }
         }
+        if (!other.namespaces_.isEmpty()) {
+          if (namespaces_.isEmpty()) {
+            namespaces_ = other.namespaces_;
+            bitField0_ = (bitField0_ & ~0x00000020);
+          } else {
+            ensureNamespacesIsMutable();
+            namespaces_.addAll(other.namespaces_);
+          }
+          onChanged();
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -6541,6 +6625,78 @@ public final class ZooKeeperProtos {
         return tableCfsBuilder_;
       }
 
+      // repeated bytes namespaces = 6;
+      private java.util.List<com.google.protobuf.ByteString> namespaces_ = 
java.util.Collections.emptyList();
+      private void ensureNamespacesIsMutable() {
+        if (!((bitField0_ & 0x00000020) == 0x00000020)) {
+          namespaces_ = new 
java.util.ArrayList<com.google.protobuf.ByteString>(namespaces_);
+          bitField0_ |= 0x00000020;
+         }
+      }
+      /**
+       * <code>repeated bytes namespaces = 6;</code>
+       */
+      public java.util.List<com.google.protobuf.ByteString>
+          getNamespacesList() {
+        return java.util.Collections.unmodifiableList(namespaces_);
+      }
+      /**
+       * <code>repeated bytes namespaces = 6;</code>
+       */
+      public int getNamespacesCount() {
+        return namespaces_.size();
+      }
+      /**
+       * <code>repeated bytes namespaces = 6;</code>
+       */
+      public com.google.protobuf.ByteString getNamespaces(int index) {
+        return namespaces_.get(index);
+      }
+      /**
+       * <code>repeated bytes namespaces = 6;</code>
+       */
+      public Builder setNamespaces(
+          int index, com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  ensureNamespacesIsMutable();
+        namespaces_.set(index, value);
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>repeated bytes namespaces = 6;</code>
+       */
+      public Builder addNamespaces(com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  ensureNamespacesIsMutable();
+        namespaces_.add(value);
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>repeated bytes namespaces = 6;</code>
+       */
+      public Builder addAllNamespaces(
+          java.lang.Iterable<? extends com.google.protobuf.ByteString> values) 
{
+        ensureNamespacesIsMutable();
+        super.addAll(values, namespaces_);
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>repeated bytes namespaces = 6;</code>
+       */
+      public Builder clearNamespaces() {
+        namespaces_ = java.util.Collections.emptyList();
+        bitField0_ = (bitField0_ & ~0x00000020);
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:hbase.pb.ReplicationPeer)
     }
 
@@ -9822,24 +9978,24 @@ public final class ZooKeeperProtos {
       
"e:\007ENABLED\"?\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISA" +
       
"BLED\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003\"D\n\007T"
 +
       "ableCF\022\'\n\ntable_name\030\001 \001(\0132\023.hbase.pb.Ta",
-      "bleName\022\020\n\010families\030\002 
\003(\014\"\305\001\n\017Replicatio" +
+      "bleName\022\020\n\010families\030\002 
\003(\014\"\331\001\n\017Replicatio" +
       "nPeer\022\022\n\nclusterkey\030\001 \002(\t\022\037\n\027replication" +
       "EndpointImpl\030\002 \001(\t\022&\n\004data\030\003 
\003(\0132\030.hbase" +
       ".pb.BytesBytesPair\022/\n\rconfiguration\030\004 \003(" +
       "\0132\030.hbase.pb.NameStringPair\022$\n\ttable_cfs" +
-      "\030\005 \003(\0132\021.hbase.pb.TableCF\"g\n\020Replication" +
-      "State\022/\n\005state\030\001 \002(\0162 .hbase.pb.Replicat" +
-      
"ionState.State\"\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010" +
-      "DISABLED\020\001\"+\n\027ReplicationHLogPosition\022\020\n" +
-      "\010position\030\001 \002(\003\"%\n\017ReplicationLock\022\022\n\nlo",
-      "ck_owner\030\001 \002(\t\"\252\001\n\tTableLock\022\'\n\ntable_na" +
-      "me\030\001 \001(\0132\023.hbase.pb.TableName\022(\n\nlock_ow" +
-      "ner\030\002 \001(\0132\024.hbase.pb.ServerName\022\021\n\tthrea" +
-      "d_id\030\003 \001(\003\022\021\n\tis_shared\030\004 
\001(\010\022\017\n\007purpose" +
-      "\030\005 \001(\t\022\023\n\013create_time\030\006 
\001(\003\"\036\n\013SwitchSta" +
-      "te\022\017\n\007enabled\030\001 \001(\010BE\n*org.apache.hadoop" +
-      ".hbase.protobuf.generatedB\017ZooKeeperProt" +
-      "osH\001\210\001\001\240\001\001"
+      "\030\005 \003(\0132\021.hbase.pb.TableCF\022\022\n\nnamespaces\030" +
+      "\006 \003(\014\"g\n\020ReplicationState\022/\n\005state\030\001 
\002(\016" +
+      "2 .hbase.pb.ReplicationState.State\"\"\n\005St" +
+      
"ate\022\013\n\007ENABLED\020\000\022\014\n\010DISABLED\020\001\"+\n\027Replic" 
+
+      "ationHLogPosition\022\020\n\010position\030\001 \002(\003\"%\n\017R",
+      "eplicationLock\022\022\n\nlock_owner\030\001 \002(\t\"\252\001\n\tT" +
+      "ableLock\022\'\n\ntable_name\030\001 \001(\0132\023.hbase.pb." +
+      "TableName\022(\n\nlock_owner\030\002 \001(\0132\024.hbase.pb" +
+      ".ServerName\022\021\n\tthread_id\030\003 \001(\003\022\021\n\tis_sha" +
+      "red\030\004 \001(\010\022\017\n\007purpose\030\005 
\001(\t\022\023\n\013create_tim" +
+      "e\030\006 \001(\003\"\036\n\013SwitchState\022\017\n\007enabled\030\001 
\001(\010B" +
+      "E\n*org.apache.hadoop.hbase.protobuf.gene" +
+      "ratedB\017ZooKeeperProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner 
assigner =
       new 
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -9887,7 +10043,7 @@ public final class ZooKeeperProtos {
           internal_static_hbase_pb_ReplicationPeer_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_hbase_pb_ReplicationPeer_descriptor,
-              new java.lang.String[] { "Clusterkey", 
"ReplicationEndpointImpl", "Data", "Configuration", "TableCfs", });
+              new java.lang.String[] { "Clusterkey", 
"ReplicationEndpointImpl", "Data", "Configuration", "TableCfs", "Namespaces", 
});
           internal_static_hbase_pb_ReplicationState_descriptor =
             getDescriptor().getMessageTypes().get(7);
           internal_static_hbase_pb_ReplicationState_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-protocol/src/main/protobuf/ZooKeeper.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/ZooKeeper.proto b/hbase-protocol/src/main/protobuf/ZooKeeper.proto
index 8713cbd..ea8f747 100644
--- a/hbase-protocol/src/main/protobuf/ZooKeeper.proto
+++ b/hbase-protocol/src/main/protobuf/ZooKeeper.proto
@@ -121,6 +121,7 @@ message ReplicationPeer {
   repeated BytesBytesPair data = 3;
   repeated NameStringPair configuration = 4;
   repeated TableCF table_cfs = 5;
+  repeated bytes namespaces = 6;
 }
 
 /**
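
With the new repeated bytes namespaces = 6 field, a peer's namespace list travels inside the ZooKeeperProtos.ReplicationPeer message. Below is a small sketch using the regenerated builder shown earlier in this patch; the cluster key, namespace names, and class name are placeholders.

import com.google.protobuf.ByteString;

import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;

public class ReplicationPeerProtoSketch {
  public static void main(String[] args) {
    // clusterkey is required by the message; namespaces are stored as UTF-8 bytes.
    ZooKeeperProtos.ReplicationPeer peer = ZooKeeperProtos.ReplicationPeer.newBuilder()
        .setClusterkey("zk1,zk2,zk3:2181:/hbase")
        .addNamespaces(ByteString.copyFromUtf8("ns1"))
        .addNamespaces(ByteString.copyFromUtf8("ns2"))
        .build();

    for (ByteString ns : peer.getNamespacesList()) {
      System.out.println(ns.toStringUtf8()); // ns1, then ns2
    }
  }
}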

http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-server/src/main/java/org/apache/hadoop/hbase/ZKNamespaceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ZKNamespaceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ZKNamespaceManager.java
index ee59c01..7b53333 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ZKNamespaceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ZKNamespaceManager.java
@@ -54,7 +54,7 @@ public class ZKNamespaceManager extends ZooKeeperListener {
 
   public ZKNamespaceManager(ZooKeeperWatcher zkw) throws IOException {
     super(zkw);
-    nsZNode = ZooKeeperWatcher.namespaceZNode;
+    nsZNode = zkw.namespaceZNode;
     cache = new ConcurrentSkipListMap<String, NamespaceDescriptor>();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
index d667269..48f3ac5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
@@ -72,7 +72,7 @@ public abstract class BaseReplicationEndpoint extends AbstractService
     if (scopeFilter != null) {
       filters.add(scopeFilter);
     }
-    WALEntryFilter tableCfFilter = getTableCfWALEntryFilter();
+    WALEntryFilter tableCfFilter = getNamespaceTableCfWALEntryFilter();
     if (tableCfFilter != null) {
       filters.add(tableCfFilter);
     }
@@ -87,8 +87,8 @@ public abstract class BaseReplicationEndpoint extends AbstractService
 
   /** Returns a WALEntryFilter for checking replication per table and CF. Subclasses can
    * return null if they don't want this filter */
-  protected WALEntryFilter getTableCfWALEntryFilter() {
-    return new TableCfWALEntryFilter(ctx.getReplicationPeer());
+  protected WALEntryFilter getNamespaceTableCfWALEntryFilter() {
+    return new NamespaceTableCfWALEntryFilter(ctx.getReplicationPeer());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
new file mode 100644
index 0000000..2673cbd
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+
+import com.google.common.base.Predicate;
+
+/**
+ * Filters a WAL entry by the namespaces and table-cfs config in the peer. It first filters
+ * the entry by the namespaces config, then by the table-cfs config.
+ *
+ * 1. Setting a namespace in the peer config means that all tables in this namespace will be replicated.
+ * 2. If the namespaces config is null, then the table-cfs config decides which tables' edits
+ *    can be replicated. If the table-cfs config is null, then the namespaces config decides
+ *    which tables' edits can be replicated.
+ */
+@InterfaceAudience.Private
+public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFilter {
+
+  private static final Log LOG = LogFactory.getLog(NamespaceTableCfWALEntryFilter.class);
+  private final ReplicationPeer peer;
+  private BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter();
+
+  public NamespaceTableCfWALEntryFilter(ReplicationPeer peer) {
+    this.peer = peer;
+  }
+
+  @Override
+  public Entry filter(Entry entry) {
+    TableName tabName = entry.getKey().getTablename();
+    String namespace = tabName.getNamespaceAsString();
+    Set<String> namespaces = this.peer.getNamespaces();
+    Map<TableName, List<String>> tableCFs = getTableCfs();
+
+    // If both are null, the user has explicitly not configured any namespaces or
+    // table CFs, so data from all tables is applicable for replication
+    if (namespaces == null && tableCFs == null) {
+      return entry;
+    }
+
+    // First filter by namespaces config
+    // If the table's namespace is in the peer config, all of the table's data is applicable for replication
+    if (namespaces != null && namespaces.contains(namespace)) {
+      return entry;
+    }
+
+    // Then filter by table-cfs config
+    // return null (prevent replicating) if the logKey's table isn't in this peer's
+    // replicable namespace list or table list
+    if (tableCFs == null || !tableCFs.containsKey(tabName)) {
+      return null;
+    }
+
+    return entry;
+  }
+
+  @Override
+  public Cell filterCell(final Entry entry, Cell cell) {
+    final Map<TableName, List<String>> tableCfs = getTableCfs();
+    if (tableCfs == null) return cell;
+    TableName tabName = entry.getKey().getTablename();
+    List<String> cfs = tableCfs.get(tabName);
+    // ignore(remove) kv if its cf isn't in the replicable cf list
+    // (empty cfs means all cfs of this table are replicable)
+    if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
+      cell = bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() {
+        @Override
+        public boolean apply(byte[] fam) {
+          if (tableCfs != null) {
+            List<String> cfs = tableCfs.get(entry.getKey().getTablename());
+            if (cfs != null && !cfs.contains(Bytes.toString(fam))) {
+              return true;
+            }
+          }
+          return false;
+        }
+      });
+    } else {
+      if ((cfs != null) && !cfs.contains(
+        Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))) {
+        return null;
+      }
+    }
+    return cell;
+  }
+
+  Map<TableName, List<String>> getTableCfs() {
+    Map<TableName, List<String>> tableCFs = null;
+    try {
+      tableCFs = this.peer.getTableCFs();
+    } catch (IllegalArgumentException e) {
+      LOG.error("should not happen: can't get tableCFs for peer " + 
peer.getId() +
+          ", degenerate as if it's not configured by keeping tableCFs==null");
+    }
+    return tableCFs;
+  }
+}
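
To summarize the entry-level rule implemented by NamespaceTableCfWALEntryFilter#filter, here is a simplified, self-contained sketch that uses plain strings instead of WAL.Entry/TableName. The helper name and the sample namespaces/tables are illustrative only, and cell-level CF filtering (filterCell) is not modeled.

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class NamespaceTableCfFilterSketch {

  // Returns true if an edit for "namespace:table" would be kept, in the same
  // order the filter uses: namespaces config first, then table-cfs config.
  static boolean replicate(String namespace, String table,
      Set<String> namespaces, Map<String, List<String>> tableCfs) {
    if (namespaces == null && tableCfs == null) {
      return true;                     // nothing configured: replicate everything
    }
    if (namespaces != null && namespaces.contains(namespace)) {
      return true;                     // whole namespace is replicated
    }
    return tableCfs != null && tableCfs.containsKey(namespace + ":" + table);
  }

  public static void main(String[] args) {
    Set<String> namespaces = new HashSet<String>(Collections.singleton("ns1"));
    Map<String, List<String>> tableCfs = new HashMap<String, List<String>>();
    tableCfs.put("ns2:TB", Arrays.asList("f1"));   // only family f1 of ns2:TB

    System.out.println(replicate("ns1", "TA", namespaces, tableCfs)); // true  (namespace match)
    System.out.println(replicate("ns2", "TB", namespaces, tableCfs)); // true  (table-cfs match)
    System.out.println(replicate("ns2", "TC", namespaces, tableCfs)); // false (no match)
  }
}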

http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
deleted file mode 100644
index d890e3e..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.replication;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-
-import com.google.common.base.Predicate;
-
-public class TableCfWALEntryFilter implements WALEntryFilter, WALCellFilter {
-
-  private static final Log LOG = LogFactory.getLog(TableCfWALEntryFilter.class);
-  private ReplicationPeer peer;
-  private BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter();
-
-  public TableCfWALEntryFilter(ReplicationPeer peer) {
-    this.peer = peer;
-  }
-
-  @Override
-  public Entry filter(Entry entry) {
-    TableName tabName = entry.getKey().getTablename();
-    Map<TableName, List<String>> tableCFs = getTableCfs();
-
-    // If null means user has explicitly not configured any table CFs so all the tables data are
-    // applicable for replication
-    if (tableCFs == null) return entry;
-
-    if (!tableCFs.containsKey(tabName)) {
-      return null;
-    }
-
-    return entry;
-  }
-
-  @Override
-  public Cell filterCell(final Entry entry, Cell cell) {
-    final Map<TableName, List<String>> tableCfs = getTableCfs();
-    if (tableCfs == null) return cell;
-    TableName tabName = entry.getKey().getTablename();
-    List<String> cfs = tableCfs.get(tabName);
-    // ignore(remove) kv if its cf isn't in the replicable cf list
-    // (empty cfs means all cfs of this table are replicable)
-    if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
-      cell = bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() {
-        @Override
-        public boolean apply(byte[] fam) {
-          if (tableCfs != null) {
-            List<String> cfs = tableCfs.get(entry.getKey().getTablename());
-            if (cfs != null && !cfs.contains(Bytes.toString(fam))) {
-              return true;
-            }
-          }
-          return false;
-        }
-      });
-    } else {
-      if ((cfs != null) && !cfs.contains(
-        Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))) {
-        return null;
-      }
-    }
-    return cell;
-  }
-
-  Map<TableName, List<String>> getTableCfs() {
-    Map<TableName, List<String>> tableCFs = null;
-    try {
-      tableCFs = this.peer.getTableCFs();
-    } catch (IllegalArgumentException e) {
-      LOG.error("should not happen: can't get tableCFs for peer " + 
peer.getId() +
-          ", degenerate as if it's not configured by keeping tableCFs==null");
-    }
-    return tableCFs;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index 85820af..c0d18dd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -20,8 +20,10 @@ package org.apache.hadoop.hbase.client.replication;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -385,4 +387,86 @@ public class TestReplicationAdmin {
 
     admin.removePeer(ID_ONE);
   }
+
+  @Test
+  public void testSetPeerNamespaces() throws Exception {
+    String ns1 = "ns1";
+    String ns2 = "ns2";
+
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(KEY_ONE);
+    admin.addPeer(ID_ONE, rpc);
+    admin.peerAdded(ID_ONE);
+
+    rpc = admin.getPeerConfig(ID_ONE);
+    Set<String> namespaces = new HashSet<String>();
+    namespaces.add(ns1);
+    namespaces.add(ns2);
+    rpc.setNamespaces(namespaces);
+    admin.updatePeerConfig(ID_ONE, rpc);
+    namespaces = admin.getPeerConfig(ID_ONE).getNamespaces();
+    assertEquals(2, namespaces.size());
+    assertTrue(namespaces.contains(ns1));
+    assertTrue(namespaces.contains(ns2));
+
+    rpc = admin.getPeerConfig(ID_ONE);
+    namespaces.clear();
+    namespaces.add(ns1);
+    rpc.setNamespaces(namespaces);
+    admin.updatePeerConfig(ID_ONE, rpc);
+    namespaces = admin.getPeerConfig(ID_ONE).getNamespaces();
+    assertEquals(1, namespaces.size());
+    assertTrue(namespaces.contains(ns1));
+
+    admin.removePeer(ID_ONE);
+  }
+
+  @Test
+  public void testNamespacesAndTableCfsConfigConflict() throws ReplicationException {
+    String ns1 = "ns1";
+    String ns2 = "ns2";
+    TableName tab1 = TableName.valueOf("ns1:tabl");
+    TableName tab2 = TableName.valueOf("ns2:tab2");
+
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(KEY_ONE);
+    admin.addPeer(ID_ONE, rpc);
+    admin.peerAdded(ID_ONE);
+
+    rpc = admin.getPeerConfig(ID_ONE);
+    Set<String> namespaces = new HashSet<String>();
+    namespaces.add(ns1);
+    rpc.setNamespaces(namespaces);
+    admin.updatePeerConfig(ID_ONE, rpc);
+    rpc = admin.getPeerConfig(ID_ONE);
+    Map<TableName, List<String>> tableCfs = new HashMap<>();
+    tableCfs.put(tab1, new ArrayList<String>());
+    rpc.setTableCFsMap(tableCfs);
+    try {
+      admin.updatePeerConfig(ID_ONE, rpc);
+      fail("Should throw ReplicationException, because table " + tab1 + " 
conflict with namespace "
+          + ns1);
+    } catch (ReplicationException e) {
+      // OK
+    }
+
+    rpc = admin.getPeerConfig(ID_ONE);
+    tableCfs.clear();
+    tableCfs.put(tab2, new ArrayList<String>());
+    rpc.setTableCFsMap(tableCfs);
+    admin.updatePeerConfig(ID_ONE, rpc);
+    rpc = admin.getPeerConfig(ID_ONE);
+    namespaces.clear();
+    namespaces.add(ns2);
+    rpc.setNamespaces(namespaces);
+    try {
+      admin.updatePeerConfig(ID_ONE, rpc);
+      fail("Should throw ReplicationException, because namespace " + ns2 + " 
conflict with table "
+          + tab2);
+    } catch (ReplicationException e) {
+      // OK
+    }
+
+    admin.removePeer(ID_ONE);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java
new file mode 100644
index 0000000..ee9b0cb
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java
@@ -0,0 +1,248 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({MediumTests.class})
+public class TestNamespaceReplication extends TestReplicationBase {
+
+  private static final Log LOG = LogFactory.getLog(TestNamespaceReplication.class);
+
+  private static String ns1 = "ns1";
+  private static String ns2 = "ns2";
+
+  private static final TableName tabAName = TableName.valueOf("ns1:TA");
+  private static final TableName tabBName = TableName.valueOf("ns2:TB");
+
+  private static final byte[] f1Name = Bytes.toBytes("f1");
+  private static final byte[] f2Name = Bytes.toBytes("f2");
+
+  private static final byte[] val = Bytes.toBytes("myval");
+
+  private static HTableDescriptor tabA;
+  private static HTableDescriptor tabB;
+
+  private static Connection connection1;
+  private static Connection connection2;
+  private static Admin admin1;
+  private static Admin admin2;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TestReplicationBase.setUpBeforeClass();
+
+    connection1 = ConnectionFactory.createConnection(conf1);
+    connection2 = ConnectionFactory.createConnection(conf2);
+    admin1 = connection1.getAdmin();
+    admin2 = connection2.getAdmin();
+
+    admin1.createNamespace(NamespaceDescriptor.create(ns1).build());
+    admin1.createNamespace(NamespaceDescriptor.create(ns2).build());
+    admin2.createNamespace(NamespaceDescriptor.create(ns1).build());
+    admin2.createNamespace(NamespaceDescriptor.create(ns2).build());
+
+    tabA = new HTableDescriptor(tabAName);
+    HColumnDescriptor fam = new HColumnDescriptor(f1Name);
+    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    tabA.addFamily(fam);
+    fam = new HColumnDescriptor(f2Name);
+    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    tabA.addFamily(fam);
+    admin1.createTable(tabA);
+    admin2.createTable(tabA);
+
+    tabB = new HTableDescriptor(tabBName);
+    fam = new HColumnDescriptor(f1Name);
+    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    tabB.addFamily(fam);
+    fam = new HColumnDescriptor(f2Name);
+    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    tabB.addFamily(fam);
+    admin1.createTable(tabB);
+    admin2.createTable(tabB);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    admin1.disableTable(tabAName);
+    admin1.deleteTable(tabAName);
+    admin1.disableTable(tabBName);
+    admin1.deleteTable(tabBName);
+    admin2.disableTable(tabAName);
+    admin2.deleteTable(tabAName);
+    admin2.disableTable(tabBName);
+    admin2.deleteTable(tabBName);
+
+    admin1.deleteNamespace(ns1);
+    admin1.deleteNamespace(ns2);
+    admin2.deleteNamespace(ns1);
+    admin2.deleteNamespace(ns2);
+
+    connection1.close();
+    connection2.close();
+    TestReplicationBase.tearDownAfterClass();
+  }
+
+  @Test
+  public void testNamespaceReplication() throws Exception {
+    Table htab1A = connection1.getTable(tabAName);
+    Table htab2A = connection2.getTable(tabAName);
+
+    Table htab1B = connection1.getTable(tabBName);
+    Table htab2B = connection2.getTable(tabBName);
+
+    admin.peerAdded("2");
+    // add ns1 to peer config which replicates to cluster2
+    ReplicationPeerConfig rpc = admin.getPeerConfig("2");
+    Set<String> namespaces = new HashSet<>();
+    namespaces.add(ns1);
+    rpc.setNamespaces(namespaces);
+    admin.updatePeerConfig("2", rpc);
+    LOG.info("update peer config");
+
+    // Table A can be replicated to cluster2
+    put(htab1A, row, f1Name, f2Name);
+    ensureRowExisted(htab2A, row, f1Name, f2Name);
+    delete(htab1A, row, f1Name, f2Name);
+    ensureRowNotExisted(htab2A, row, f1Name, f2Name);
+
+    // Table B can not be replicated to cluster2
+    put(htab1B, row, f1Name, f2Name);
+    ensureRowNotExisted(htab2B, row, f1Name, f2Name);
+
+    // add ns1:TA => 'f1' and ns2 to peer config which replicate to cluster2
+    rpc = admin.getPeerConfig("2");
+    namespaces = new HashSet<>();
+    namespaces.add(ns2);
+    rpc.setNamespaces(namespaces);
+    Map<TableName, List<String>> tableCfs = new HashMap<>();
+    tableCfs.put(tabAName, new ArrayList<String>());
+    tableCfs.get(tabAName).add("f1");
+    rpc.setTableCFsMap(tableCfs);
+    admin.updatePeerConfig("2", rpc);
+    LOG.info("update peer config");
+
+    // Only family f1 of Table A can be replicated to cluster2
+    put(htab1A, row, f1Name, f2Name);
+    ensureRowExisted(htab2A, row, f1Name);
+    delete(htab1A, row, f1Name, f2Name);
+    ensureRowNotExisted(htab2A, row, f1Name);
+
+    // All cfs of table B can be replicated to cluster2
+    put(htab1B, row, f1Name, f2Name);
+    ensureRowExisted(htab2B, row, f1Name, f2Name);
+    delete(htab1B, row, f1Name, f2Name);
+    ensureRowNotExisted(htab2B, row, f1Name, f2Name);
+
+    admin.removePeer("2");
+  }
+
+  private void put(Table source, byte[] row, byte[]... families)
+      throws Exception {
+    for (byte[] fam : families) {
+      Put put = new Put(row);
+      put.addColumn(fam, row, val);
+      source.put(put);
+    }
+  }
+
+  private void delete(Table source, byte[] row, byte[]... families)
+      throws Exception {
+    for (byte[] fam : families) {
+      Delete del = new Delete(row);
+      del.addFamily(fam);
+      source.delete(del);
+    }
+  }
+
+  private void ensureRowExisted(Table target, byte[] row, byte[]... families)
+      throws Exception {
+    for (byte[] fam : families) {
+      Get get = new Get(row);
+      get.addFamily(fam);
+      for (int i = 0; i < NB_RETRIES; i++) {
+        if (i == NB_RETRIES - 1) {
+          fail("Waited too much time for put replication");
+        }
+        Result res = target.get(get);
+        if (res.size() == 0) {
+          LOG.info("Row not available");
+        } else {
+          assertEquals(res.size(), 1);
+          assertArrayEquals(res.value(), val);
+          break;
+        }
+        Thread.sleep(SLEEP_TIME);
+      }
+    }
+  }
+
+  private void ensureRowNotExisted(Table target, byte[] row, byte[]... families)
+      throws Exception {
+    for (byte[] fam : families) {
+      Get get = new Get(row);
+      get.addFamily(fam);
+      for (int i = 0; i < NB_RETRIES; i++) {
+        if (i == NB_RETRIES - 1) {
+          fail("Waited too much time for delete replication");
+        }
+        Result res = target.get(get);
+        if (res.size() >= 1) {
+          LOG.info("Row not deleted");
+        } else {
+          break;
+        }
+        Thread.sleep(SLEEP_TIME);
+      }
+    }
+  }
+}
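
The client-side flow that the test above exercises reduces to a handful of
ReplicationAdmin / ReplicationPeerConfig calls. The sketch below is
illustrative only: the peer id, namespace, and table names are placeholders
and error handling is omitted.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
    import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;

    public class NamespaceReplicationConfigSketch {
      // Replicate all of namespace ns1, plus only family f1 of ns2:tableA,
      // to the cluster behind peer "2".
      public static void configure(ReplicationAdmin admin) throws Exception {
        ReplicationPeerConfig rpc = admin.getPeerConfig("2");

        Set<String> namespaces = new HashSet<>();
        namespaces.add("ns1");
        rpc.setNamespaces(namespaces);

        Map<TableName, List<String>> tableCfs = new HashMap<>();
        List<String> cfs = new ArrayList<>();
        cfs.add("f1");
        tableCfs.put(TableName.valueOf("ns2:tableA"), cfs);
        rpc.setTableCFsMap(tableCfs);

        // ns1 and ns2:tableA do not overlap, so the namespaces/table-cfs
        // conflict rule is satisfied and the new config takes effect.
        admin.updatePeerConfig("2", rpc);
      }
    }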

http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
index 04d9232..3d4062f 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
@@ -19,8 +19,10 @@
 package org.apache.hadoop.hbase.replication;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.hadoop.hbase.Cell;
@@ -196,19 +198,22 @@ public class TestReplicationWALEntryFilters {
   }
 
   @Test
-  public void testTableCfWALEntryFilter() {
+  public void testNamespaceTableCfWALEntryFilter() {
     ReplicationPeer peer = mock(ReplicationPeer.class);
 
+    // 1. neither namespaces nor table-cfs config set in the peer
+    when(peer.getNamespaces()).thenReturn(null);
     when(peer.getTableCFs()).thenReturn(null);
     Entry userEntry = createEntry(null, a, b, c);
-    WALEntryFilter filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
+    WALEntryFilter filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
     assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
 
+    // 2. only the table-cfs config is set in the peer
     // empty map
     userEntry = createEntry(null, a, b, c);
     Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
     when(peer.getTableCFs()).thenReturn(tableCfs);
-    filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
+    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
     assertEquals(null, filter.filter(userEntry));
 
     // table bar
@@ -216,7 +221,7 @@ public class TestReplicationWALEntryFilters {
     tableCfs = new HashMap<TableName, List<String>>();
     tableCfs.put(TableName.valueOf("bar"), null);
     when(peer.getTableCFs()).thenReturn(tableCfs);
-    filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
+    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
     assertEquals(null, filter.filter(userEntry));
 
     // table foo:a
@@ -224,7 +229,7 @@ public class TestReplicationWALEntryFilters {
     tableCfs = new HashMap<TableName, List<String>>();
     tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
     when(peer.getTableCFs()).thenReturn(tableCfs);
-    filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
+    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
     assertEquals(createEntry(null, a), filter.filter(userEntry));
 
     // table foo:a,c
@@ -232,8 +237,64 @@ public class TestReplicationWALEntryFilters {
     tableCfs = new HashMap<TableName, List<String>>();
     tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
     when(peer.getTableCFs()).thenReturn(tableCfs);
-    filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
+    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
     assertEquals(createEntry(null, a,c), filter.filter(userEntry));
+
+    // 3. only the namespaces config is set in the peer
+    when(peer.getTableCFs()).thenReturn(null);
+    // empty set
+    Set<String> namespaces = new HashSet<String>();
+    when(peer.getNamespaces()).thenReturn(namespaces);
+    userEntry = createEntry(null, a, b, c);
+    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
+    assertEquals(null, filter.filter(userEntry));
+
+    // namespace default
+    namespaces.add("default");
+    when(peer.getNamespaces()).thenReturn(namespaces);
+    userEntry = createEntry(null, a, b, c);
+    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
+    assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
+
+    // namespace ns1
+    namespaces = new HashSet<String>();
+    namespaces.add("ns1");
+    when(peer.getNamespaces()).thenReturn(namespaces);
+    userEntry = createEntry(null, a, b, c);
+    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
+    assertEquals(null, filter.filter(userEntry));
+
+    // 4. both namespaces and table-cfs configs are set
+    // The namespaces config must not conflict with the table-cfs config
+    namespaces = new HashSet<String>();
+    tableCfs = new HashMap<TableName, List<String>>();
+    namespaces.add("ns1");
+    when(peer.getNamespaces()).thenReturn(namespaces);
+    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
+    when(peer.getTableCFs()).thenReturn(tableCfs);
+    userEntry = createEntry(null, a, b, c);
+    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
+    assertEquals(createEntry(null, a, c), filter.filter(userEntry));
+
+    namespaces = new HashSet<String>();
+    tableCfs = new HashMap<TableName, List<String>>();
+    namespaces.add("default");
+    when(peer.getNamespaces()).thenReturn(namespaces);
+    tableCfs.put(TableName.valueOf("ns1:foo"), Lists.newArrayList("a", "c"));
+    when(peer.getTableCFs()).thenReturn(tableCfs);
+    userEntry = createEntry(null, a, b, c);
+    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
+    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
+
+    namespaces = new HashSet<String>();
+    tableCfs = new HashMap<TableName, List<String>>();
+    namespaces.add("ns1");
+    when(peer.getNamespaces()).thenReturn(namespaces);
+    tableCfs.put(TableName.valueOf("bar"), null);
+    when(peer.getTableCFs()).thenReturn(tableCfs);
+    userEntry = createEntry(null, a, b, c);
+    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
+    assertEquals(null, filter.filter(userEntry));
   }
 
   private Entry createEntry(TreeMap<byte[], Integer> scopes, byte[]... kvs) {
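
The assertions above pin down how the namespaces config and the table-cfs
config combine per WAL entry. The filter class itself is added elsewhere in
this commit; the helper below is only an illustrative restatement of the rule
the test expects, with placeholder class and method names.

    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    import org.apache.hadoop.hbase.TableName;

    final class NamespaceOrTableCfRule {
      // Decide whether a cell of the given table and family is shipped to
      // the peer, given the peer's namespaces and table-cfs configs.
      static boolean replicate(Set<String> namespaces,
          Map<TableName, List<String>> tableCfs, TableName table, String family) {
        if (namespaces == null && tableCfs == null) {
          return true; // no config at all: replicate everything (case 1)
        }
        if (namespaces != null && namespaces.contains(table.getNamespaceAsString())) {
          return true; // the whole namespace is replicated (cases 3 and 4)
        }
        if (tableCfs != null && tableCfs.containsKey(table)) {
          List<String> cfs = tableCfs.get(table);
          // a null or empty family list means "all families" of that table
          return cfs == null || cfs.isEmpty() || cfs.contains(family);
        }
        return false; // neither config covers this table, so it is dropped
      }
    }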

http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
index 6759daf..6f5ad56 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import 
org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import 
org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint.RegionReplicaReplayCallable;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -280,7 +281,9 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
     when(context.getMetrics()).thenReturn(mock(MetricsSource.class));
 
     ReplicationPeer mockPeer = mock(ReplicationPeer.class);
+    when(mockPeer.getNamespaces()).thenReturn(null);
     when(mockPeer.getTableCFs()).thenReturn(null);
+    when(mockPeer.getPeerConfig()).thenReturn(new ReplicationPeerConfig());
     when(context.getReplicationPeer()).thenReturn(mockPeer);
 
     replicator.init(context);
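
The two stubs added above keep a bare Mockito mock usable here, presumably
because the WAL entry filter chain now also reads the peer's namespaces and
peer config in addition to its table-cfs map. A minimal version of that
setup, with illustrative names, could look like this:

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import org.apache.hadoop.hbase.replication.ReplicationPeer;
    import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;

    public class MockPeerSketch {
      // A peer with no namespaces and no table-cfs restrictions, backed by
      // a default ReplicationPeerConfig, i.e. "replicate everything".
      public static ReplicationPeer newUnrestrictedPeer() {
        ReplicationPeer peer = mock(ReplicationPeer.class);
        when(peer.getNamespaces()).thenReturn(null);
        when(peer.getTableCFs()).thenReturn(null);
        when(peer.getPeerConfig()).thenReturn(new ReplicationPeerConfig());
        return peer;
      }
    }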

http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-shell/src/main/ruby/hbase/replication_admin.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb 
b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
index 4de3962..f99ccae 100644
--- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
@@ -62,6 +62,7 @@ module Hbase
         config = args.fetch(CONFIG, nil)
         data = args.fetch(DATA, nil)
         table_cfs = args.fetch(TABLE_CFS, nil)
+        namespaces = args.fetch(NAMESPACES, nil)
 
         # Create and populate a ReplicationPeerConfig
         replication_peer_config = ReplicationPeerConfig.new
@@ -83,6 +84,14 @@ module Hbase
           }
         end
 
+        unless namespaces.nil?
+          ns_set = java.util.HashSet.new
+          namespaces.each do |n|
+            ns_set.add(n)
+          end
+          replication_peer_config.set_namespaces(ns_set)
+        end
+
         unless table_cfs.nil?
           # convert table_cfs to TableName
           map = java.util.HashMap.new
@@ -180,12 +189,39 @@ module Hbase
       end
       @replication_admin.removePeerTableCFs(id, map)
     end
+
+    # Set new namespaces config for the specified peer
+    def set_peer_namespaces(id, namespaces)
+      unless namespaces.nil?
+        ns_set = java.util.HashSet.new
+        namespaces.each do |n|
+          ns_set.add(n)
+        end
+        rpc = get_peer_config(id)
+        unless rpc.nil?
+          rpc.setNamespaces(ns_set)
+          @replication_admin.updatePeerConfig(id, rpc)
+        end
+      end
+    end
+
+    # Show the current namespaces config for the specified peer
+    def show_peer_namespaces(peer_config)
+      namespaces = peer_config.get_namespaces
+      if !namespaces.nil?
+        return namespaces.join(';')
+      else
+        return nil
+      end
+    end
+
     
#----------------------------------------------------------------------------------------------
     # Enables a table's replication switch
     def enable_tablerep(table_name)
       tableName = TableName.valueOf(table_name)
       @replication_admin.enableTableRep(tableName)
     end
+
     
#----------------------------------------------------------------------------------------------
     # Disables a table's replication switch
     def disable_tablerep(table_name)

http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-shell/src/main/ruby/hbase_constants.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase_constants.rb 
b/hbase-shell/src/main/ruby/hbase_constants.rb
index bc6f37c..c02d5c6 100644
--- a/hbase-shell/src/main/ruby/hbase_constants.rb
+++ b/hbase-shell/src/main/ruby/hbase_constants.rb
@@ -78,6 +78,7 @@ module HBaseConstants
   ENDPOINT_CLASSNAME = 'ENDPOINT_CLASSNAME'
   CLUSTER_KEY = 'CLUSTER_KEY'
   TABLE_CFS = 'TABLE_CFS'
+  NAMESPACES = 'NAMESPACES'
   CONFIG = 'CONFIG'
   DATA = 'DATA'
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-shell/src/main/ruby/shell.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell.rb 
b/hbase-shell/src/main/ruby/shell.rb
index bb6a604..ee508e9 100644
--- a/hbase-shell/src/main/ruby/shell.rb
+++ b/hbase-shell/src/main/ruby/shell.rb
@@ -370,6 +370,7 @@ Shell.load_command_group(
     list_peers
     enable_peer
     disable_peer
+    set_peer_namespaces
     show_peer_tableCFs
     set_peer_tableCFs
     list_replicated_tables

http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-shell/src/main/ruby/shell/commands/add_peer.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/add_peer.rb 
b/hbase-shell/src/main/ruby/shell/commands/add_peer.rb
index e9431cf..077bd69 100644
--- a/hbase-shell/src/main/ruby/shell/commands/add_peer.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/add_peer.rb
@@ -27,13 +27,25 @@ must be specified to identify the peer.
 
 For a HBase cluster peer, a cluster key must be provided and is composed like 
this:
 
hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
-This gives a full path for HBase to connect to another HBase cluster. An optional parameter for
-table column families identifies which column families will be replicated to the peer cluster.
+This gives a full path for HBase to connect to another HBase cluster.
+An optional NAMESPACES parameter identifies which namespaces' tables will be
+replicated to the peer cluster.
+An optional TABLE_CFS parameter identifies which tables and/or column families
+will be replicated to the peer cluster.
+
+Note: setting a namespace in the peer config means that all tables in that
+namespace will be replicated to the peer cluster. So once a namespace is in the
+peer config, you cannot also list that namespace's tables in the table-cfs
+config of the same peer.
+
 Examples:
 
   hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase"
   hbase> add_peer '2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
     TABLE_CFS => { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", 
"cf2"] }
+  hbase> add_peer '2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
+    NAMESPACES => ["ns1", "ns2", "ns3"]
+  hbase> add_peer '2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
+    NAMESPACES => ["ns1", "ns2"], TABLE_CFS => { "ns3:table1" => [], 
"ns3:table2" => ["cf1"] }
 
 For a custom replication endpoint, the ENDPOINT_CLASSNAME can be provided. Two 
optional arguments
 are DATA and CONFIG which can be specified to set different either the 
peer_data or configuration
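
The restriction described in the help text above (a namespace and one of its
tables cannot both be configured for the same peer) amounts to a small
validation rule. The helper below is hypothetical, not code from this commit,
and only illustrates that check.

    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    import org.apache.hadoop.hbase.TableName;

    final class PeerConfigConflictCheck {
      // Reject a peer config in which a table appears in the table-cfs map
      // while its namespace is already covered by the namespaces config.
      static void check(Set<String> namespaces,
          Map<TableName, List<String>> tableCfs) {
        if (namespaces == null || namespaces.isEmpty()
            || tableCfs == null || tableCfs.isEmpty()) {
          return;
        }
        for (TableName table : tableCfs.keySet()) {
          if (namespaces.contains(table.getNamespaceAsString())) {
            throw new IllegalArgumentException("Table " + table
                + " is already replicated through namespace "
                + table.getNamespaceAsString());
          }
        }
      }
    }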

http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb 
b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
index 72a0704..ed6b575 100644
--- a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
@@ -32,12 +32,15 @@ EOF
       def command()
         peers = replication_admin.list_peers
 
-        formatter.header(["PEER_ID", "CLUSTER_KEY", "STATE", "TABLE_CFS"])
+        formatter.header(["PEER_ID", "CLUSTER_KEY", "ENDPOINT_CLASSNAME",
+          "STATE", "NAMESPACES", "TABLE_CFS"])
 
         peers.entrySet().each do |e|
           state = replication_admin.get_peer_state(e.key)
+          namespaces = replication_admin.show_peer_namespaces(e.value)
           tableCFs = replication_admin.show_peer_tableCFs(e.key)
-          formatter.row([ e.key, e.value, state, tableCFs ])
+          formatter.row([ e.key, e.value.getClusterKey,
+            e.value.getReplicationEndpointImpl, state, namespaces, tableCFs ])
         end
 
         formatter.footer()

http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-shell/src/main/ruby/shell/commands/set_peer_namespaces.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/set_peer_namespaces.rb 
b/hbase-shell/src/main/ruby/shell/commands/set_peer_namespaces.rb
new file mode 100644
index 0000000..75b3d11
--- /dev/null
+++ b/hbase-shell/src/main/ruby/shell/commands/set_peer_namespaces.rb
@@ -0,0 +1,51 @@
+#
+# Copyright The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class SetPeerNamespaces< Command
+      def help
+        return <<-EOF
+  Set the replicable namespaces config for the specified peer.
+
+  Setting a namespace in the peer config means that all tables in that
+  namespace will be replicated to the peer cluster. So once a namespace
+  is in the peer config, you cannot also set that namespace's tables in
+  the table-cfs config of the same peer.
+
+  Examples:
+
+    # set an empty namespaces config, so the table-cfs config alone decides
+    # which tables are replicated
+    hbase> set_peer_namespaces '1', []
+    # set the namespaces to replicate for a peer; every table in a listed
+    # namespace (with replication_scope != 0) will be replicated
+    hbase> set_peer_namespaces '2', ["ns1", "ns2"]
+
+  EOF
+      end
+
+      def command(id, namespaces)
+        replication_admin.set_peer_namespaces(id, namespaces)
+      end
+    end
+  end
+end

http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb 
b/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb
index b2e823c..4d3c3ec 100644
--- a/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb
@@ -23,11 +23,15 @@ module Shell
     class SetPeerTableCFs< Command
       def help
         return <<-EOF
-  Set the replicable table-cf config for the specified peer
+  Set the replicable table-cf config for the specified peer.
+
+  A table cannot be added to the table-cfs config if its namespace is
+  already in the namespaces config of this peer.
+
   Examples:
 
-    # set all tables to be replicable for a peer
-    hbase> set_peer_tableCFs '1', ""
+    # set an empty table-cfs config, so the namespaces config alone decides
+    # which tables are replicated
     hbase> set_peer_tableCFs '1'
     # set table / table-cf to be replicable for a peer, for a table without
     # an explicit column-family list, all replicable column-families (with

http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb 
b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
index 1d27e67..daa8f96 100644
--- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
+++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
@@ -121,6 +121,49 @@ module Hbase
       command(:remove_peer, @peer_id)
     end
 
+    define_test "add_peer: multiple zk cluster key and namespaces" do
+      cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
+      namespaces = ["ns1", "ns2", "ns3"]
+      namespaces_str = "ns2;ns1;ns3"
+
+      args = { CLUSTER_KEY => cluster_key, NAMESPACES => namespaces }
+      command(:add_peer, @peer_id, args)
+
+      assert_equal(1, command(:list_peers).length)
+      assert(command(:list_peers).key?(@peer_id))
+      peer_config = command(:list_peers).fetch(@peer_id)
+      assert_equal(cluster_key, peer_config.get_cluster_key)
+      assert_equal(namespaces_str,
+        replication_admin.show_peer_namespaces(peer_config))
+
+      # cleanup for future tests
+      command(:remove_peer, @peer_id)
+    end
+
+    define_test "add_peer: multiple zk cluster key and namespaces, table_cfs" 
do
+      cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
+      namespaces = ["ns1", "ns2"]
+      table_cfs = { "ns3:table1" => [], "ns3:table2" => ["cf1"],
+        "ns3:table3" => ["cf1", "cf2"] }
+      namespaces_str = "ns2;ns1"
+      table_cfs_str = "ns3.table1;ns3.table3:cf1,cf2;ns3.table2:cf1"
+
+      args = { CLUSTER_KEY => cluster_key, NAMESPACES => namespaces,
+        TABLE_CFS => table_cfs }
+      command(:add_peer, @peer_id, args)
+
+      assert_equal(1, command(:list_peers).length)
+      assert(command(:list_peers).key?(@peer_id))
+      peer_config = command(:list_peers).fetch(@peer_id)
+      assert_equal(cluster_key, peer_config.get_cluster_key)
+      assert_equal(namespaces_str,
+        replication_admin.show_peer_namespaces(peer_config))
+      assert_equal(table_cfs_str, command(:show_peer_tableCFs, @peer_id))
+
+      # cleanup for future tests
+      command(:remove_peer, @peer_id)
+    end
+
     define_test "add_peer: multiple zk cluster key and table_cfs - peer 
config" do
       cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
       table_cfs = { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", 
"cf2"] }
@@ -152,6 +195,30 @@ module Hbase
       end
     end
 
+    define_test "set_peer_namespaces: works with namespaces array" do
+      cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
+      namespaces = ["ns1", "ns2"]
+      namespaces_str = "ns2;ns1"
+
+      args = { CLUSTER_KEY => cluster_key }
+      command(:add_peer, @peer_id, args)
+
+      # Normally the ReplicationSourceManager will call ReplicationPeer#peer_added
+      # but here we have to do it ourselves
+      replication_admin.peer_added(@peer_id)
+
+      command(:set_peer_namespaces, @peer_id, namespaces)
+
+      assert_equal(1, command(:list_peers).length)
+      assert(command(:list_peers).key?(@peer_id))
+      peer_config = command(:list_peers).fetch(@peer_id)
+      assert_equal(namespaces_str,
+        replication_admin.show_peer_namespaces(peer_config))
+
+      # cleanup for future tests
+      command(:remove_peer, @peer_id)
+    end
+
     define_test "get_peer_config: works with simple clusterKey peer" do
       cluster_key = "localhost:2181:/hbase-test"
       args = { CLUSTER_KEY => cluster_key }
@@ -221,8 +288,8 @@ module Hbase
       assert_equal("value2", peer_config.get_configuration.get("config2"))
       assert_equal("new_value1", 
Bytes.to_string(peer_config.get_peer_data.get(Bytes.toBytes("data1"))))
       assert_equal("value2", 
Bytes.to_string(peer_config.get_peer_data.get(Bytes.toBytes("data2"))))
-
     end
+
     # assert_raise fails on native exceptions - 
https://jira.codehaus.org/browse/JRUBY-5279
     # Can't catch native Java exception with assert_raise in JRuby 1.6.8 as in 
the test below.
     # define_test "add_peer: adding a second peer with same id should error" do
