HBASE-19904 Break dependency of WAL constructor on Replication
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/fc6d140a Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/fc6d140a Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/fc6d140a Branch: refs/heads/HBASE-19064 Commit: fc6d140adf0b382e0b7bfef02ae96be7908036e1 Parents: a2bc19a Author: zhangduo <zhang...@apache.org> Authored: Fri Feb 2 14:10:29 2018 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Fri Feb 2 14:10:29 2018 +0800 ---------------------------------------------------------------------- .../hbase/mapreduce/TestWALRecordReader.java | 4 +- .../hbase/replication/ReplicationUtils.java | 10 ++ .../org/apache/hadoop/hbase/master/HMaster.java | 27 ++- .../hadoop/hbase/regionserver/HRegion.java | 49 +++--- .../hbase/regionserver/HRegionServer.java | 108 +++++------- .../hbase/regionserver/ReplicationService.java | 16 +- .../regionserver/ReplicationSourceService.java | 7 - .../replication/regionserver/Replication.java | 176 ++++--------------- .../regionserver/ReplicationSourceManager.java | 72 ++++++-- .../regionserver/ReplicationSyncUp.java | 3 +- .../org/apache/hadoop/hbase/util/HBaseFsck.java | 6 +- .../hadoop/hbase/wal/AbstractFSWALProvider.java | 43 +++-- .../hadoop/hbase/wal/DisabledWALProvider.java | 8 +- .../apache/hadoop/hbase/wal/FSHLogProvider.java | 10 +- .../hbase/wal/RegionGroupingProvider.java | 35 ++-- .../org/apache/hadoop/hbase/wal/WALFactory.java | 42 ++--- .../apache/hadoop/hbase/wal/WALProvider.java | 46 +++-- .../hadoop/hbase/HBaseTestingUtility.java | 6 +- .../hbase/coprocessor/TestWALObserver.java | 7 +- .../hbase/master/cleaner/TestLogsCleaner.java | 4 +- .../cleaner/TestReplicationHFileCleaner.java | 4 +- .../regionserver/TestCacheOnWriteInSchema.java | 2 +- .../TestCompactionArchiveConcurrentClose.java | 2 +- .../TestCompactionArchiveIOException.java | 2 +- .../hbase/regionserver/TestDefaultMemStore.java | 2 +- .../hbase/regionserver/TestHMobStore.java 
| 2 +- .../hadoop/hbase/regionserver/TestHRegion.java | 23 +-- .../hadoop/hbase/regionserver/TestHStore.java | 2 +- .../TestStoreFileRefresherChore.java | 2 +- .../TestWALMonotonicallyIncreasingSeqId.java | 2 +- .../wal/AbstractTestLogRolling.java | 6 +- .../wal/AbstractTestProtobufLog.java | 3 +- .../regionserver/wal/AbstractTestWALReplay.java | 11 +- .../hbase/regionserver/wal/TestDurability.java | 6 +- .../regionserver/wal/TestLogRollAbort.java | 2 +- .../wal/TestLogRollingNoCluster.java | 2 +- .../wal/TestWALActionsListener.java | 7 +- .../TestReplicationEmptyWALRecovery.java | 6 +- .../replication/TestReplicationSmallTests.java | 18 -- .../TestReplicationSourceManager.java | 51 +++++- .../regionserver/TestWALEntryStream.java | 7 +- .../apache/hadoop/hbase/wal/IOTestProvider.java | 69 ++++++-- .../wal/TestBoundedRegionGroupingStrategy.java | 2 +- .../hadoop/hbase/wal/TestFSHLogProvider.java | 6 +- .../apache/hadoop/hbase/wal/TestSecureWAL.java | 2 +- .../apache/hadoop/hbase/wal/TestWALFactory.java | 2 +- .../apache/hadoop/hbase/wal/TestWALMethods.java | 2 +- .../hbase/wal/TestWALReaderOnSecureWAL.java | 7 +- .../apache/hadoop/hbase/wal/TestWALRootDir.java | 2 +- .../apache/hadoop/hbase/wal/TestWALSplit.java | 4 +- .../hbase/wal/WALPerformanceEvaluation.java | 2 +- 51 files changed, 465 insertions(+), 474 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/fc6d140a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java index 87d100b..e486714 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java +++ 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java @@ -127,7 +127,7 @@ public class TestWALRecordReader { */ @Test public void testPartialRead() throws Exception { - final WALFactory walfactory = new WALFactory(conf, null, getName()); + final WALFactory walfactory = new WALFactory(conf, getName()); WAL log = walfactory.getWAL(info); // This test depends on timestamp being millisecond based and the filename of the WAL also // being millisecond based. @@ -186,7 +186,7 @@ public class TestWALRecordReader { */ @Test public void testWALRecordReader() throws Exception { - final WALFactory walfactory = new WALFactory(conf, null, getName()); + final WALFactory walfactory = new WALFactory(conf, getName()); WAL log = walfactory.getWAL(info); byte [] value = Bytes.toBytes("value"); WALEdit edit = new WALEdit(); http://git-wip-us.apache.org/repos/asf/hbase/blob/fc6d140a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java index ebe68a7..11507aa 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java @@ -25,6 +25,7 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CompoundConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.yetus.audience.InterfaceAudience; @@ -122,4 +123,13 @@ public final class ReplicationUtils { isTableCFsEqual(rpc1.getTableCFsMap(), rpc2.getTableCFsMap()); } } + + /** + * 
@param c Configuration to look at + * @return True if replication for bulk load data is enabled. + */ + public static boolean isReplicationForBulkLoadDataEnabled(final Configuration c) { + return c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, + HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/fc6d140a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index d422960..dc1763c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.master; +import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS; + import com.google.protobuf.Descriptors; import com.google.protobuf.Service; import java.io.IOException; @@ -166,8 +168,10 @@ import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.replication.ReplicationUtils; +import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; +import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader; -import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.trace.TraceUtil; @@ -484,7 +488,7 @@ public class HMaster extends HRegionServer 
implements MasterServices { // Disable usage of meta replicas in the master this.conf.setBoolean(HConstants.USE_META_REPLICAS, false); - Replication.decorateMasterConfiguration(this.conf); + decorateMasterConfiguration(this.conf); // Hack! Maps DFSClient => Master for logs. HDFS made this // config param for task trackers, but we can piggyback off of it. @@ -3557,4 +3561,23 @@ public class HMaster extends HRegionServer implements MasterServices { public ReplicationPeerManager getReplicationPeerManager() { return replicationPeerManager; } + + /** + * This method modifies the master's configuration in order to inject replication-related features + */ + @VisibleForTesting + public static void decorateMasterConfiguration(Configuration conf) { + String plugins = conf.get(HBASE_MASTER_LOGCLEANER_PLUGINS); + String cleanerClass = ReplicationLogCleaner.class.getCanonicalName(); + if (!plugins.contains(cleanerClass)) { + conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass); + } + if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) { + plugins = conf.get(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS); + cleanerClass = ReplicationHFileCleaner.class.getCanonicalName(); + if (!plugins.contains(cleanerClass)) { + conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, plugins + "," + cleanerClass); + } + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/fc6d140a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index ecc9a74..7a6af75 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.regionserver; import static 
org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL; import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY; import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; + +import edu.umd.cs.findbugs.annotations.Nullable; import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; @@ -69,7 +71,6 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -148,28 +149,6 @@ import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.security.User; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; -import org.apache.hbase.thirdparty.com.google.common.collect.Lists; -import org.apache.hbase.thirdparty.com.google.common.collect.Maps; -import org.apache.hbase.thirdparty.com.google.common.io.Closeables; -import org.apache.hbase.thirdparty.com.google.protobuf.Service; -import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; -import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId; -import 
org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; import org.apache.hadoop.hbase.snapshot.SnapshotManifest; import org.apache.hadoop.hbase.trace.TraceUtil; @@ -200,7 +179,29 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import edu.umd.cs.findbugs.annotations.Nullable; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; +import org.apache.hbase.thirdparty.com.google.common.collect.Maps; +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; +import org.apache.hbase.thirdparty.com.google.protobuf.Service; +import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; +import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall; +import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId; +import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; /** * Regions store data for a certain region of a table. 
It stores all columns http://git-wip-us.apache.org/repos/asf/hbase/blob/fc6d140a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 3844415..cb7e2d7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -131,10 +131,9 @@ import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler; import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler; import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; -import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; -import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; -import org.apache.hadoop.hbase.replication.regionserver.Replication; +import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; @@ -158,6 +157,7 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker; import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; @@ -546,7 
+546,7 @@ public class HRegionServer extends HasThread implements checkCodecs(this.conf); this.userProvider = UserProvider.instantiate(conf); FSUtils.setupShortCircuitRead(this.conf); - Replication.decorateRegionServerConfiguration(this.conf); + decorateRegionServerConfiguration(this.conf); // Disable usage of meta replicas in the regionserver this.conf.setBoolean(HConstants.USE_META_REPLICAS, false); @@ -1781,52 +1781,26 @@ public class HRegionServer extends HasThread implements } /** - * Setup WAL log and replication if enabled. - * Replication setup is done in here because it wants to be hooked up to WAL. - * - * @throws IOException + * Setup WAL log and replication if enabled. Replication setup is done in here because it wants to + * be hooked up to WAL. */ private void setupWALAndReplication() throws IOException { + WALFactory factory = new WALFactory(conf, serverName.toString()); + // TODO Replication make assumptions here based on the default filesystem impl Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString()); Path logDir = new Path(walRootDir, logName); - if (LOG.isDebugEnabled()) LOG.debug("logDir=" + logDir); + LOG.debug("logDir={}", logDir); if (this.walFs.exists(logDir)) { - throw new RegionServerRunningException("Region server has already " + - "created directory at " + this.serverName.toString()); + throw new RegionServerRunningException( + "Region server has already created directory at " + this.serverName.toString()); } - - // Instantiate replication if replication enabled. Pass it the log directories. - // In here we create the Replication instances. Later they are initialized and started up. - createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir); - - // listeners the wal factory will add to wals it creates. 
- List<WALActionsListener> listeners = new ArrayList<>(); - listeners.add(new MetricsWAL()); - if (this.replicationSourceHandler != null && - this.replicationSourceHandler.getWALActionsListener() != null) { - // Replication handler is an implementation of WALActionsListener. - listeners.add(this.replicationSourceHandler.getWALActionsListener()); - } - - // There is a cyclic dependency between ReplicationSourceHandler and WALFactory. - // We use WALActionsListener to get the newly rolled WALs, so we need to get the - // WALActionsListeners from ReplicationSourceHandler before constructing WALFactory. And then - // ReplicationSourceHandler need to use WALFactory get the length of the wal file being written. - // So we here we need to construct WALFactory first, and then pass it to the initialized method - // of ReplicationSourceHandler. - // TODO: I can't follow replication; it has initialize and then later on we start it! - WALFactory factory = new WALFactory(conf, listeners, serverName.toString()); + // Instantiate replication if replication enabled. Pass it the log directories. 
+ createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, + factory.getWALProvider()); this.walFactory = factory; - if (this.replicationSourceHandler != null) { - this.replicationSourceHandler.initialize(this, walFs, logDir, oldLogDir, factory); - } - if (this.replicationSinkHandler != null && - this.replicationSinkHandler != this.replicationSourceHandler) { - this.replicationSinkHandler.initialize(this, walFs, logDir, oldLogDir, factory); - } } /** @@ -2918,15 +2892,13 @@ public class HRegionServer extends HasThread implements // // Main program and support routines // - /** * Load the replication executorService objects, if any */ private static void createNewReplicationInstance(Configuration conf, HRegionServer server, - FileSystem walFs, Path walDir, Path oldWALDir) throws IOException { - - if ((server instanceof HMaster) && (!LoadBalancer.isTablesOnMaster(conf) || - LoadBalancer.isSystemTablesOnlyOnMaster(conf))) { + FileSystem walFs, Path walDir, Path oldWALDir, WALProvider walProvider) throws IOException { + if ((server instanceof HMaster) && + (!LoadBalancer.isTablesOnMaster(conf) || LoadBalancer.isSystemTablesOnlyOnMaster(conf))) { return; } @@ -2941,32 +2913,30 @@ public class HRegionServer extends HasThread implements // If both the sink and the source class names are the same, then instantiate // only one object. 
if (sourceClassname.equals(sinkClassname)) { - server.replicationSourceHandler = - (ReplicationSourceService) newReplicationInstance(sourceClassname, conf, server, walFs, - walDir, oldWALDir); + server.replicationSourceHandler = newReplicationInstance(sourceClassname, + ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider); server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler; } else { - server.replicationSourceHandler = - (ReplicationSourceService) newReplicationInstance(sourceClassname, conf, server, walFs, - walDir, oldWALDir); - server.replicationSinkHandler = (ReplicationSinkService) newReplicationInstance(sinkClassname, - conf, server, walFs, walDir, oldWALDir); + server.replicationSourceHandler = newReplicationInstance(sourceClassname, + ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider); + server.replicationSinkHandler = newReplicationInstance(sinkClassname, + ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walProvider); } } - private static ReplicationService newReplicationInstance(String classname, Configuration conf, - HRegionServer server, FileSystem walFs, Path logDir, Path oldLogDir) throws IOException { - Class<? extends ReplicationService> clazz = null; + private static <T extends ReplicationService> T newReplicationInstance(String classname, + Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir, + Path oldLogDir, WALProvider walProvider) throws IOException { + Class<? 
extends T> clazz = null; try { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - clazz = Class.forName(classname, true, classLoader).asSubclass(ReplicationService.class); + clazz = Class.forName(classname, true, classLoader).asSubclass(xface); } catch (java.lang.ClassNotFoundException nfe) { throw new IOException("Could not find class for " + classname); } - - // create an instance of the replication object, but do not initialize it here as we need to use - // WALFactory when initializing. - return ReflectionUtils.newInstance(clazz, conf); + T service = ReflectionUtils.newInstance(clazz, conf); + service.initialize(server, walFs, logDir, oldLogDir, walProvider); + return service; } /** @@ -3739,4 +3709,20 @@ public class HRegionServer extends HasThread implements throw ProtobufUtil.getRemoteException(se); } } + + /** + * This method modifies the region server's configuration in order to inject replication-related + * features + * @param conf region server configurations + */ + static void decorateRegionServerConfiguration(Configuration conf) { + if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) { + String plugins = conf.get(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, ""); + String rsCoprocessorClass = ReplicationObserver.class.getCanonicalName(); + if (!plugins.contains(rsCoprocessorClass)) { + conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, + plugins + "," + rsCoprocessorClass); + } + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/fc6d140a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java index f3bc188..c34231d 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,13 +18,12 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; - import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Server; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; -import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; +import org.apache.hadoop.hbase.wal.WALProvider; +import org.apache.yetus.audience.InterfaceAudience; /** * Gateway to Cluster Replication. Used by @@ -37,14 +35,14 @@ public interface ReplicationService { /** * Initializes the replication service object. - * @throws IOException + * @param walProvider can be null if not initialized inside a live region server environment, for + * example, {@code ReplicationSyncUp}. */ - void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir, - WALFileLengthProvider walFileLengthProvider) throws IOException; + void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir, WALProvider walProvider) + throws IOException; /** * Start replication services. 
- * @throws IOException */ void startReplicationService() throws IOException; http://git-wip-us.apache.org/repos/asf/hbase/blob/fc6d140a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java index 2aef0a8..23ba773 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.hadoop.hbase.replication.regionserver.PeerProcedureHandler; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; /** * A source for a replication stream has to expose this service. @@ -28,12 +27,6 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; */ @InterfaceAudience.Private public interface ReplicationSourceService extends ReplicationService { - /** - * Returns a WALObserver for the service. This is needed to - * observe log rolls and log archival events. - */ - WALActionsListener getWALActionsListener(); - /** * Returns a Handler to handle peer procedures. 
http://git-wip-us.apache.org/repos/asf/hbase/blob/fc6d140a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 2fa5a9b..aaf3beb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hbase.replication.regionserver; -import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS; - import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -30,14 +28,10 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; -import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.hbase.regionserver.ReplicationSinkService; import org.apache.hadoop.hbase.regionserver.ReplicationSourceService; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; @@ -46,12 +40,11 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationTracker; -import 
org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; -import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; +import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKey; -import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; @@ -61,16 +54,15 @@ import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; + /** - * Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}. + * Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}. */ @InterfaceAudience.Private -public class Replication implements - ReplicationSourceService, ReplicationSinkService, WALActionsListener { +public class Replication implements ReplicationSourceService, ReplicationSinkService { private static final Logger LOG = LoggerFactory.getLogger(Replication.class); - private boolean replicationForBulkLoadData; + private boolean isReplicationForBulkLoadDataEnabled; private ReplicationSourceManager replicationManager; private ReplicationQueueStorage queueStorage; private ReplicationPeers replicationPeers; @@ -88,18 +80,6 @@ public class Replication implements private PeerProcedureHandler peerProcedureHandler; /** - * Instantiate the replication management (if rep is enabled). 
- * @param server Hosting server - * @param fs handle to the filesystem - * @param logDir - * @param oldLogDir directory where logs are archived - * @throws IOException - */ - public Replication(Server server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException { - initialize(server, fs, logDir, oldLogDir, p -> OptionalLong.empty()); - } - - /** * Empty constructor */ public Replication() { @@ -107,16 +87,17 @@ public class Replication implements @Override public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir, - WALFileLengthProvider walFileLengthProvider) throws IOException { + WALProvider walProvider) throws IOException { this.server = server; this.conf = this.server.getConfiguration(); - this.replicationForBulkLoadData = isReplicationForBulkLoadDataEnabled(this.conf); + this.isReplicationForBulkLoadDataEnabled = + ReplicationUtils.isReplicationForBulkLoadDataEnabled(this.conf); this.scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() .setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d") .setDaemon(true) .build()); - if (this.replicationForBulkLoadData) { + if (this.isReplicationForBulkLoadDataEnabled) { if (conf.get(HConstants.REPLICATION_CLUSTER_ID) == null || conf.get(HConstants.REPLICATION_CLUSTER_ID).isEmpty()) { throw new IllegalArgumentException(HConstants.REPLICATION_CLUSTER_ID @@ -142,9 +123,28 @@ public class Replication implements } catch (KeeperException ke) { throw new IOException("Could not read cluster id", ke); } - this.replicationManager = - new ReplicationSourceManager(queueStorage, replicationPeers, replicationTracker, conf, - this.server, fs, logDir, oldLogDir, clusterId, walFileLengthProvider); + this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, + replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId, + walProvider != null ? 
walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty()); + if (walProvider != null) { + walProvider.addWALActionsListener(new WALActionsListener() { + + @Override + public void preLogRoll(Path oldPath, Path newPath) throws IOException { + replicationManager.preLogRoll(newPath); + } + + @Override + public void postLogRoll(Path oldPath, Path newPath) throws IOException { + replicationManager.postLogRoll(newPath); + } + + @Override + public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException { + replicationManager.scopeWALEdits(logKey, logEdit); + } + }); + } this.statsThreadPeriod = this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60); LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod); @@ -153,23 +153,6 @@ public class Replication implements this.peerProcedureHandler = new PeerProcedureHandlerImpl(replicationManager); } - /** - * @param c Configuration to look at - * @return True if replication for bulk load data is enabled. 
- */ - public static boolean isReplicationForBulkLoadDataEnabled(final Configuration c) { - return c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, - HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); - } - - /* - * Returns an object to listen to new wal changes - **/ - @Override - public WALActionsListener getWALActionsListener() { - return this; - } - @Override public PeerProcedureHandler getPeerProcedureHandler() { return peerProcedureHandler; @@ -225,7 +208,7 @@ public class Replication implements this.replicationManager.init(); this.replicationSink = new ReplicationSink(this.conf, this.server); this.scheduleThreadPool.scheduleAtFixedRate( - new ReplicationStatisticsThread(this.replicationSink, this.replicationManager), + new ReplicationStatisticsTask(this.replicationSink, this.replicationManager), statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS); } @@ -237,45 +220,6 @@ public class Replication implements return this.replicationManager; } - @Override - public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException { - scopeWALEdits(logKey, logEdit, this.conf, this.getReplicationManager()); - } - - /** - * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from - * compaction WAL edits and if the scope is local. 
- * @param logKey Key that may get scoped according to its edits - * @param logEdit Edits used to lookup the scopes - * @param replicationManager Manager used to add bulk load events hfile references - * @throws IOException If failed to parse the WALEdit - */ - public static void scopeWALEdits(WALKey logKey, - WALEdit logEdit, Configuration conf, ReplicationSourceManager replicationManager) - throws IOException { - boolean replicationForBulkLoadEnabled = isReplicationForBulkLoadDataEnabled(conf); - boolean foundOtherEdits = false; - for (Cell cell : logEdit.getCells()) { - if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { - foundOtherEdits = true; - break; - } - } - - if (!foundOtherEdits && logEdit.getCells().size() > 0) { - WALProtos.RegionEventDescriptor maybeEvent = - WALEdit.getRegionEventDescriptor(logEdit.getCells().get(0)); - if (maybeEvent != null && (maybeEvent.getEventType() == - WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE)) { - // In serially replication, we use scopes when reading close marker. 
- foundOtherEdits = true; - } - } - if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) { - ((WALKeyImpl)logKey).serializeReplicationScope(false); - } - } - void addHFileRefsToQueue(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs) throws IOException { try { @@ -286,62 +230,16 @@ public class Replication implements } } - @Override - public void preLogRoll(Path oldPath, Path newPath) throws IOException { - getReplicationManager().preLogRoll(newPath); - } - - @Override - public void postLogRoll(Path oldPath, Path newPath) throws IOException { - getReplicationManager().postLogRoll(newPath); - } - /** - * This method modifies the master's configuration in order to inject replication-related features - * @param conf - */ - public static void decorateMasterConfiguration(Configuration conf) { - String plugins = conf.get(HBASE_MASTER_LOGCLEANER_PLUGINS); - String cleanerClass = ReplicationLogCleaner.class.getCanonicalName(); - if (!plugins.contains(cleanerClass)) { - conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass); - } - if (isReplicationForBulkLoadDataEnabled(conf)) { - plugins = conf.get(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS); - cleanerClass = ReplicationHFileCleaner.class.getCanonicalName(); - if (!plugins.contains(cleanerClass)) { - conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, plugins + "," + cleanerClass); - } - } - } - - /** - * This method modifies the region server's configuration in order to inject replication-related - * features - * @param conf region server configurations - */ - public static void decorateRegionServerConfiguration(Configuration conf) { - if (isReplicationForBulkLoadDataEnabled(conf)) { - String plugins = conf.get(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, ""); - String rsCoprocessorClass = ReplicationObserver.class.getCanonicalName(); - if (!plugins.contains(rsCoprocessorClass)) { - conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, - plugins + "," + 
rsCoprocessorClass); - } - } - } - - /* - * Statistics thread. Periodically prints the cache statistics to the log. + * Statistics task. Periodically prints the cache statistics to the log. */ - static class ReplicationStatisticsThread extends Thread { + private final static class ReplicationStatisticsTask implements Runnable { private final ReplicationSink replicationSink; private final ReplicationSourceManager replicationManager; - public ReplicationStatisticsThread(final ReplicationSink replicationSink, - final ReplicationSourceManager replicationManager) { - super("ReplicationStatisticsThread"); + public ReplicationStatisticsTask(ReplicationSink replicationSink, + ReplicationSourceManager replicationManager) { this.replicationManager = replicationManager; this.replicationSink = replicationSink; } http://git-wip-us.apache.org/repos/asf/hbase/blob/fc6d140a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index ab86d7c..2147214 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -43,6 +43,8 @@ import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.Server; @@ -57,9 +59,13 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers; 
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationTracker; +import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,6 +73,8 @@ import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; + /** * This class is responsible to manage all the replication sources. There are two classes of * sources: @@ -86,14 +94,15 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto * operations.</li> * <li>Need synchronized on {@link #walsById}. There are four methods which modify it, * {@link #addPeer(String)}, {@link #removePeer(String)}, - * {@link #cleanOldLogs(SortedSet, String, String)} and {@link #preLogRoll(Path)}. {@link #walsById} - * is a ConcurrentHashMap and there is a Lock for peer id in {@link PeerProcedureHandlerImpl}. So - * there is no race between {@link #addPeer(String)} and {@link #removePeer(String)}. - * {@link #cleanOldLogs(SortedSet, String, String)} is called by {@link ReplicationSourceInterface}. - * So no race with {@link #addPeer(String)}. {@link #removePeer(String)} will terminate the - * {@link ReplicationSourceInterface} firstly, then remove the wals from {@link #walsById}. So no - * race with {@link #removePeer(String)}. 
The only case need synchronized is - * {@link #cleanOldLogs(SortedSet, String, String)} and {@link #preLogRoll(Path)}.</li> + * {@link #cleanOldLogs(SortedSet, String, String)} and {@link #preLogRoll(Path)}. + * {@link #walsById} is a ConcurrentHashMap and there is a Lock for peer id in + * {@link PeerProcedureHandlerImpl}. So there is no race between {@link #addPeer(String)} and + * {@link #removePeer(String)}. {@link #cleanOldLogs(SortedSet, String, String)} is called by + * {@link ReplicationSourceInterface}. So no race with {@link #addPeer(String)}. + * {@link #removePeer(String)} will terminate the {@link ReplicationSourceInterface} firstly, then + * remove the wals from {@link #walsById}. So no race with {@link #removePeer(String)}. The only + * case need synchronized is {@link #cleanOldLogs(SortedSet, String, String)} and + * {@link #preLogRoll(Path)}.</li> * <li>No need synchronized on {@link #walsByIdRecoveredQueues}. There are three methods which * modify it, {@link #removePeer(String)} , {@link #cleanOldLogs(SortedSet, String, String)} and * {@link ReplicationSourceManager.NodeFailoverWorker#run()}. 
@@ -533,7 +542,9 @@ public class ReplicationSourceManager implements ReplicationListener { walSet.clear(); } - void preLogRoll(Path newLog) throws IOException { + // public because of we call it in TestReplicationEmptyWALRecovery + @VisibleForTesting + public void preLogRoll(Path newLog) throws IOException { String logName = newLog.getName(); String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName); // synchronized on latestPaths to avoid the new open source miss the new log @@ -588,13 +599,52 @@ public class ReplicationSourceManager implements ReplicationListener { } } - void postLogRoll(Path newLog) throws IOException { + // public because of we call it in TestReplicationEmptyWALRecovery + @VisibleForTesting + public void postLogRoll(Path newLog) throws IOException { // This only updates the sources we own, not the recovered ones for (ReplicationSourceInterface source : this.sources.values()) { source.enqueueLog(newLog); } } + void scopeWALEdits(WALKey logKey, WALEdit logEdit) throws IOException { + scopeWALEdits(logKey, logEdit, this.conf); + } + + /** + * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from + * compaction WAL edits and if the scope is local. 
+ * @param logKey Key that may get scoped according to its edits + * @param logEdit Edits used to lookup the scopes + * @throws IOException If failed to parse the WALEdit + */ + @VisibleForTesting + static void scopeWALEdits(WALKey logKey, WALEdit logEdit, Configuration conf) throws IOException { + boolean replicationForBulkLoadEnabled = + ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf); + boolean foundOtherEdits = false; + for (Cell cell : logEdit.getCells()) { + if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { + foundOtherEdits = true; + break; + } + } + + if (!foundOtherEdits && logEdit.getCells().size() > 0) { + WALProtos.RegionEventDescriptor maybeEvent = + WALEdit.getRegionEventDescriptor(logEdit.getCells().get(0)); + if (maybeEvent != null && + (maybeEvent.getEventType() == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE)) { + // In serially replication, we use scopes when reading close marker. + foundOtherEdits = true; + } + } + if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) { + ((WALKeyImpl) logKey).serializeReplicationScope(false); + } + } + @Override public void regionServerRemoved(String regionserver) { transferQueues(ServerName.valueOf(regionserver)); @@ -886,7 +936,6 @@ public class ReplicationSourceManager implements ReplicationListener { */ void waitUntilCanBePushed(byte[] encodedName, long seq, String peerId) throws IOException, InterruptedException { - /** * There are barriers for this region and position for this peer. N barriers form N intervals, * (b1,b2) (b2,b3) ... (bn,max). 
Generally, there is no logs whose seq id is not greater than @@ -974,5 +1023,4 @@ public class ReplicationSourceManager implements ReplicationListener { Thread.sleep(replicationWaitTime); } } - } http://git-wip-us.apache.org/repos/asf/hbase/blob/fc6d140a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java index 9ec244a..01a230d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java @@ -99,7 +99,8 @@ public class ReplicationSyncUp extends Configured implements Tool { logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME); System.out.println("Start Replication Server start"); - replication = new Replication(new DummyServer(zkw), fs, logDir, oldLogDir); + replication = new Replication(); + replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir, null); manager = replication.getReplicationManager(); manager.init().get(); http://git-wip-us.apache.org/repos/asf/hbase/blob/fc6d140a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java index b4a22e4..361bb51 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java @@ -114,8 +114,6 @@ import org.apache.hadoop.hbase.master.RegionState; import 
org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; -import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; -import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.UserProvider; @@ -1482,9 +1480,7 @@ public class HBaseFsck extends Configured implements Closeable { // unless I pass along via the conf. Configuration confForWAL = new Configuration(c); confForWAL.set(HConstants.HBASE_DIR, rootdir.toString()); - WAL wal = - new WALFactory(confForWAL, Collections.<WALActionsListener> singletonList(new MetricsWAL()), - walFactoryID).getWAL(metaHRI); + WAL wal = new WALFactory(confForWAL, walFactoryID).getWAL(metaHRI); HRegion meta = HRegion.createHRegion(metaHRI, rootdir, c, metaDescriptor, wal); MasterFileSystem.setInfoFamilyCachingForMeta(metaDescriptor, true); return meta; http://git-wip-us.apache.org/repos/asf/hbase/blob/fc6d140a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index d9badfa..231afd5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -76,13 +76,13 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen } protected volatile T wal; - protected WALFactory factory = null; - protected Configuration conf = null; - protected List<WALActionsListener> listeners = null; - protected String providerId = null; + protected 
WALFactory factory; + protected Configuration conf; + protected List<WALActionsListener> listeners = new ArrayList<>(); + protected String providerId; protected AtomicBoolean initialized = new AtomicBoolean(false); // for default wal provider, logPrefix won't change - protected String logPrefix = null; + protected String logPrefix; /** * we synchronized on walCreateLock to prevent wal recreation in different threads @@ -92,19 +92,16 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen /** * @param factory factory that made us, identity used for FS layout. may not be null * @param conf may not be null - * @param listeners may be null * @param providerId differentiate between providers from one factory, used for FS layout. may be * null */ @Override - public void init(WALFactory factory, Configuration conf, List<WALActionsListener> listeners, - String providerId) throws IOException { + public void init(WALFactory factory, Configuration conf, String providerId) throws IOException { if (!initialized.compareAndSet(false, true)) { throw new IllegalStateException("WALProvider.init should only be called once."); } this.factory = factory; this.conf = conf; - this.listeners = listeners; this.providerId = providerId; // get log prefix StringBuilder sb = new StringBuilder().append(factory.factoryId); @@ -249,8 +246,8 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen * Pattern used to validate a WAL file name see {@link #validateWALFilename(String)} for * description. 
*/ - private static final Pattern pattern = Pattern - .compile(".*\\.\\d*(" + META_WAL_PROVIDER_ID + ")*"); + private static final Pattern pattern = + Pattern.compile(".*\\.\\d*(" + META_WAL_PROVIDER_ID + ")*"); /** * A WAL file name is of the format: <wal-name>{@link #WAL_FILE_NAME_DELIMITER} @@ -264,8 +261,8 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen } /** - * Construct the directory name for all WALs on a given server. Dir names currently look like - * this for WALs: <code>hbase//WALs/kalashnikov.att.net,61634,1486865297088</code>. + * Construct the directory name for all WALs on a given server. Dir names currently look like this + * for WALs: <code>hbase//WALs/kalashnikov.att.net,61634,1486865297088</code>. * @param serverName Server name formatted as described in {@link ServerName} * @return the relative WAL directory name, e.g. <code>.logs/1.example.org,60030,12345</code> if * <code>serverName</code> passed is <code>1.example.org,60030,12345</code> @@ -278,9 +275,9 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen } /** - * Construct the directory name for all old WALs on a given server. The default old WALs dir - * looks like: <code>hbase/oldWALs</code>. If you config hbase.separate.oldlogdir.by.regionserver - * to true, it looks like <code>hbase//oldWALs/kalashnikov.att.net,61634,1486865297088</code>. + * Construct the directory name for all old WALs on a given server. The default old WALs dir looks + * like: <code>hbase/oldWALs</code>. If you config hbase.separate.oldlogdir.by.regionserver to + * true, it looks like <code>hbase//oldWALs/kalashnikov.att.net,61634,1486865297088</code>. 
* @param conf * @param serverName Server name formatted as described in {@link ServerName} * @return the relative WAL directory name @@ -372,7 +369,7 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen } try { serverName = ServerName.parseServerName(logDirName); - } catch (IllegalArgumentException|IllegalStateException ex) { + } catch (IllegalArgumentException | IllegalStateException ex) { serverName = null; LOG.warn("Cannot parse a server name from path=" + logFile + "; " + ex.getMessage()); } @@ -430,16 +427,14 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen } /** - * Opens WAL reader with retries and - * additional exception handling + * Opens WAL reader with retries and additional exception handling * @param path path to WAL file * @param conf configuration * @return WAL Reader instance * @throws IOException */ - public static org.apache.hadoop.hbase.wal.WAL.Reader - openReader(Path path, Configuration conf) - throws IOException + public static org.apache.hadoop.hbase.wal.WAL.Reader openReader(Path path, Configuration conf) + throws IOException { long retryInterval = 2000; // 2 sec @@ -503,6 +498,10 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen } } + @Override + public void addWALActionsListener(WALActionsListener listener) { + listeners.add(listener); + } /** * Get prefix of the log from its name, assuming WAL name in format of http://git-wip-us.apache.org/repos/asf/hbase/blob/fc6d140a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index 2105490..1e750e2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -55,8 +55,7 @@ class DisabledWALProvider implements WALProvider { WAL disabled; @Override - public void init(final WALFactory factory, final Configuration conf, - final List<WALActionsListener> listeners, String providerId) throws IOException { + public void init(WALFactory factory, Configuration conf, String providerId) throws IOException { if (null != disabled) { throw new IllegalStateException("WALProvider.init should only be called once."); } @@ -250,4 +249,9 @@ class DisabledWALProvider implements WALProvider { public long getLogFileSize() { return 0; } + + @Override + public void addWALActionsListener(WALActionsListener listener) { + disabled.registerWALActionsListener(listener); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/fc6d140a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java index f1662bc..b0a924f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java @@ -19,19 +19,17 @@ package org.apache.hadoop.hbase.wal; import java.io.IOException; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.regionserver.wal.FSHLog; +import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; -// imports for things that haven't moved from regionserver.wal yet. -import org.apache.hadoop.hbase.regionserver.wal.FSHLog; -import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter; -import org.apache.hadoop.hbase.util.CommonFSUtils; /** * A WAL provider that use {@link FSHLog}. http://git-wip-us.apache.org/repos/asf/hbase/blob/fc6d140a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java index a0ef817..28817e9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java @@ -23,11 +23,11 @@ import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DE import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.Lock; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.RegionInfo; @@ -130,20 +130,18 @@ public class RegionGroupingProvider implements WALProvider { private final KeyLocker<String> createLock = new KeyLocker<>(); - private RegionGroupingStrategy strategy = null; - private WALFactory factory = null; - private List<WALActionsListener> listeners = null; - private String providerId = null; + private RegionGroupingStrategy strategy; + private WALFactory factory; + private List<WALActionsListener> listeners = new ArrayList<>(); + private String providerId; private Class<? 
extends WALProvider> providerClass; @Override - public void init(final WALFactory factory, final Configuration conf, - final List<WALActionsListener> listeners, final String providerId) throws IOException { + public void init(WALFactory factory, Configuration conf, String providerId) throws IOException { if (null != strategy) { throw new IllegalStateException("WALProvider.init should only be called once."); } this.factory = factory; - this.listeners = null == listeners ? null : Collections.unmodifiableList(listeners); StringBuilder sb = new StringBuilder().append(factory.factoryId); if (providerId != null) { if (providerId.startsWith(WAL_FILE_NAME_DELIMITER)) { @@ -159,19 +157,15 @@ public class RegionGroupingProvider implements WALProvider { private WALProvider createProvider(String group) throws IOException { if (META_WAL_PROVIDER_ID.equals(providerId)) { - return factory.createProvider(providerClass, listeners, META_WAL_PROVIDER_ID); + return factory.createProvider(providerClass, META_WAL_PROVIDER_ID); } else { - return factory.createProvider(providerClass, listeners, group); + return factory.createProvider(providerClass, group); } } @Override public List<WAL> getWALs() { - List<WAL> wals = new ArrayList<>(); - for (WALProvider provider : cached.values()) { - wals.addAll(provider.getWALs()); - } - return wals; + return cached.values().stream().flatMap(p -> p.getWALs().stream()).collect(Collectors.toList()); } private WAL getWAL(String group) throws IOException { @@ -182,6 +176,7 @@ public class RegionGroupingProvider implements WALProvider { provider = cached.get(group); if (provider == null) { provider = createProvider(group); + listeners.forEach(provider::addWALActionsListener); cached.put(group, provider); } } finally { @@ -277,4 +272,14 @@ public class RegionGroupingProvider implements WALProvider { } return logFileSize; } + + @Override + public void addWALActionsListener(WALActionsListener listener) { + // Notice that there is an assumption that this method 
must be called before the getWAL above, + // so we can make sure there is no sub WALProvider yet, so we only add the listener to our + // listeners list without calling addWALActionListener for each WALProvider. Although it is no + // hurt to execute an extra loop to call addWALActionListener for each WALProvider, but if the + // extra code actually works, then we will have other big problems. So leave it as is. + listeners.add(listener); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/fc6d140a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java index d59c824..1410b53 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java @@ -19,19 +19,14 @@ package org.apache.hadoop.hbase.wal; import java.io.IOException; import java.io.InterruptedIOException; -import java.util.Collections; import java.util.List; -import java.util.OptionalLong; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.RegionInfo; -// imports for things that haven't moved from regionserver.wal yet. 
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; -import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; -import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; @@ -62,7 +57,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti * Alternatively, you may provide a custom implementation of {@link WALProvider} by class name. */ @InterfaceAudience.Private -public class WALFactory implements WALFileLengthProvider { +public class WALFactory { private static final Logger LOG = LoggerFactory.getLogger(WALFactory.class); @@ -135,12 +130,12 @@ public class WALFactory implements WALFileLengthProvider { } } - WALProvider createProvider(Class<? extends WALProvider> clazz, - List<WALActionsListener> listeners, String providerId) throws IOException { + WALProvider createProvider(Class<? extends WALProvider> clazz, String providerId) + throws IOException { LOG.info("Instantiating WALProvider of type " + clazz); try { final WALProvider result = clazz.getDeclaredConstructor().newInstance(); - result.init(this, conf, listeners, providerId); + result.init(this, conf, providerId); return result; } catch (Exception e) { LOG.error("couldn't set up WALProvider, the configured class is " + clazz); @@ -150,24 +145,23 @@ public class WALFactory implements WALFileLengthProvider { } /** - * instantiate a provider from a config property. - * requires conf to have already been set (as well as anything the provider might need to read). + * instantiate a provider from a config property. requires conf to have already been set (as well + * as anything the provider might need to read). 
*/ - WALProvider getProvider(final String key, final String defaultValue, - final List<WALActionsListener> listeners, final String providerId) throws IOException { + WALProvider getProvider(String key, String defaultValue, String providerId) throws IOException { Class<? extends WALProvider> clazz = getProviderClass(key, defaultValue); - return createProvider(clazz, listeners, providerId); + WALProvider provider = createProvider(clazz, providerId); + provider.addWALActionsListener(new MetricsWAL()); + return provider; } /** * @param conf must not be null, will keep a reference to read params in later reader/writer - * instances. - * @param listeners may be null. will be given to all created wals (and not meta-wals) + * instances. * @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations - * to make a directory + * to make a directory */ - public WALFactory(final Configuration conf, final List<WALActionsListener> listeners, - final String factoryId) throws IOException { + public WALFactory(Configuration conf, String factoryId) throws IOException { // until we've moved reader/writer construction down into providers, this initialization must // happen prior to provider initialization, in case they need to instantiate a reader/writer. timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000); @@ -178,12 +172,12 @@ public class WALFactory implements WALFileLengthProvider { this.factoryId = factoryId; // end required early initialization if (conf.getBoolean("hbase.regionserver.hlog.enabled", true)) { - provider = getProvider(WAL_PROVIDER, DEFAULT_WAL_PROVIDER, listeners, null); + provider = getProvider(WAL_PROVIDER, DEFAULT_WAL_PROVIDER, null); } else { // special handling of existing configuration behavior. 
LOG.warn("Running with WAL disabled."); provider = new DisabledWALProvider(); - provider.init(this, conf, null, factoryId); + provider.init(this, conf, factoryId); } } @@ -236,7 +230,6 @@ public class WALFactory implements WALFileLengthProvider { return provider; } provider = getProvider(META_WAL_PROVIDER, DEFAULT_META_WAL_PROVIDER, - Collections.<WALActionsListener> singletonList(new MetricsWAL()), AbstractFSWALProvider.META_WAL_PROVIDER_ID); if (metaProvider.compareAndSet(null, provider)) { return provider; @@ -448,9 +441,4 @@ public class WALFactory implements WALFileLengthProvider { public final WALProvider getMetaWALProvider() { return this.metaProvider.get(); } - - @Override - public OptionalLong getLogFileSizeIfBeingWritten(Path path) { - return getWALs().stream().map(w -> w.getLogFileSizeIfBeingWritten(path)).filter(o -> o.isPresent()).findAny().orElse(OptionalLong.empty()); - } } http://git-wip-us.apache.org/repos/asf/hbase/blob/fc6d140a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java index 0586d1d..7ad815e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java @@ -21,34 +21,31 @@ package org.apache.hadoop.hbase.wal; import java.io.Closeable; import java.io.IOException; import java.util.List; +import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; import org.apache.yetus.audience.InterfaceAudience; /** - * The Write Ahead Log (WAL) stores 
all durable edits to the HRegion. - * This interface provides the entry point for all WAL implementors. + * The Write Ahead Log (WAL) stores all durable edits to the HRegion. This interface provides the + * entry point for all WAL implementors. * <p> - * See {@link FSHLogProvider} for an example implementation. - * - * A single WALProvider will be used for retrieving multiple WALs in a particular region server - * and must be threadsafe. + * See {@link FSHLogProvider} for an example implementation. A single WALProvider will be used for + * retrieving multiple WALs in a particular region server and must be threadsafe. */ @InterfaceAudience.Private public interface WALProvider { /** - * Set up the provider to create wals. - * will only be called once per instance. + * Set up the provider to create wals. will only be called once per instance. * @param factory factory that made us may not be null * @param conf may not be null - * @param listeners may be null * @param providerId differentiate between providers from one factory. may be null */ - void init(WALFactory factory, Configuration conf, List<WALActionsListener> listeners, - String providerId) throws IOException; + void init(WALFactory factory, Configuration conf, String providerId) throws IOException; /** * @param region the region which we want to get a WAL for it. Could be null. @@ -62,16 +59,16 @@ public interface WALProvider { List<WAL> getWALs(); /** - * persist outstanding WALs to storage and stop accepting new appends. - * This method serves as shorthand for sending a sync to every WAL provided by a given - * implementation. Those WALs will also stop accepting new writes. + * persist outstanding WALs to storage and stop accepting new appends. This method serves as + * shorthand for sending a sync to every WAL provided by a given implementation. Those WALs will + * also stop accepting new writes. */ void shutdown() throws IOException; /** - * shutdown utstanding WALs and clean up any persisted state. 
- * Call this method only when you will not need to replay any of the edits to the WALs from - * this provider. After this call completes, the underlying resources should have been reclaimed. + * shutdown outstanding WALs and clean up any persisted state. Call this method only when you will + * not need to replay any of the edits to the WALs from this provider. After this call completes, + * the underlying resources should have been reclaimed. */ void close() throws IOException; @@ -83,11 +80,13 @@ public interface WALProvider { // interface provided by WAL. interface Writer extends WriterBase { void sync() throws IOException; + void append(WAL.Entry entry) throws IOException; } interface AsyncWriter extends WriterBase { CompletableFuture<Long> sync(); + void append(WAL.Entry entry); } @@ -101,4 +100,17 @@ public interface WALProvider { */ long getLogFileSize(); + /** + * Add a {@link WALActionsListener}. + * <p> + * Notice that you must call this method before calling {@link #getWAL(RegionInfo)} as this method + * will not affect the {@link WAL} which has already been created. And as we can only call it + * during initialization, it is not thread safe. 
+ */ + void addWALActionsListener(WALActionsListener listener); + + default WALFileLengthProvider getWALFileLengthProvider() { + return path -> getWALs().stream().map(w -> w.getLogFileSizeIfBeingWritten(path)) + .filter(o -> o.isPresent()).findAny().orElse(OptionalLong.empty()); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/fc6d140a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 09e6935..6007b07 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -112,8 +112,6 @@ import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; -import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; -import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.security.HBaseKerberosUtils; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache; @@ -2326,9 +2324,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility { // unless I pass along via the conf. 
Configuration confForWAL = new Configuration(conf); confForWAL.set(HConstants.HBASE_DIR, rootDir.toString()); - return (new WALFactory(confForWAL, - Collections.<WALActionsListener> singletonList(new MetricsWAL()), - "hregion-" + RandomStringUtils.randomNumeric(8))).getWAL(hri); + return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8)).getWAL(hri); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/fc6d140a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java index df80fa0..3ee7020 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java @@ -155,7 +155,7 @@ public class TestWALObserver { if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseWALRootDir)) { TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseWALRootDir, true); } - this.wals = new WALFactory(conf, null, serverName); + this.wals = new WALFactory(conf, serverName); } @After @@ -353,8 +353,9 @@ public class TestWALObserver { Path p = runWALSplit(newConf); LOG.info("WALSplit path == " + p); // Make a new wal for new region open. 
- final WALFactory wals2 = new WALFactory(conf, null, - ServerName.valueOf(currentTest.getMethodName()+"2", 16010, System.currentTimeMillis()).toString()); + final WALFactory wals2 = new WALFactory(conf, + ServerName.valueOf(currentTest.getMethodName() + "2", 16010, System.currentTimeMillis()) + .toString()); WAL wal2 = wals2.getWAL(null); HRegion region = HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir, hri, htd, wal2, TEST_UTIL.getHBaseCluster().getRegionServer(0), null); http://git-wip-us.apache.org/repos/asf/hbase/blob/fc6d140a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java index d30fe9f..2f518c7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java @@ -45,10 +45,10 @@ import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; -import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; @@ -114,7 +114,7 @@ public class TestLogsCleaner { conf.setLong("hbase.master.logcleaner.ttl", ttlWAL); 
conf.setLong("hbase.master.procedurewalcleaner.ttl", ttlProcedureWAL); - Replication.decorateMasterConfiguration(conf); + HMaster.decorateMasterConfiguration(conf); Server server = new DummyServer(); ReplicationQueueStorage queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf); http://git-wip-us.apache.org/repos/asf/hbase/blob/fc6d140a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java index d3385e7..08dd428 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; @@ -50,7 +51,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; -import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import 
org.apache.hadoop.hbase.util.Pair; @@ -93,7 +93,7 @@ public class TestReplicationHFileCleaner { TEST_UTIL.startMiniZKCluster(); server = new DummyServer(); conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); - Replication.decorateMasterConfiguration(conf); + HMaster.decorateMasterConfiguration(conf); rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf); rp.init(); rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf); http://git-wip-us.apache.org/repos/asf/hbase/blob/fc6d140a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java index 5792106..f26998b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java @@ -178,7 +178,7 @@ public class TestCacheOnWriteInSchema { fs.delete(logdir, true); RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); - walFactory = new WALFactory(conf, null, id); + walFactory = new WALFactory(conf, id); region = TEST_UTIL.createLocalHRegion(info, htd, walFactory.getWAL(info)); store = new HStore(region, hcd, conf); http://git-wip-us.apache.org/repos/asf/hbase/blob/fc6d140a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java index aa5365c..225c723 
100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java @@ -175,7 +175,7 @@ public class TestCompactionArchiveConcurrentClose { ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); final Configuration walConf = new Configuration(conf); FSUtils.setRootDir(walConf, tableDir); - final WALFactory wals = new WALFactory(walConf, null, "log_" + info.getEncodedName()); + final WALFactory wals = new WALFactory(walConf, "log_" + info.getEncodedName()); HRegion region = new HRegion(fs, wals.getWAL(info), conf, htd, null); region.initialize(); http://git-wip-us.apache.org/repos/asf/hbase/blob/fc6d140a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java index b8780af..4c6cf6a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java @@ -196,7 +196,7 @@ public class TestCompactionArchiveIOException { HRegionFileSystem fs = new HRegionFileSystem(conf, errFS, tableDir, info); final Configuration walConf = new Configuration(conf); FSUtils.setRootDir(walConf, tableDir); - final WALFactory wals = new WALFactory(walConf, null, "log_" + info.getEncodedName()); + final WALFactory wals = new WALFactory(walConf, "log_" + info.getEncodedName()); HRegion region = new HRegion(fs, wals.getWAL(info), conf, htd, null); region.initialize(); 
http://git-wip-us.apache.org/repos/asf/hbase/blob/fc6d140a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index cd8539a..b7e0164 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -946,7 +946,7 @@ public class TestDefaultMemStore { EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest(); EnvironmentEdgeManager.injectEdge(edge); edge.setCurrentTimeMillis(1234); - WALFactory wFactory = new WALFactory(conf, null, "1234"); + WALFactory wFactory = new WALFactory(conf, "1234"); HRegion meta = HRegion.createHRegion(RegionInfoBuilder.FIRST_META_REGIONINFO, testDir, conf, FSTableDescriptors.createMetaTableDescriptor(conf), wFactory.getWAL(RegionInfoBuilder.FIRST_META_REGIONINFO)); http://git-wip-us.apache.org/repos/asf/hbase/blob/fc6d140a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java index e8dbdac..9c5a667 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java @@ -165,7 +165,7 @@ public class TestHMobStore { ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); final Configuration walConf = new Configuration(conf); FSUtils.setRootDir(walConf, basedir); - final 
WALFactory wals = new WALFactory(walConf, null, methodName); + final WALFactory wals = new WALFactory(walConf, methodName); region = new HRegion(tableDir, wals.getWAL(info), fs, conf, info, htd, null); store = new HMobStore(region, hcd, conf); if(testStore) {