HBASE-16086 TableCfWALEntryFilter and ScopeWALEntryFilter should not redundantly iterate over cells (Vincent Poon)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/80d8b210 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/80d8b210 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/80d8b210 Branch: refs/heads/hbase-12439 Commit: 80d8b2100d9f4dc2a01ea6bdbded6ec52d7e4263 Parents: cc2a40a Author: chenheng <chenh...@apache.org> Authored: Sun Sep 11 09:55:08 2016 +0800 Committer: chenheng <chenh...@apache.org> Committed: Sun Sep 11 09:55:08 2016 +0800 ---------------------------------------------------------------------- .../hbase/replication/BulkLoadCellFilter.java | 81 ++++++++++++ .../hbase/replication/ChainWALEntryFilter.java | 38 +++++- .../hbase/replication/ScopeWALEntryFilter.java | 94 ++++---------- .../replication/TableCfWALEntryFilter.java | 124 +++++++------------ .../hadoop/hbase/replication/WALCellFilter.java | 41 ++++++ .../TestReplicationWALEntryFilters.java | 12 +- 6 files changed, 231 insertions(+), 159 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/80d8b210/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BulkLoadCellFilter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BulkLoadCellFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BulkLoadCellFilter.java new file mode 100644 index 0000000..3599d10 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BulkLoadCellFilter.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; + +import com.google.common.base.Predicate; + +public class BulkLoadCellFilter { + private static final Log LOG = LogFactory.getLog(BulkLoadCellFilter.class); + + /** + * Filters the bulk load cell using the supplied predicate. + * @param cell The WAL cell to filter. + * @param famPredicate Returns true of given family should be removed. + * @return The filtered cell. + */ + public Cell filterCell(Cell cell, Predicate<byte[]> famPredicate) { + byte[] fam; + BulkLoadDescriptor bld = null; + try { + bld = WALEdit.getBulkLoadDescriptor(cell); + } catch (IOException e) { + LOG.warn("Failed to get bulk load events information from the WAL file.", e); + return cell; + } + List<StoreDescriptor> storesList = bld.getStoresList(); + // Copy the StoreDescriptor list and update it as storesList is a unmodifiableList + List<StoreDescriptor> copiedStoresList = new ArrayList<StoreDescriptor>(storesList); + Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator(); + boolean anyStoreRemoved = false; + while (copiedStoresListIterator.hasNext()) { + StoreDescriptor sd = copiedStoresListIterator.next(); + fam = sd.getFamilyName().toByteArray(); + if (famPredicate.apply(fam)) { + copiedStoresListIterator.remove(); + anyStoreRemoved = true; + } + } + + if (!anyStoreRemoved) { + return cell; + } else if (copiedStoresList.isEmpty()) { + return null; + } + BulkLoadDescriptor.Builder newDesc = + BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName()) + .setEncodedRegionName(bld.getEncodedRegionName()) + .setBulkloadSeqNum(bld.getBulkloadSeqNum()); + newDesc.addAllStores(copiedStoresList); + BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build(); + return CellUtil.createCell(CellUtil.cloneRow(cell), WALEdit.METAFAMILY, WALEdit.BULK_LOAD, + cell.getTimestamp(), cell.getTypeByte(), newBulkLoadDescriptor.toByteArray()); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/80d8b210/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java index 6a3981a..1d67faa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.List; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.wal.WAL.Entry; @@ -34,9 +35,11 @@ import org.apache.hadoop.hbase.wal.WAL.Entry; public class ChainWALEntryFilter implements WALEntryFilter { private final WALEntryFilter[] filters; + private WALCellFilter[] cellFilters; public ChainWALEntryFilter(WALEntryFilter...filters) { this.filters = filters; + initCellFilters(); } public ChainWALEntryFilter(List<WALEntryFilter> filters) { @@ -49,8 +52,18 @@ public class ChainWALEntryFilter implements WALEntryFilter { rawFilters.add(filter); } } - this.filters = rawFilters.toArray(new WALEntryFilter[rawFilters.size()]); + initCellFilters(); + } + + public void initCellFilters() { + ArrayList<WALCellFilter> cellFilters = new ArrayList<>(filters.length); + for (WALEntryFilter filter : filters) { + if (filter instanceof WALCellFilter) { + cellFilters.add((WALCellFilter) filter); + } + } + this.cellFilters = cellFilters.toArray(new WALCellFilter[cellFilters.size()]); } @Override @@ -61,7 +74,30 @@ public class ChainWALEntryFilter implements WALEntryFilter { } entry = filter.filter(entry); } + filterCells(entry); return entry; } + private void filterCells(Entry entry) { + if (entry == null || cellFilters.length == 0) { + return; + } + ArrayList<Cell> cells = entry.getEdit().getCells(); + int size = cells.size(); + for (int i = size - 1; i >= 0; i--) { + Cell cell = cells.get(i); + for (WALCellFilter filter : cellFilters) { + cell = filter.filterCell(entry, cell); + if (cell != null) { + cells.set(i, cell); + } else { + cells.remove(i); + break; + } + } + } + if (cells.size() < size / 2) { + cells.trimToSize(); + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/80d8b210/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java index 28a83dd..b084a04 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java @@ -18,29 +18,24 @@ package org.apache.hadoop.hbase.replication; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; import java.util.NavigableMap; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; -import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.wal.WAL.Entry; +import com.google.common.base.Predicate; + /** * Keeps KVs that are scoped other than local */ @InterfaceAudience.Private -public class ScopeWALEntryFilter implements WALEntryFilter { - private static final Log LOG = LogFactory.getLog(ScopeWALEntryFilter.class); +public class ScopeWALEntryFilter implements WALEntryFilter, WALCellFilter { + + BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter(); @Override public Entry filter(Entry entry) { @@ -48,72 +43,27 @@ public class ScopeWALEntryFilter implements WALEntryFilter { if (scopes == null || scopes.isEmpty()) { return null; } - ArrayList<Cell> cells = entry.getEdit().getCells(); - int size = cells.size(); - byte[] fam; - for (int i = size - 1; i >= 0; i--) { - Cell cell = cells.get(i); - // If a bulk load entry has a scope then that means user has enabled replication for bulk load - // hfiles. - // TODO There is a similar logic in TableCfWALEntryFilter but data structures are different so - // cannot refactor into one now, can revisit and see if any way to unify them. + return entry; + } + + @Override + public Cell filterCell(Entry entry, Cell cell) { + final NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes(); + // The scope will be null or empty if + // there's nothing to replicate in that WALEdit + byte[] fam = CellUtil.cloneFamily(cell); if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) { - Cell filteredBulkLoadEntryCell = filterBulkLoadEntries(scopes, cell); - if (filteredBulkLoadEntryCell != null) { - cells.set(i, filteredBulkLoadEntryCell); - } else { - cells.remove(i); - } + cell = bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() { + @Override + public boolean apply(byte[] fam) { + return !scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL; + } + }); } else { - // The scope will be null or empty if - // there's nothing to replicate in that WALEdit - fam = CellUtil.cloneFamily(cell); if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) { - cells.remove(i); + return null; } } - } - if (cells.size() < size / 2) { - cells.trimToSize(); - } - return entry; - } - - private Cell filterBulkLoadEntries(NavigableMap<byte[], Integer> scopes, Cell cell) { - byte[] fam; - BulkLoadDescriptor bld = null; - try { - bld = WALEdit.getBulkLoadDescriptor(cell); - } catch (IOException e) { - LOG.warn("Failed to get bulk load events information from the WAL file.", e); - return cell; - } - List<StoreDescriptor> storesList = bld.getStoresList(); - // Copy the StoreDescriptor list and update it as storesList is a unmodifiableList - List<StoreDescriptor> copiedStoresList = new ArrayList<StoreDescriptor>(storesList); - Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator(); - boolean anyStoreRemoved = false; - while (copiedStoresListIterator.hasNext()) { - StoreDescriptor sd = copiedStoresListIterator.next(); - fam = sd.getFamilyName().toByteArray(); - if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) { - copiedStoresListIterator.remove(); - anyStoreRemoved = true; - } - } - - if (!anyStoreRemoved) { - return cell; - } else if (copiedStoresList.isEmpty()) { - return null; - } - BulkLoadDescriptor.Builder newDesc = - BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName()) - .setEncodedRegionName(bld.getEncodedRegionName()) - .setBulkloadSeqNum(bld.getBulkloadSeqNum()); - newDesc.addAllStores(copiedStoresList); - BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build(); - return CellUtil.createCell(CellUtil.cloneRow(cell), WALEdit.METAFAMILY, WALEdit.BULK_LOAD, - cell.getTimestamp(), cell.getTypeByte(), newBulkLoadDescriptor.toByteArray()); + return cell; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/80d8b210/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java index f10849b..d890e3e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java @@ -18,9 +18,6 @@ package org.apache.hadoop.hbase.replication; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import java.util.Map; @@ -29,16 +26,17 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; -import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL.Entry; -public class TableCfWALEntryFilter implements WALEntryFilter { +import com.google.common.base.Predicate; + +public class TableCfWALEntryFilter implements WALEntryFilter, WALCellFilter { private static final Log LOG = LogFactory.getLog(TableCfWALEntryFilter.class); - private final ReplicationPeer peer; + private ReplicationPeer peer; + private BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter(); public TableCfWALEntryFilter(ReplicationPeer peer) { this.peer = peer; @@ -47,91 +45,57 @@ public class TableCfWALEntryFilter implements WALEntryFilter { @Override public Entry filter(Entry entry) { TableName tabName = entry.getKey().getTablename(); - ArrayList<Cell> cells = entry.getEdit().getCells(); - Map<TableName, List<String>> tableCFs = null; - - try { - tableCFs = this.peer.getTableCFs(); - } catch (IllegalArgumentException e) { - LOG.error("should not happen: can't get tableCFs for peer " + peer.getId() + - ", degenerate as if it's not configured by keeping tableCFs==null"); - } - int size = cells.size(); + Map<TableName, List<String>> tableCFs = getTableCfs(); // If null means user has explicitly not configured any table CFs so all the tables data are // applicable for replication - if (tableCFs == null) { - return entry; - } - // return null(prevent replicating) if logKey's table isn't in this peer's - // replicable table list + if (tableCFs == null) return entry; + if (!tableCFs.containsKey(tabName)) { return null; - } else { - List<String> cfs = tableCFs.get(tabName); - for (int i = size - 1; i >= 0; i--) { - Cell cell = cells.get(i); - // TODO There is a similar logic in ScopeWALEntryFilter but data structures are different so - // cannot refactor into one now, can revisit and see if any way to unify them. - // Filter bulk load entries separately - if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) { - Cell filteredBulkLoadEntryCell = filterBulkLoadEntries(cfs, cell); - if (filteredBulkLoadEntryCell != null) { - cells.set(i, filteredBulkLoadEntryCell); - } else { - cells.remove(i); - } - } else { - // ignore(remove) kv if its cf isn't in the replicable cf list - // (empty cfs means all cfs of this table are replicable) - if ((cfs != null) && !cfs.contains(Bytes.toString(cell.getFamilyArray(), - cell.getFamilyOffset(), cell.getFamilyLength()))) { - cells.remove(i); - } - } - } - } - if (cells.size() < size/2) { - cells.trimToSize(); } + return entry; } - private Cell filterBulkLoadEntries(List<String> cfs, Cell cell) { - byte[] fam; - BulkLoadDescriptor bld = null; - try { - bld = WALEdit.getBulkLoadDescriptor(cell); - } catch (IOException e) { - LOG.warn("Failed to get bulk load events information from the WAL file.", e); - return cell; - } - List<StoreDescriptor> storesList = bld.getStoresList(); - // Copy the StoreDescriptor list and update it as storesList is a unmodifiableList - List<StoreDescriptor> copiedStoresList = new ArrayList<StoreDescriptor>(storesList); - Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator(); - boolean anyStoreRemoved = false; - while (copiedStoresListIterator.hasNext()) { - StoreDescriptor sd = copiedStoresListIterator.next(); - fam = sd.getFamilyName().toByteArray(); - if (cfs != null && !cfs.contains(Bytes.toString(fam))) { - copiedStoresListIterator.remove(); - anyStoreRemoved = true; + @Override + public Cell filterCell(final Entry entry, Cell cell) { + final Map<TableName, List<String>> tableCfs = getTableCfs(); + if (tableCfs == null) return cell; + TableName tabName = entry.getKey().getTablename(); + List<String> cfs = tableCfs.get(tabName); + // ignore(remove) kv if its cf isn't in the replicable cf list + // (empty cfs means all cfs of this table are replicable) + if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) { + cell = bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() { + @Override + public boolean apply(byte[] fam) { + if (tableCfs != null) { + List<String> cfs = tableCfs.get(entry.getKey().getTablename()); + if (cfs != null && !cfs.contains(Bytes.toString(fam))) { + return true; + } + } + return false; + } + }); + } else { + if ((cfs != null) && !cfs.contains( + Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))) { + return null; } } + return cell; + } - if (!anyStoreRemoved) { - return cell; - } else if (copiedStoresList.isEmpty()) { - return null; + Map<TableName, List<String>> getTableCfs() { + Map<TableName, List<String>> tableCFs = null; + try { + tableCFs = this.peer.getTableCFs(); + } catch (IllegalArgumentException e) { + LOG.error("should not happen: can't get tableCFs for peer " + peer.getId() + + ", degenerate as if it's not configured by keeping tableCFs==null"); } - BulkLoadDescriptor.Builder newDesc = - BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName()) - .setEncodedRegionName(bld.getEncodedRegionName()) - .setBulkloadSeqNum(bld.getBulkloadSeqNum()); - newDesc.addAllStores(copiedStoresList); - BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build(); - return CellUtil.createCell(CellUtil.cloneRow(cell), WALEdit.METAFAMILY, WALEdit.BULK_LOAD, - cell.getTimestamp(), cell.getTypeByte(), newBulkLoadDescriptor.toByteArray()); + return tableCFs; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/80d8b210/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALCellFilter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALCellFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALCellFilter.java new file mode 100644 index 0000000..78b3ed4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALCellFilter.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.wal.WAL.Entry; + +/** + * A filter for WAL entry cells before being sent over to replication. + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) +public interface WALCellFilter { + + /** + * Applies the filter, possibly returning a different Cell instance. + * If null is returned, the cell will be skipped. + * @param entry Entry which contains the cell + * @param cell Cell to filter + * @return a (possibly modified) Cell to use. Returning null will cause the cell + * to be skipped for replication. + */ + public Cell filterCell(Entry entry, Cell cell); + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/80d8b210/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java index c906d6a..04d9232 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java @@ -78,7 +78,7 @@ public class TestReplicationWALEntryFilters { @Test public void testScopeWALEntryFilter() { - ScopeWALEntryFilter filter = new ScopeWALEntryFilter(); + WALEntryFilter filter = new ChainWALEntryFilter(new ScopeWALEntryFilter()); Entry userEntry = createEntry(null, a, b); Entry userEntryA = createEntry(null, a); @@ -201,14 +201,14 @@ public class TestReplicationWALEntryFilters { when(peer.getTableCFs()).thenReturn(null); Entry userEntry = createEntry(null, a, b, c); - TableCfWALEntryFilter filter = new TableCfWALEntryFilter(peer); + WALEntryFilter filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer)); assertEquals(createEntry(null, a,b,c), filter.filter(userEntry)); // empty map userEntry = createEntry(null, a, b, c); Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>(); when(peer.getTableCFs()).thenReturn(tableCfs); - filter = new TableCfWALEntryFilter(peer); + filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer)); assertEquals(null, filter.filter(userEntry)); // table bar @@ -216,7 +216,7 @@ public class TestReplicationWALEntryFilters { tableCfs = new HashMap<TableName, List<String>>(); tableCfs.put(TableName.valueOf("bar"), null); when(peer.getTableCFs()).thenReturn(tableCfs); - filter = new TableCfWALEntryFilter(peer); + filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer)); assertEquals(null, filter.filter(userEntry)); // table foo:a @@ -224,7 +224,7 @@ public class TestReplicationWALEntryFilters { tableCfs = new HashMap<TableName, List<String>>(); tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a")); when(peer.getTableCFs()).thenReturn(tableCfs); - filter = new TableCfWALEntryFilter(peer); + filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer)); assertEquals(createEntry(null, a), filter.filter(userEntry)); // table foo:a,c @@ -232,7 +232,7 @@ public class TestReplicationWALEntryFilters { tableCfs = new HashMap<TableName, List<String>>(); tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c")); when(peer.getTableCFs()).thenReturn(tableCfs); - filter = new TableCfWALEntryFilter(peer); + filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer)); assertEquals(createEntry(null, a,c), filter.filter(userEntry)); }