Repository: hbase Updated Branches: refs/heads/branch-2.0 8e36aae9d -> 361dea85c
HBASE-21544 Backport HBASE-20734 Colocate recovered edits directory with hbase.wal.dir JE: Fairly direct backport from >=branch-2.1 to solve an issue where an over-aggressive check for hflush() breaks Azure-based FileSystems. Amending-Author: Reid Chan <reidc...@apache.org> Signed-off-by: Reid Chan <reidc...@apache.org> Signed-off-by: Josh Elser <els...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/361dea85 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/361dea85 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/361dea85 Branch: refs/heads/branch-2.0 Commit: 361dea85c90229d3c2752a814bfc38073f14a2c9 Parents: 8e36aae Author: Zach York <zy...@apache.org> Authored: Wed Jun 27 16:18:53 2018 -0700 Committer: Josh Elser <els...@apache.org> Committed: Wed Dec 5 15:06:03 2018 -0500 ---------------------------------------------------------------------- .../apache/hadoop/hbase/util/CommonFSUtils.java | 28 +++ .../assignment/MergeTableRegionsProcedure.java | 8 +- .../assignment/SplitTableRegionProcedure.java | 10 +- .../AbstractStateMachineTableProcedure.java | 6 + .../hadoop/hbase/regionserver/HRegion.java | 159 ++++++++++----- .../apache/hadoop/hbase/wal/WALSplitter.java | 198 +++++++++---------- .../hadoop/hbase/master/AbstractTestDLS.java | 6 +- .../hadoop/hbase/regionserver/TestHRegion.java | 8 +- .../regionserver/wal/AbstractTestWALReplay.java | 8 +- .../hbase/wal/TestReadWriteSeqIdFiles.java | 18 +- .../apache/hadoop/hbase/wal/TestWALFactory.java | 2 +- .../apache/hadoop/hbase/wal/TestWALSplit.java | 58 +++--- 12 files changed, 304 insertions(+), 205 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/361dea85/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java index a34048a..a08f9f2 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java @@ -417,6 +417,34 @@ public abstract class CommonFSUtils { } /** + * Returns the WAL region directory based on the given table name and region name + * @param conf configuration to determine WALRootDir + * @param tableName Table that the region is under + * @param encodedRegionName Region name used for creating the final region directory + * @return the region directory used to store WALs under the WALRootDir + * @throws IOException if there is an exception determining the WALRootDir + */ + public static Path getWALRegionDir(final Configuration conf, + final TableName tableName, final String encodedRegionName) + throws IOException { + return new Path(getWALTableDir(conf, tableName), + encodedRegionName); + } + + /** + * Returns the Table directory under the WALRootDir for the specified table name + * @param conf configuration used to get the WALRootDir + * @param tableName Table to get the directory for + * @return a path to the WAL table directory for the specified table + * @throws IOException if there is an exception determining the WALRootDir + */ + public static Path getWALTableDir(final Configuration conf, final TableName tableName) + throws IOException { + return new Path(new Path(getWALRootDir(conf), tableName.getNamespaceAsString()), + tableName.getQualifierAsString()); + } + + /** * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under * path rootdir * http://git-wip-us.apache.org/repos/asf/hbase/blob/361dea85/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java index accc051..a3ec9cc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java @@ -824,14 +824,16 @@ public class MergeTableRegionsProcedure } private void writeMaxSequenceIdFile(MasterProcedureEnv env) throws IOException { - FileSystem fs = env.getMasterServices().getMasterFileSystem().getFileSystem(); + FileSystem walFS = env.getMasterServices().getMasterWalManager().getFileSystem(); long maxSequenceId = -1L; for (RegionInfo region : regionsToMerge) { maxSequenceId = - Math.max(maxSequenceId, WALSplitter.getMaxRegionSequenceId(fs, getRegionDir(env, region))); + Math.max(maxSequenceId, WALSplitter.getMaxRegionSequenceId( + walFS, getWALRegionDir(env, region))); } if (maxSequenceId > 0) { - WALSplitter.writeRegionSequenceIdFile(fs, getRegionDir(env, mergedRegion), maxSequenceId); + WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, mergedRegion), + maxSequenceId); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/361dea85/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java index ef732fd..3b81d7a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java @@ -884,12 +884,14 @@ public class SplitTableRegionProcedure } private void writeMaxSequenceIdFile(MasterProcedureEnv env) throws IOException { - FileSystem fs = env.getMasterServices().getMasterFileSystem().getFileSystem(); + FileSystem walFS = env.getMasterServices().getMasterWalManager().getFileSystem(); long maxSequenceId = - WALSplitter.getMaxRegionSequenceId(fs, getRegionDir(env, getParentRegion())); + WALSplitter.getMaxRegionSequenceId(walFS, getWALRegionDir(env, getParentRegion())); if (maxSequenceId > 0) { - WALSplitter.writeRegionSequenceIdFile(fs, getRegionDir(env, daughter_1_RI), maxSequenceId); - WALSplitter.writeRegionSequenceIdFile(fs, getRegionDir(env, daughter_2_RI), maxSequenceId); + WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, daughter_1_RI), + maxSequenceId); + WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, daughter_2_RI), + maxSequenceId); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/361dea85/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java index 8c13ef4..fa77fda 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java @@ -139,6 +139,12 @@ public abstract class AbstractStateMachineTableProcedure<TState> return new Path(tableDir, ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName()); } + protected final Path getWALRegionDir(MasterProcedureEnv env, RegionInfo region) + throws IOException { + return FSUtils.getWALRegionDir(env.getMasterConfiguration(), + region.getTable(), region.getEncodedName()); + } + /** * Check that cluster is up and master is running. Check table is modifiable. * If <code>enabled</code>, check table is enabled else check it is disabled. http://git-wip-us.apache.org/repos/asf/hbase/blob/361dea85/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 298e22a..6d7145a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -326,6 +326,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private final int rowLockWaitDuration; static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000; + private Path regionDir; + private FileSystem walFS; + // The internal wait duration to acquire a lock before read/update // from the region. It is not per row. The purpose of this wait time // is to avoid waiting a long time while the region is busy, so that @@ -927,7 +930,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi stores.forEach(HStore::startReplayingFromWAL); // Recover any edits if available. maxSeqId = Math.max(maxSeqId, - replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status)); + replayRecoveredEditsIfAny(maxSeqIdInStores, reporter, status)); // Make sure mvcc is up to max. this.mvcc.advanceTo(maxSeqId); } finally { @@ -970,14 +973,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Use maximum of log sequenceid or that which was found in stores // (particularly if no recovered edits, seqid will be -1). long maxSeqIdFromFile = - WALSplitter.getMaxRegionSequenceId(fs.getFileSystem(), fs.getRegionDir()); + WALSplitter.getMaxRegionSequenceId(getWalFileSystem(), getWALRegionDir()); long nextSeqId = Math.max(maxSeqId, maxSeqIdFromFile) + 1; // The openSeqNum will always be increase even for read only region, as we rely on it to // determine whether a region has been successfully reopend, so here we always need to update // the max sequence id file. if (RegionReplicaUtil.isDefaultReplica(getRegionInfo())) { LOG.debug("writing seq id for {}", this.getRegionInfo().getEncodedName()); - WALSplitter.writeRegionSequenceIdFile(fs.getFileSystem(), fs.getRegionDir(), nextSeqId); + WALSplitter.writeRegionSequenceIdFile(fs.getFileSystem(), getWALRegionDir(), nextSeqId); + //WALSplitter.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(), nextSeqId - 1); } LOG.info("Opened {}; next sequenceid={}", this.getRegionInfo().getShortNameToLog(), nextSeqId); @@ -1123,11 +1127,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi WALUtil.writeRegionEventMarker(wal, getReplicationScope(), getRegionInfo(), regionEventDesc, mvcc); - // Store SeqId in HDFS when a region closes + // Store SeqId in WAL FileSystem when a region closes // checking region folder exists is due to many tests which delete the table folder while a // table is still online - if (this.fs.getFileSystem().exists(this.fs.getRegionDir())) { - WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs.getRegionDir(), + if (getWalFileSystem().exists(getWALRegionDir())) { + WALSplitter.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(), mvcc.getReadPoint()); } } @@ -1846,6 +1850,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return this.fs; } + /** @return the WAL {@link HRegionFileSystem} used by this region */ + HRegionFileSystem getRegionWALFileSystem() throws IOException { + return new HRegionFileSystem(conf, getWalFileSystem(), + FSUtils.getWALTableDir(conf, htableDescriptor.getTableName()), fs.getRegionInfo()); + } + + /** @return the WAL {@link FileSystem} being used by this region */ + FileSystem getWalFileSystem() throws IOException { + if (walFS == null) { + walFS = FSUtils.getWALFileSystem(conf); + } + return walFS; + } + + /** + * @return the Region directory under WALRootDirectory + * @throws IOException if there is an error getting WALRootDir + */ + @VisibleForTesting + public Path getWALRegionDir() throws IOException { + if (regionDir == null) { + regionDir = FSUtils.getWALRegionDir(conf, getRegionInfo().getTable(), + getRegionInfo().getEncodedName()); + } + return regionDir; + } + @Override public long getEarliestFlushTimeForAllStores() { return Collections.min(lastStoreFlushTimeMap.values()); @@ -4381,8 +4412,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * recovered edits log or <code>minSeqId</code> if nothing added from editlogs. * @throws IOException */ - protected long replayRecoveredEditsIfAny(final Path regiondir, - Map<byte[], Long> maxSeqIdInStores, + protected long replayRecoveredEditsIfAny(Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter, final MonitoredTask status) throws IOException { long minSeqIdForTheRegion = -1; @@ -4393,14 +4423,75 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } long seqid = minSeqIdForTheRegion; - FileSystem fs = this.fs.getFileSystem(); - NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir); + FileSystem walFS = getWalFileSystem(); + FileSystem rootFS = getFilesystem(); + Path regionDir = getWALRegionDir(); + Path defaultRegionDir = getRegionDir(FSUtils.getRootDir(conf), getRegionInfo()); + + // This is to ensure backwards compatability with HBASE-20723 where recovered edits can appear + // under the root dir even if walDir is set. + NavigableSet<Path> filesUnderRootDir = null; + if (!regionDir.equals(defaultRegionDir)) { + filesUnderRootDir = + WALSplitter.getSplitEditFilesSorted(rootFS, defaultRegionDir); + seqid = Math.max(seqid, + replayRecoveredEditsForPaths(minSeqIdForTheRegion, rootFS, filesUnderRootDir, reporter, + defaultRegionDir)); + } + + NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(walFS, regionDir); + seqid = Math.max(seqid, replayRecoveredEditsForPaths(minSeqIdForTheRegion, walFS, + files, reporter, regionDir)); + + if (seqid > minSeqIdForTheRegion) { + // Then we added some edits to memory. Flush and cleanup split edit files. + internalFlushcache(null, seqid, stores.values(), status, false, FlushLifeCycleTracker.DUMMY); + } + // Now delete the content of recovered edits. We're done w/ them. + if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) { + // For debugging data loss issues! + // If this flag is set, make use of the hfile archiving by making recovered.edits a fake + // column family. Have to fake out file type too by casting our recovered.edits as storefiles + String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regionDir).getName(); + Set<HStoreFile> fakeStoreFiles = new HashSet<>(files.size()); + for (Path file: files) { + fakeStoreFiles.add( + new HStoreFile(walFS, file, this.conf, null, null, true)); + } + getRegionWALFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles); + } else { + if (filesUnderRootDir != null) { + for (Path file : filesUnderRootDir) { + if (!rootFS.delete(file, false)) { + LOG.error("Failed delete of {} from under the root directory.", file); + } else { + LOG.debug("Deleted recovered.edits under root directory. file=" + file); + } + } + } + for (Path file: files) { + if (!walFS.delete(file, false)) { + LOG.error("Failed delete of " + file); + } else { + LOG.debug("Deleted recovered.edits file=" + file); + } + } + } + return seqid; + } + + private long replayRecoveredEditsForPaths(long minSeqIdForTheRegion, FileSystem fs, + final NavigableSet<Path> files, final CancelableProgressable reporter, final Path regionDir) + throws IOException { + long seqid = minSeqIdForTheRegion; if (LOG.isDebugEnabled()) { LOG.debug("Found " + (files == null ? 0 : files.size()) - + " recovered edits file(s) under " + regiondir); + + " recovered edits file(s) under " + regionDir); } - if (files == null || files.isEmpty()) return seqid; + if (files == null || files.isEmpty()) { + return minSeqIdForTheRegion; + } for (Path edits: files) { if (edits == null || !fs.exists(edits)) { @@ -4415,8 +4506,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (maxSeqId <= minSeqIdForTheRegion) { if (LOG.isDebugEnabled()) { String msg = "Maximum sequenceid for this wal is " + maxSeqId - + " and minimum sequenceid for the region is " + minSeqIdForTheRegion - + ", skipped the whole file, path=" + edits; + + " and minimum sequenceid for the region is " + minSeqIdForTheRegion + + ", skipped the whole file, path=" + edits; LOG.debug(msg); } continue; @@ -4425,7 +4516,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi try { // replay the edits. Replay can return -1 if everything is skipped, only update // if seqId is greater - seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter)); + seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter, fs)); } catch (IOException e) { boolean skipErrors = conf.getBoolean( HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, @@ -4435,10 +4526,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (conf.get("hbase.skip.errors") != null) { LOG.warn( "The property 'hbase.skip.errors' has been deprecated. Please use " + - HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead."); + HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead."); } if (skipErrors) { - Path p = WALSplitter.moveAsideBadEditsFile(fs, edits); + Path p = WALSplitter.moveAsideBadEditsFile(walFS, edits); LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + "=true so continuing. Renamed " + edits + " as " + p, e); @@ -4447,31 +4538,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } } - if (seqid > minSeqIdForTheRegion) { - // Then we added some edits to memory. Flush and cleanup split edit files. - internalFlushcache(null, seqid, stores.values(), status, false, FlushLifeCycleTracker.DUMMY); - } - // Now delete the content of recovered edits. We're done w/ them. - if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) { - // For debugging data loss issues! - // If this flag is set, make use of the hfile archiving by making recovered.edits a fake - // column family. Have to fake out file type too by casting our recovered.edits as storefiles - String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regiondir).getName(); - Set<HStoreFile> fakeStoreFiles = new HashSet<>(files.size()); - for (Path file: files) { - fakeStoreFiles.add( - new HStoreFile(getRegionFileSystem().getFileSystem(), file, this.conf, null, null, true)); - } - getRegionFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles); - } else { - for (Path file: files) { - if (!fs.delete(file, false)) { - LOG.error("Failed delete of " + file); - } else { - LOG.debug("Deleted recovered.edits file=" + file); - } - } - } return seqid; } @@ -4485,12 +4551,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @throws IOException */ private long replayRecoveredEdits(final Path edits, - Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter) + Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter, FileSystem fs) throws IOException { String msg = "Replaying edits from " + edits; LOG.info(msg); MonitoredTask status = TaskMonitor.get().createStatus(msg); - FileSystem fs = this.fs.getFileSystem(); status.setStatus("Opening recovered edits"); WAL.Reader reader = null; @@ -4644,7 +4709,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi coprocessorHost.postReplayWALs(this.getRegionInfo(), edits); } } catch (EOFException eof) { - Path p = WALSplitter.moveAsideBadEditsFile(fs, edits); + Path p = WALSplitter.moveAsideBadEditsFile(walFS, edits); msg = "EnLongAddered EOF. Most likely due to Master failure during " + "wal splitting, so we have this data in another edit. " + "Continuing, but renaming " + edits + " as " + p; @@ -4654,7 +4719,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // If the IOE resulted from bad file format, // then this problem is idempotent and retrying won't help if (ioe.getCause() instanceof ParseException) { - Path p = WALSplitter.moveAsideBadEditsFile(fs, edits); + Path p = WALSplitter.moveAsideBadEditsFile(walFS, edits); msg = "File corruption enLongAddered! " + "Continuing, but renaming " + edits + " as " + p; LOG.warn(msg, ioe); @@ -7938,7 +8003,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 50 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT + + 53 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT + (14 * Bytes.SIZEOF_LONG) + 3 * Bytes.SIZEOF_BOOLEAN); http://git-wip-us.apache.org/repos/asf/hbase/blob/361dea85/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index 5689a35..9ed7f6c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -115,8 +115,8 @@ public class WALSplitter { public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false; // Parameters for split process - protected final Path rootDir; - protected final FileSystem fs; + protected final Path walDir; + protected final FileSystem walFS; protected final Configuration conf; // Major subcomponents of the split process. @@ -148,15 +148,15 @@ public class WALSplitter { @VisibleForTesting - WALSplitter(final WALFactory factory, Configuration conf, Path rootDir, - FileSystem fs, LastSequenceId idChecker, + WALSplitter(final WALFactory factory, Configuration conf, Path walDir, + FileSystem walFS, LastSequenceId idChecker, SplitLogWorkerCoordination splitLogWorkerCoordination) { this.conf = HBaseConfiguration.create(conf); String codecClassName = conf .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName()); this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName); - this.rootDir = rootDir; - this.fs = fs; + this.walDir = walDir; + this.walFS = walFS; this.sequenceIdChecker = idChecker; this.splitLogWorkerCoordination = splitLogWorkerCoordination; @@ -186,11 +186,11 @@ public class WALSplitter { * <p> * @return false if it is interrupted by the progress-able. */ - public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs, + public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS, Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker, SplitLogWorkerCoordination splitLogWorkerCoordination, final WALFactory factory) throws IOException { - WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, idChecker, + WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, idChecker, splitLogWorkerCoordination); return s.splitLogFile(logfile, reporter); } @@ -201,13 +201,13 @@ public class WALSplitter { // which uses this method to do log splitting. @VisibleForTesting public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir, - FileSystem fs, Configuration conf, final WALFactory factory) throws IOException { + FileSystem walFS, Configuration conf, final WALFactory factory) throws IOException { final FileStatus[] logfiles = SplitLogManager.getFileList(conf, Collections.singletonList(logDir), null); List<Path> splits = new ArrayList<>(); if (ArrayUtils.isNotEmpty(logfiles)) { for (FileStatus logfile: logfiles) { - WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null); + WALSplitter s = new WALSplitter(factory, conf, rootDir, walFS, null, null); if (s.splitLogFile(logfile, null)) { finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf); if (s.outputSink.splits != null) { @@ -216,7 +216,7 @@ public class WALSplitter { } } } - if (!fs.delete(logDir, true)) { + if (!walFS.delete(logDir, true)) { throw new IOException("Unable to delete src dir: " + logDir); } return splits; @@ -322,10 +322,10 @@ public class WALSplitter { LOG.warn("Could not parse, corrupted WAL={}", logPath, e); if (splitLogWorkerCoordination != null) { // Some tests pass in a csm of null. - splitLogWorkerCoordination.markCorrupted(rootDir, logfile.getPath().getName(), fs); + splitLogWorkerCoordination.markCorrupted(walDir, logfile.getPath().getName(), walFS); } else { // for tests only - ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs); + ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), walFS); } isCorrupted = true; } catch (IOException e) { @@ -373,31 +373,30 @@ public class WALSplitter { */ public static void finishSplitLogFile(String logfile, Configuration conf) throws IOException { - Path rootdir = FSUtils.getWALRootDir(conf); - Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME); + Path walDir = FSUtils.getWALRootDir(conf); + Path oldLogDir = new Path(walDir, HConstants.HREGION_OLDLOGDIR_NAME); Path logPath; - if (FSUtils.isStartingWithPath(rootdir, logfile)) { + if (FSUtils.isStartingWithPath(walDir, logfile)) { logPath = new Path(logfile); } else { - logPath = new Path(rootdir, logfile); + logPath = new Path(walDir, logfile); } - finishSplitLogFile(rootdir, oldLogDir, logPath, conf); + finishSplitLogFile(walDir, oldLogDir, logPath, conf); } - private static void finishSplitLogFile(Path rootdir, Path oldLogDir, + private static void finishSplitLogFile(Path walDir, Path oldLogDir, Path logPath, Configuration conf) throws IOException { List<Path> processedLogs = new ArrayList<>(); List<Path> corruptedLogs = new ArrayList<>(); - FileSystem fs; - fs = rootdir.getFileSystem(conf); - if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) { + FileSystem walFS = walDir.getFileSystem(conf); + if (ZKSplitLog.isCorrupted(walDir, logPath.getName(), walFS)) { corruptedLogs.add(logPath); } else { processedLogs.add(logPath); } - archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf); - Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName()); - fs.delete(stagingDir, true); + archiveLogs(corruptedLogs, processedLogs, oldLogDir, walFS, conf); + Path stagingDir = ZKSplitLog.getSplitLogDir(walDir, logPath.getName()); + walFS.delete(stagingDir, true); } /** @@ -408,30 +407,30 @@ public class WALSplitter { * @param corruptedLogs * @param processedLogs * @param oldLogDir - * @param fs + * @param walFS WAL FileSystem to archive files on. * @param conf * @throws IOException */ private static void archiveLogs( final List<Path> corruptedLogs, final List<Path> processedLogs, final Path oldLogDir, - final FileSystem fs, final Configuration conf) throws IOException { + final FileSystem walFS, final Configuration conf) throws IOException { final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME); if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) { LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}", corruptDir); } - if (!fs.mkdirs(corruptDir)) { + if (!walFS.mkdirs(corruptDir)) { LOG.info("Unable to mkdir {}", corruptDir); } - fs.mkdirs(oldLogDir); + walFS.mkdirs(oldLogDir); // this method can get restarted or called multiple times for archiving // the same log files. for (Path corrupted : corruptedLogs) { Path p = new Path(corruptDir, corrupted.getName()); - if (fs.exists(corrupted)) { - if (!fs.rename(corrupted, p)) { + if (walFS.exists(corrupted)) { + if (!walFS.rename(corrupted, p)) { LOG.warn("Unable to move corrupted log {} to {}", corrupted, p); } else { LOG.warn("Moved corrupted log {} to {}", corrupted, p); @@ -441,8 +440,8 @@ public class WALSplitter { for (Path p : processedLogs) { Path newPath = AbstractFSWAL.getWALArchivePath(oldLogDir, p); - if (fs.exists(p)) { - if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) { + if (walFS.exists(p)) { + if (!FSUtils.renameAndSetModifyTime(walFS, p, newPath)) { LOG.warn("Unable to move {} to {}", p, newPath); } else { LOG.info("Archived processed log {} to {}", p, newPath); @@ -466,36 +465,31 @@ public class WALSplitter { */ @SuppressWarnings("deprecation") @VisibleForTesting - static Path getRegionSplitEditsPath(final FileSystem fs, - final Entry logEntry, final Path rootDir, String fileNameBeingSplit) - throws IOException { - Path tableDir = FSUtils.getTableDir(rootDir, logEntry.getKey().getTableName()); + static Path getRegionSplitEditsPath(final Entry logEntry, String fileNameBeingSplit, + Configuration conf) throws IOException { + FileSystem walFS = FSUtils.getWALFileSystem(conf); + Path tableDir = FSUtils.getWALTableDir(conf, logEntry.getKey().getTableName()); String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName()); - Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName); - Path dir = getRegionDirRecoveredEditsDir(regiondir); + Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionName); + Path dir = getRegionDirRecoveredEditsDir(regionDir); - if (!fs.exists(regiondir)) { - LOG.info("This region's directory does not exist: {}." - + "It is very likely that it was already split so it is " - + "safe to discard those edits.", regiondir); - return null; - } - if (fs.exists(dir) && fs.isFile(dir)) { + + if (walFS.exists(dir) && walFS.isFile(dir)) { Path tmp = new Path("/tmp"); - if (!fs.exists(tmp)) { - fs.mkdirs(tmp); + if (!walFS.exists(tmp)) { + walFS.mkdirs(tmp); } tmp = new Path(tmp, HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName); LOG.warn("Found existing old file: {}. It could be some " + "leftover of an old installation. It should be a folder instead. " + "So moving it to {}", dir, tmp); - if (!fs.rename(dir, tmp)) { + if (!walFS.rename(dir, tmp)) { LOG.warn("Failed to sideline old file {}", dir); } } - if (!fs.exists(dir) && !fs.mkdirs(dir)) { + if (!walFS.exists(dir) && !walFS.mkdirs(dir)) { LOG.warn("mkdir failed on {}", dir); } // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now. @@ -533,34 +527,34 @@ public class WALSplitter { private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp"; /** - * @param regiondir + * @param regionDir * This regions directory in the filesystem. * @return The directory that holds recovered edits files for the region - * <code>regiondir</code> + * <code>regionDir</code> */ - public static Path getRegionDirRecoveredEditsDir(final Path regiondir) { - return new Path(regiondir, HConstants.RECOVERED_EDITS_DIR); + public static Path getRegionDirRecoveredEditsDir(final Path regionDir) { + return new Path(regionDir, HConstants.RECOVERED_EDITS_DIR); } /** * Check whether there is recovered.edits in the region dir - * @param fs FileSystem + * @param walFS FileSystem * @param conf conf * @param regionInfo the region to check * @throws IOException IOException * @return true if recovered.edits exist in the region dir */ - public static boolean hasRecoveredEdits(final FileSystem fs, + public static boolean hasRecoveredEdits(final FileSystem walFS, final Configuration conf, final RegionInfo regionInfo) throws IOException { // No recovered.edits for non default replica regions if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { return false; } - Path rootDir = FSUtils.getRootDir(conf); //Only default replica region can reach here, so we can use regioninfo //directly without converting it to default replica's regioninfo. - Path regionDir = HRegion.getRegionDir(rootDir, regionInfo); - NavigableSet<Path> files = getSplitEditFilesSorted(fs, regionDir); + Path regionDir = FSUtils.getWALRegionDir(conf, regionInfo.getTable(), + regionInfo.getEncodedName()); + NavigableSet<Path> files = getSplitEditFilesSorted(walFS, regionDir); return files != null && !files.isEmpty(); } @@ -569,19 +563,19 @@ public class WALSplitter { * Returns sorted set of edit files made by splitter, excluding files * with '.temp' suffix. * - * @param fs - * @param regiondir - * @return Files in passed <code>regiondir</code> as a sorted set. + * @param walFS WAL FileSystem used to retrieving split edits files. + * @param regionDir WAL region dir to look for recovered edits files under. + * @return Files in passed <code>regionDir</code> as a sorted set. * @throws IOException */ - public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem fs, - final Path regiondir) throws IOException { + public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem walFS, + final Path regionDir) throws IOException { NavigableSet<Path> filesSorted = new TreeSet<>(); - Path editsdir = getRegionDirRecoveredEditsDir(regiondir); - if (!fs.exists(editsdir)) { + Path editsdir = getRegionDirRecoveredEditsDir(regionDir); + if (!walFS.exists(editsdir)) { return filesSorted; } - FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() { + FileStatus[] files = FSUtils.listStatus(walFS, editsdir, new PathFilter() { @Override public boolean accept(Path p) { boolean result = false; @@ -591,7 +585,7 @@ public class WALSplitter { // In particular, on error, we'll move aside the bad edit file giving // it a timestamp suffix. See moveAsideBadEditsFile. Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName()); - result = fs.isFile(p) && m.matches(); + result = walFS.isFile(p) && m.matches(); // Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX, // because it means splitwal thread is writting this file. if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) { @@ -616,17 +610,17 @@ public class WALSplitter { /** * Move aside a bad edits file. * - * @param fs + * @param walFS WAL FileSystem used to rename bad edits file. * @param edits * Edits file to move aside. * @return The name of the moved aside file. * @throws IOException */ - public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits) + public static Path moveAsideBadEditsFile(final FileSystem walFS, final Path edits) throws IOException { Path moveAsideName = new Path(edits.getParent(), edits.getName() + "." + System.currentTimeMillis()); - if (!fs.rename(edits, moveAsideName)) { + if (!walFS.rename(edits, moveAsideName)) { LOG.warn("Rename failed from {} to {}", edits, moveAsideName); } return moveAsideName; @@ -645,12 +639,13 @@ public class WALSplitter { || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX); } - private static FileStatus[] getSequenceIdFiles(FileSystem fs, Path regionDir) throws IOException { + private static FileStatus[] getSequenceIdFiles(FileSystem walFS, Path regionDir) + throws IOException { // TODO: Why are we using a method in here as part of our normal region open where // there is no splitting involved? Fix. St.Ack 01/20/2017. Path editsDir = WALSplitter.getRegionDirRecoveredEditsDir(regionDir); try { - FileStatus[] files = fs.listStatus(editsDir, WALSplitter::isSequenceIdFile); + FileStatus[] files = walFS.listStatus(editsDir, WALSplitter::isSequenceIdFile); return files != null ? files : new FileStatus[0]; } catch (FileNotFoundException e) { return new FileStatus[0]; @@ -674,16 +669,16 @@ public class WALSplitter { /** * Get the max sequence id which is stored in the region directory. -1 if none. */ - public static long getMaxRegionSequenceId(FileSystem fs, Path regionDir) throws IOException { - return getMaxSequenceId(getSequenceIdFiles(fs, regionDir)); + public static long getMaxRegionSequenceId(FileSystem walFS, Path regionDir) throws IOException { + return getMaxSequenceId(getSequenceIdFiles(walFS, regionDir)); } /** * Create a file with name as region's max sequence id */ - public static void writeRegionSequenceIdFile(FileSystem fs, Path regionDir, long newMaxSeqId) + public static void writeRegionSequenceIdFile(FileSystem walFS, Path regionDir, long newMaxSeqId) throws IOException { - FileStatus[] files = getSequenceIdFiles(fs, regionDir); + FileStatus[] files = getSequenceIdFiles(walFS, regionDir); long maxSeqId = getMaxSequenceId(files); if (maxSeqId > newMaxSeqId) { throw new IOException("The new max sequence id " + newMaxSeqId + @@ -694,7 +689,7 @@ public class WALSplitter { newMaxSeqId + SEQUENCE_ID_FILE_SUFFIX); if (newMaxSeqId != maxSeqId) { try { - if (!fs.createNewFile(newSeqIdFile) && !fs.exists(newSeqIdFile)) { + if (!walFS.createNewFile(newSeqIdFile) && !walFS.exists(newSeqIdFile)) { throw new IOException("Failed to create SeqId file:" + newSeqIdFile); } LOG.debug("Wrote file={}, newMaxSeqId={}, maxSeqId={}", newSeqIdFile, newMaxSeqId, @@ -706,7 +701,7 @@ public class WALSplitter { // remove old ones for (FileStatus status : files) { if (!newSeqIdFile.equals(status.getPath())) { - fs.delete(status.getPath(), false); + walFS.delete(status.getPath(), false); } } } @@ -733,7 +728,7 @@ public class WALSplitter { } try { - FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter); + FSUtils.getInstance(walFS, conf).recoverFileLease(walFS, path, conf, reporter); try { in = getReader(path, reporter); } catch (EOFException e) { @@ -800,7 +795,7 @@ public class WALSplitter { */ protected Writer createWriter(Path logfile) throws IOException { - return walFactory.createRecoveredEditsWriter(fs, logfile); + return walFactory.createRecoveredEditsWriter(walFS, logfile); } /** @@ -808,7 +803,7 @@ public class WALSplitter { * @return new Reader instance, caller should close */ protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException { - return walFactory.createReader(fs, curLogFile, reporter); + return walFactory.createReader(walFS, curLogFile, reporter); } /** @@ -1283,9 +1278,10 @@ public class WALSplitter { } // delete the one with fewer wal entries - private void deleteOneWithFewerEntries(WriterAndPath wap, Path dst) throws IOException { + private void deleteOneWithFewerEntries(WriterAndPath wap, Path dst) + throws IOException { long dstMinLogSeqNum = -1L; - try (WAL.Reader reader = walFactory.createReader(fs, dst)) { + try (WAL.Reader reader = walFactory.createReader(walFS, dst)) { WAL.Entry entry = reader.next(); if (entry != null) { dstMinLogSeqNum = entry.getKey().getSequenceId(); @@ -1297,15 +1293,15 @@ public class WALSplitter { if (wap.minLogSeqNum < dstMinLogSeqNum) { LOG.warn("Found existing old edits file. It could be the result of a previous failed" + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length=" - + fs.getFileStatus(dst).getLen()); - if (!fs.delete(dst, false)) { + + walFS.getFileStatus(dst).getLen()); + if (!walFS.delete(dst, false)) { LOG.warn("Failed deleting of old {}", dst); throw new IOException("Failed deleting of old " + dst); } } else { LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.p - + ", length=" + fs.getFileStatus(wap.p).getLen()); - if (!fs.delete(wap.p, false)) { + + ", length=" + walFS.getFileStatus(wap.p).getLen()); + if (!walFS.delete(wap.p, false)) { LOG.warn("Failed deleting of {}", wap.p); throw new IOException("Failed deleting of " + wap.p); } @@ -1389,9 +1385,7 @@ public class WALSplitter { Path closeWriter(String encodedRegionName, WriterAndPath wap, List<IOException> thrown) throws IOException{ - if (LOG.isTraceEnabled()) { - LOG.trace("Closing " + wap.p); - } + LOG.trace("Closing " + wap.p); try { wap.w.close(); } catch (IOException ioe) { @@ -1406,7 +1400,7 @@ public class WALSplitter { } if (wap.editsWritten == 0) { // just remove the empty recovered.edits file - if (fs.exists(wap.p) && !fs.delete(wap.p, false)) { + if (walFS.exists(wap.p) && !walFS.delete(wap.p, false)) { LOG.warn("Failed deleting empty " + wap.p); throw new IOException("Failed deleting empty " + wap.p); } @@ -1416,14 +1410,14 @@ public class WALSplitter { Path dst = getCompletedRecoveredEditsFilePath(wap.p, regionMaximumEditLogSeqNum.get(encodedRegionName)); try { - if (!dst.equals(wap.p) && fs.exists(dst)) { + if (!dst.equals(wap.p) && walFS.exists(dst)) { deleteOneWithFewerEntries(wap, dst); } // Skip the unit tests which create a splitter that reads and // writes the data without touching disk. // TestHLogSplit#testThreading is an example. - if (fs.exists(wap.p)) { - if (!fs.rename(wap.p, dst)) { + if (walFS.exists(wap.p)) { + if (!walFS.rename(wap.p, dst)) { throw new IOException("Failed renaming " + wap.p + " to " + dst); } LOG.info("Rename " + wap.p + " to " + dst); @@ -1495,7 +1489,7 @@ public class WALSplitter { if (blacklistedRegions.contains(region)) { return null; } - ret = createWAP(region, entry, rootDir); + ret = createWAP(region, entry); if (ret == null) { blacklistedRegions.add(region); return null; @@ -1509,16 +1503,18 @@ public class WALSplitter { /** * @return a path with a write for that path. caller should close. */ - WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException { - Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, fileBeingSplit.getPath().getName()); + WriterAndPath createWAP(byte[] region, Entry entry) throws IOException { + Path regionedits = getRegionSplitEditsPath(entry, + fileBeingSplit.getPath().getName(), conf); if (regionedits == null) { return null; } - if (fs.exists(regionedits)) { + FileSystem walFs = FSUtils.getWALFileSystem(conf); + if (walFs.exists(regionedits)) { LOG.warn("Found old edits file. It could be the " + "result of a previous failed split attempt. Deleting " + regionedits + ", length=" - + fs.getFileStatus(regionedits).getLen()); - if (!fs.delete(regionedits, false)) { + + walFs.getFileStatus(regionedits).getLen()); + if (!walFs.delete(regionedits, false)) { LOG.warn("Failed delete of old {}", regionedits); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/361dea85/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java index bc4d32c..f36b38c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java @@ -66,7 +66,6 @@ import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; import org.apache.hadoop.hbase.master.assignment.RegionStates; -import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.Region; @@ -221,10 +220,11 @@ public abstract class AbstractTestDLS { int count = 0; for (RegionInfo hri : regions) { - Path tdir = FSUtils.getTableDir(rootdir, table); + Path tdir = FSUtils.getWALTableDir(conf, table); @SuppressWarnings("deprecation") Path editsdir = WALSplitter - .getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName())); + .getRegionDirRecoveredEditsDir(FSUtils.getWALRegionDir(conf, + tableName, hri.getEncodedName())); LOG.debug("checking edits dir " + editsdir); FileStatus[] files = fs.listStatus(editsdir, new PathFilter() { @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/361dea85/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 34b7760..d127c7b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -704,7 +704,7 @@ public class TestHRegion { for (HStore store : region.getStores()) { maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId - 1); } - long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status); + long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status); assertEquals(maxSeqId, seqId); region.getMVCC().advanceTo(seqId); Get get = new Get(row); @@ -756,7 +756,7 @@ public class TestHRegion { for (HStore store : region.getStores()) { maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1); } - long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status); + long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status); assertEquals(maxSeqId, seqId); region.getMVCC().advanceTo(seqId); Get get = new Get(row); @@ -801,7 +801,7 @@ public class TestHRegion { for (HStore store : region.getStores()) { maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId); } - long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, null); + long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, null); assertEquals(minSeqId, seqId); } finally { HBaseTestingUtility.closeRegionAndWAL(this.region); @@ -859,7 +859,7 @@ public class TestHRegion { for (HStore store : region.getStores()) { maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1); } - long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status); + long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status); assertEquals(maxSeqId, seqId); // assert that the files are flushed http://git-wip-us.apache.org/repos/asf/hbase/blob/361dea85/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java index d20188a..aec001e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java @@ -869,7 +869,7 @@ public abstract class AbstractTestWALReplay { final TableName tableName = TableName.valueOf(currentTest.getMethodName()); final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); final Path basedir = - FSUtils.getTableDir(this.hbaseRootDir, tableName); + FSUtils.getWALTableDir(conf, tableName); deleteDir(basedir); final byte[] rowName = tableName.getName(); final int countPerFamily = 10; @@ -902,7 +902,7 @@ public abstract class AbstractTestWALReplay { WALSplitter.splitLogFile(hbaseRootDir, listStatus[0], this.fs, this.conf, null, null, null, wals); FileStatus[] listStatus1 = this.fs.listStatus( - new Path(FSUtils.getTableDir(hbaseRootDir, tableName), new Path(hri.getEncodedName(), + new Path(FSUtils.getWALTableDir(conf, tableName), new Path(hri.getEncodedName(), "recovered.edits")), new PathFilter() { @Override public boolean accept(Path p) { @@ -929,13 +929,13 @@ public abstract class AbstractTestWALReplay { public void testDatalossWhenInputError() throws Exception { final TableName tableName = TableName.valueOf("testDatalossWhenInputError"); final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); - final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName); + final Path basedir = FSUtils.getWALTableDir(conf, tableName); deleteDir(basedir); final byte[] rowName = tableName.getName(); final int countPerFamily = 10; final HTableDescriptor htd = createBasic1FamilyHTD(tableName); HRegion region1 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); - Path regionDir = region1.getRegionFileSystem().getRegionDir(); + Path regionDir = region1.getWALRegionDir(); HBaseTestingUtility.closeRegionAndWAL(region1); WAL wal = createWAL(this.conf, hbaseRootDir, logName); http://git-wip-us.apache.org/repos/asf/hbase/blob/361dea85/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReadWriteSeqIdFiles.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReadWriteSeqIdFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReadWriteSeqIdFiles.java index 6e3aa10..8ae638c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReadWriteSeqIdFiles.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReadWriteSeqIdFiles.java @@ -49,13 +49,13 @@ public class TestReadWriteSeqIdFiles { private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility(); - private static FileSystem FS; + private static FileSystem walFS; private static Path REGION_DIR; @BeforeClass public static void setUp() throws IOException { - FS = FileSystem.getLocal(UTIL.getConfiguration()); + walFS = FileSystem.getLocal(UTIL.getConfiguration()); REGION_DIR = UTIL.getDataTestDir(); } @@ -66,20 +66,20 @@ public class TestReadWriteSeqIdFiles { @Test public void test() throws IOException { - WALSplitter.writeRegionSequenceIdFile(FS, REGION_DIR, 1000L); - assertEquals(1000L, WALSplitter.getMaxRegionSequenceId(FS, REGION_DIR)); - WALSplitter.writeRegionSequenceIdFile(FS, REGION_DIR, 2000L); - assertEquals(2000L, WALSplitter.getMaxRegionSequenceId(FS, REGION_DIR)); + WALSplitter.writeRegionSequenceIdFile(walFS, REGION_DIR, 1000L); + assertEquals(1000L, WALSplitter.getMaxRegionSequenceId(walFS, REGION_DIR)); + WALSplitter.writeRegionSequenceIdFile(walFS, REGION_DIR, 2000L); + assertEquals(2000L, WALSplitter.getMaxRegionSequenceId(walFS, REGION_DIR)); // can not write a sequence id which is smaller try { - WALSplitter.writeRegionSequenceIdFile(FS, REGION_DIR, 1500L); + WALSplitter.writeRegionSequenceIdFile(walFS, REGION_DIR, 1500L); } catch (IOException e) { // expected LOG.info("Expected error", e); } Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(REGION_DIR); - FileStatus[] files = FSUtils.listStatus(FS, editsdir, new PathFilter() { + FileStatus[] files = FSUtils.listStatus(walFS, editsdir, new PathFilter() { @Override public boolean accept(Path p) { return WALSplitter.isSequenceIdFile(p); @@ -89,7 +89,7 @@ public class TestReadWriteSeqIdFiles { assertEquals(1, files.length); // verify all seqId files aren't treated as recovered.edits files - NavigableSet<Path> recoveredEdits = WALSplitter.getSplitEditFilesSorted(FS, REGION_DIR); + NavigableSet<Path> recoveredEdits = WALSplitter.getSplitEditFilesSorted(walFS, REGION_DIR); assertEquals(0, recoveredEdits.size()); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/361dea85/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java index e8161f4..f96a1d6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java @@ -179,7 +179,7 @@ public class TestWALFactory { final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); final int howmany = 3; RegionInfo[] infos = new RegionInfo[3]; - Path tabledir = FSUtils.getTableDir(hbaseWALDir, tableName); + Path tabledir = FSUtils.getWALTableDir(conf, tableName); fs.mkdirs(tabledir); for (int i = 0; i < howmany; i++) { infos[i] = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("" + i)) http://git-wip-us.apache.org/repos/asf/hbase/blob/361dea85/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java index f5800df..2036027 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java @@ -250,9 +250,9 @@ public class TestWALSplit { } LOG.debug(Objects.toString(ls)); LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files."); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf2, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf2, wals); LOG.info("Finished splitting out from under zombie."); - Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region); + Path[] logfiles = getLogForRegion(TABLE_NAME, region); assertEquals("wrong number of split files for region", numWriters, logfiles.length); int count = 0; for (Path logfile: logfiles) { @@ -390,8 +390,8 @@ public class TestWALSplit { new Entry(new WALKeyImpl(encoded, TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID), new WALEdit()); - Path p = WALSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR, - FILENAME_BEING_SPLIT); + Path p = WALSplitter.getRegionSplitEditsPath(entry, + FILENAME_BEING_SPLIT, conf); String parentOfParent = p.getParent().getParent().getName(); assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName()); } @@ -416,8 +416,8 @@ public class TestWALSplit { assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName()); fs.createNewFile(parent); // create a recovered.edits file - Path p = WALSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR, - FILENAME_BEING_SPLIT); + Path p = WALSplitter.getRegionSplitEditsPath(entry, + FILENAME_BEING_SPLIT, conf); String parentOfParent = p.getParent().getParent().getName(); assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName()); WALFactory.createRecoveredEditsWriter(fs, p, conf).close(); @@ -437,9 +437,9 @@ public class TestWALSplit { generateWALs(1, 10, -1, 0); useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath(); - Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION); + Path[] splitLog = getLogForRegion(TABLE_NAME, REGION); assertEquals(1, splitLog.length); assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0])); @@ -453,9 +453,9 @@ public class TestWALSplit { generateWALs(1, 10, -1, 100); useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath(); - Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION); + Path[] splitLog = getLogForRegion(TABLE_NAME, REGION); assertEquals(1, splitLog.length); assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0])); @@ -480,13 +480,13 @@ public class TestWALSplit { writer.close(); useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath(); // original log should have 10 test edits, 10 region markers, 1 compaction marker assertEquals(21, countWAL(originalLog)); - Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, hri.getEncodedName()); + Path[] splitLog = getLogForRegion(TABLE_NAME, hri.getEncodedName()); assertEquals(1, splitLog.length); assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0])); @@ -501,10 +501,10 @@ public class TestWALSplit { private int splitAndCount(final int expectedFiles, final int expectedEntries) throws IOException { useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); int result = 0; for (String region : REGIONS) { - Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region); + Path[] logfiles = getLogForRegion(TABLE_NAME, region); assertEquals(expectedFiles, logfiles.length); int count = 0; for (Path logfile: logfiles) { @@ -637,7 +637,7 @@ public class TestWALSplit { walDirContents.add(status.getPath().getName()); } useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); return walDirContents; } finally { conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass, @@ -678,9 +678,9 @@ public class TestWALSplit { corruptWAL(c1, corruption, true); useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); - Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION); + Path[] splitLog = getLogForRegion(TABLE_NAME, REGION); assertEquals(1, splitLog.length); int actualCount = 0; @@ -714,7 +714,7 @@ public class TestWALSplit { conf.setBoolean(HBASE_SKIP_ERRORS, false); generateWALs(-1); useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR); assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length); } @@ -730,7 +730,7 @@ public class TestWALSplit { throws IOException { generateWALs(-1); useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); FileStatus [] statuses = null; try { statuses = fs.listStatus(WALDIR); @@ -760,7 +760,7 @@ public class TestWALSplit { try { InstrumentedLogWriter.activateFailure = true; - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); } catch (IOException e) { assertTrue(e.getMessage(). contains("This exception is instrumented and should only be thrown for testing")); @@ -781,7 +781,7 @@ public class TestWALSplit { Path regiondir = new Path(TABLEDIR, region); fs.delete(regiondir, true); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); assertFalse(fs.exists(regiondir)); } @@ -858,7 +858,7 @@ public class TestWALSplit { useDifferentDFSClient(); try { - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals); assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length); assertFalse(fs.exists(WALDIR)); } catch (IOException e) { @@ -1082,7 +1082,7 @@ public class TestWALSplit { Path regiondir = new Path(TABLEDIR, REGION); LOG.info("Region directory is" + regiondir); fs.delete(regiondir, true); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); assertFalse(fs.exists(regiondir)); } @@ -1095,7 +1095,7 @@ public class TestWALSplit { injectEmptyFile(".empty", true); useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); Path tdir = FSUtils.getTableDir(HBASEDIR, TABLE_NAME); assertFalse(fs.exists(tdir)); @@ -1120,7 +1120,7 @@ public class TestWALSplit { Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true); useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME); assertEquals(1, fs.listStatus(corruptDir).length); @@ -1148,14 +1148,14 @@ public class TestWALSplit { @Override protected Writer createWriter(Path logfile) throws IOException { - Writer writer = wals.createRecoveredEditsWriter(this.fs, logfile); + Writer writer = wals.createRecoveredEditsWriter(this.walFS, logfile); // After creating writer, simulate region's // replayRecoveredEditsIfAny() which gets SplitEditFiles of this // region and delete them, excluding files with '.temp' suffix. NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir); if (files != null && !files.isEmpty()) { for (Path file : files) { - if (!this.fs.delete(file, false)) { + if (!this.walFS.delete(file, false)) { LOG.error("Failed delete of " + file); } else { LOG.debug("Deleted recovered.edits file=" + file); @@ -1234,9 +1234,9 @@ public class TestWALSplit { - private Path[] getLogForRegion(Path rootdir, TableName table, String region) + private Path[] getLogForRegion(TableName table, String region) throws IOException { - Path tdir = FSUtils.getTableDir(rootdir, table); + Path tdir = FSUtils.getWALTableDir(conf, table); @SuppressWarnings("deprecation") Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, Bytes.toString(Bytes.toBytes(region))));