http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java deleted file mode 100644 index d2bbd27..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java +++ /dev/null @@ -1,365 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.regionserver.wal; - -import java.io.IOException; -import java.util.NavigableSet; -import java.util.TreeSet; -import java.util.concurrent.atomic.AtomicLong; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; -import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor; -import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor; -import org.apache.hadoop.hbase.util.FSUtils; - -import com.google.protobuf.TextFormat; - -@InterfaceAudience.Private -public class HLogUtil { - static final Log LOG = LogFactory.getLog(HLogUtil.class); - - /** - * Pattern used to validate a HLog file name - */ - private static final Pattern pattern = - Pattern.compile(".*\\.\\d*("+HLog.META_HLOG_FILE_EXTN+")*"); - - /** - * @param filename - * name of the file to validate - * @return <tt>true</tt> if the filename matches an HLog, <tt>false</tt> - * otherwise - */ - public static boolean validateHLogFilename(String filename) { - return pattern.matcher(filename).matches(); - } - - /** - * Construct the HLog directory name - * - * @param serverName - * Server name formatted as described in {@link ServerName} - * @return the relative HLog directory name, e.g. 
- * <code>.logs/1.example.org,60030,12345</code> if - * <code>serverName</code> passed is - * <code>1.example.org,60030,12345</code> - */ - public static String getHLogDirectoryName(final String serverName) { - StringBuilder dirName = new StringBuilder(HConstants.HREGION_LOGDIR_NAME); - dirName.append("/"); - dirName.append(serverName); - return dirName.toString(); - } - - /** - * @param regiondir - * This regions directory in the filesystem. - * @return The directory that holds recovered edits files for the region - * <code>regiondir</code> - */ - public static Path getRegionDirRecoveredEditsDir(final Path regiondir) { - return new Path(regiondir, HConstants.RECOVERED_EDITS_DIR); - } - - /** - * Move aside a bad edits file. - * - * @param fs - * @param edits - * Edits file to move aside. - * @return The name of the moved aside file. - * @throws IOException - */ - public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits) - throws IOException { - Path moveAsideName = new Path(edits.getParent(), edits.getName() + "." - + System.currentTimeMillis()); - if (!fs.rename(edits, moveAsideName)) { - LOG.warn("Rename failed from " + edits + " to " + moveAsideName); - } - return moveAsideName; - } - - /** - * @param path - * - the path to analyze. Expected format, if it's in hlog directory: - * / [base directory for hbase] / hbase / .logs / ServerName / - * logfile - * @return null if it's not a log file. Returns the ServerName of the region - * server that created this log file otherwise. 
- */ - public static ServerName getServerNameFromHLogDirectoryName( - Configuration conf, String path) throws IOException { - if (path == null - || path.length() <= HConstants.HREGION_LOGDIR_NAME.length()) { - return null; - } - - if (conf == null) { - throw new IllegalArgumentException("parameter conf must be set"); - } - - final String rootDir = conf.get(HConstants.HBASE_DIR); - if (rootDir == null || rootDir.isEmpty()) { - throw new IllegalArgumentException(HConstants.HBASE_DIR - + " key not found in conf."); - } - - final StringBuilder startPathSB = new StringBuilder(rootDir); - if (!rootDir.endsWith("/")) - startPathSB.append('/'); - startPathSB.append(HConstants.HREGION_LOGDIR_NAME); - if (!HConstants.HREGION_LOGDIR_NAME.endsWith("/")) - startPathSB.append('/'); - final String startPath = startPathSB.toString(); - - String fullPath; - try { - fullPath = FileSystem.get(conf).makeQualified(new Path(path)).toString(); - } catch (IllegalArgumentException e) { - LOG.info("Call to makeQualified failed on " + path + " " + e.getMessage()); - return null; - } - - if (!fullPath.startsWith(startPath)) { - return null; - } - - final String serverNameAndFile = fullPath.substring(startPath.length()); - - if (serverNameAndFile.indexOf('/') < "a,0,0".length()) { - // Either it's a file (not a directory) or it's not a ServerName format - return null; - } - - Path p = new Path(path); - return getServerNameFromHLogDirectoryName(p); - } - - /** - * This function returns region server name from a log file name which is in either format: - * hdfs://<name node>/hbase/.logs/<server name>-splitting/... or hdfs://<name - * node>/hbase/.logs/<server name>/... 
- * @param logFile - * @return null if the passed in logFile isn't a valid HLog file path - */ - public static ServerName getServerNameFromHLogDirectoryName(Path logFile) { - Path logDir = logFile.getParent(); - String logDirName = logDir.getName(); - if (logDirName.equals(HConstants.HREGION_LOGDIR_NAME)) { - logDir = logFile; - logDirName = logDir.getName(); - } - ServerName serverName = null; - if (logDirName.endsWith(HLog.SPLITTING_EXT)) { - logDirName = logDirName.substring(0, logDirName.length() - HLog.SPLITTING_EXT.length()); - } - try { - serverName = ServerName.parseServerName(logDirName); - } catch (IllegalArgumentException ex) { - serverName = null; - LOG.warn("Cannot parse a server name from path=" + logFile + "; " + ex.getMessage()); - } - if (serverName != null && serverName.getStartcode() < 0) { - LOG.warn("Invalid log file path=" + logFile); - return null; - } - return serverName; - } - - /** - * Returns sorted set of edit files made by wal-log splitter, excluding files - * with '.temp' suffix. - * - * @param fs - * @param regiondir - * @return Files in passed <code>regiondir</code> as a sorted set. - * @throws IOException - */ - public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem fs, - final Path regiondir) throws IOException { - NavigableSet<Path> filesSorted = new TreeSet<Path>(); - Path editsdir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir); - if (!fs.exists(editsdir)) - return filesSorted; - FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() { - @Override - public boolean accept(Path p) { - boolean result = false; - try { - // Return files and only files that match the editfile names pattern. - // There can be other files in this directory other than edit files. - // In particular, on error, we'll move aside the bad edit file giving - // it a timestamp suffix. See moveAsideBadEditsFile. 
- Matcher m = HLog.EDITFILES_NAME_PATTERN.matcher(p.getName()); - result = fs.isFile(p) && m.matches(); - // Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX, - // because it means splithlog thread is writting this file. - if (p.getName().endsWith(HLog.RECOVERED_LOG_TMPFILE_SUFFIX)) { - result = false; - } - } catch (IOException e) { - LOG.warn("Failed isFile check on " + p); - } - return result; - } - }); - if (files == null) - return filesSorted; - for (FileStatus status : files) { - filesSorted.add(status.getPath()); - } - return filesSorted; - } - - public static boolean isMetaFile(Path p) { - return isMetaFile(p.getName()); - } - - public static boolean isMetaFile(String p) { - if (p != null && p.endsWith(HLog.META_HLOG_FILE_EXTN)) { - return true; - } - return false; - } - - /** - * Write the marker that a compaction has succeeded and is about to be committed. - * This provides info to the HMaster to allow it to recover the compaction if - * this regionserver dies in the middle (This part is not yet implemented). It also prevents - * the compaction from finishing if this regionserver has already lost its lease on the log. - * @param sequenceId Used by HLog to get sequence Id for the waledit. 
- */ - public static void writeCompactionMarker(HLog log, HTableDescriptor htd, HRegionInfo info, - final CompactionDescriptor c, AtomicLong sequenceId) throws IOException { - TableName tn = TableName.valueOf(c.getTableName().toByteArray()); - HLogKey key = new HLogKey(info.getEncodedNameAsBytes(), tn); - log.appendNoSync(htd, info, key, WALEdit.createCompaction(info, c), sequenceId, false, null); - log.sync(); - if (LOG.isTraceEnabled()) { - LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c)); - } - } - - /** - * Write a flush marker indicating a start / abort or a complete of a region flush - */ - public static long writeFlushMarker(HLog log, HTableDescriptor htd, HRegionInfo info, - final FlushDescriptor f, AtomicLong sequenceId, boolean sync) throws IOException { - TableName tn = TableName.valueOf(f.getTableName().toByteArray()); - HLogKey key = new HLogKey(info.getEncodedNameAsBytes(), tn); - long trx = log.appendNoSync(htd, info, key, WALEdit.createFlushWALEdit(info, f), sequenceId, false, null); - if (sync) log.sync(trx); - if (LOG.isTraceEnabled()) { - LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f)); - } - return trx; - } - - /** - * Write a region open marker indicating that the region is opened - */ - public static long writeRegionEventMarker(HLog log, HTableDescriptor htd, HRegionInfo info, - final RegionEventDescriptor r, AtomicLong sequenceId) throws IOException { - TableName tn = TableName.valueOf(r.getTableName().toByteArray()); - HLogKey key = new HLogKey(info.getEncodedNameAsBytes(), tn); - long trx = log.appendNoSync(htd, info, key, WALEdit.createRegionEventWALEdit(info, r), - sequenceId, false, null); - log.sync(trx); - if (LOG.isTraceEnabled()) { - LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r)); - } - return trx; - } - - /** - * Create a file with name as region open sequence id - * - * @param fs - * @param regiondir - * @param newSeqId - * @param saftyBumper - * @return 
long new sequence Id value - * @throws IOException - */ - public static long writeRegionOpenSequenceIdFile(final FileSystem fs, final Path regiondir, - long newSeqId, long saftyBumper) throws IOException { - - Path editsdir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir); - long maxSeqId = 0; - FileStatus[] files = null; - if (fs.exists(editsdir)) { - files = FSUtils.listStatus(fs, editsdir, new PathFilter() { - @Override - public boolean accept(Path p) { - if (p.getName().endsWith(HLog.SEQUENCE_ID_FILE_SUFFIX)) { - return true; - } - return false; - } - }); - if (files != null) { - for (FileStatus status : files) { - String fileName = status.getPath().getName(); - try { - Long tmpSeqId = Long.parseLong(fileName.substring(0, fileName.length() - - HLog.SEQUENCE_ID_FILE_SUFFIX.length())); - maxSeqId = Math.max(tmpSeqId, maxSeqId); - } catch (NumberFormatException ex) { - LOG.warn("Invalid SeqId File Name=" + fileName); - } - } - } - } - if (maxSeqId > newSeqId) { - newSeqId = maxSeqId; - } - newSeqId += saftyBumper; // bump up SeqId - - // write a new seqId file - Path newSeqIdFile = new Path(editsdir, newSeqId + HLog.SEQUENCE_ID_FILE_SUFFIX); - if (!fs.createNewFile(newSeqIdFile)) { - throw new IOException("Failed to create SeqId file:" + newSeqIdFile); - } - // remove old ones - if(files != null) { - for (FileStatus status : files) { - fs.delete(status.getPath(), false); - } - } - return newSeqId; - } - -}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java index cf4b7a6..ad549f0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java @@ -31,7 +31,7 @@ import org.apache.hadoop.util.StringUtils; * single function call and turn it into multiple manipulations of the hadoop metrics system. */ @InterfaceAudience.Private -public class MetricsWAL { +public class MetricsWAL extends WALActionsListener.Base { static final Log LOG = LogFactory.getLog(MetricsWAL.class); private final MetricsWALSource source; @@ -40,19 +40,20 @@ public class MetricsWAL { source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class); } - public void finishSync(long time) { - source.incrementSyncTime(time); + @Override + public void postSync(final long timeInNanos, final int handlerSyncs) { + source.incrementSyncTime(timeInNanos/1000000l); } - public void finishAppend(long time, long size) { - + @Override + public void postAppend(final long size, final long time) { source.incrementAppendCount(); source.incrementAppendTime(time); source.incrementAppendSize(size); if (time > 1000) { source.incrementSlowAppendCount(); - LOG.warn(String.format("%s took %d ms appending an edit to hlog; len~=%s", + LOG.warn(String.format("%s took %d ms appending an edit to wal; len~=%s", Thread.currentThread().getName(), time, StringUtils.humanReadableInt(size))); http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java 
---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java index 39f1d9f..285f69b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java @@ -31,6 +31,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -40,6 +42,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader.Builder; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL.Entry; import com.google.protobuf.CodedInputStream; import com.google.protobuf.InvalidProtocolBufferException; @@ -58,17 +61,32 @@ import com.google.protobuf.InvalidProtocolBufferException; @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX, HBaseInterfaceAudience.CONFIG}) public class ProtobufLogReader extends ReaderBase { private static final Log LOG = LogFactory.getLog(ProtobufLogReader.class); - static final byte[] PB_WAL_MAGIC = Bytes.toBytes("PWAL"); - static final byte[] PB_WAL_COMPLETE_MAGIC = Bytes.toBytes("LAWP"); + // public for WALFactory until we move everything to o.a.h.h.wal + @InterfaceAudience.Private + public static final byte[] PB_WAL_MAGIC = Bytes.toBytes("PWAL"); + // public 
for TestWALSplit + @InterfaceAudience.Private + public static final byte[] PB_WAL_COMPLETE_MAGIC = Bytes.toBytes("LAWP"); + /** + * Configuration name of WAL Trailer's warning size. If a waltrailer's size is greater than the + * configured size, providers should log a warning. e.g. this is used with Protobuf reader/writer. + */ + static final String WAL_TRAILER_WARN_SIZE = "hbase.regionserver.waltrailer.warn.size"; + static final int DEFAULT_WAL_TRAILER_WARN_SIZE = 1024 * 1024; // 1MB + protected FSDataInputStream inputStream; protected Codec.Decoder cellDecoder; protected WALCellCodec.ByteStringUncompressor byteStringUncompressor; protected boolean hasCompression = false; protected boolean hasTagCompression = false; // walEditsStopOffset is the position of the last byte to read. After reading the last WALEdit entry - // in the hlog, the inputstream's position is equal to walEditsStopOffset. + // in the wal, the inputstream's position is equal to walEditsStopOffset. private long walEditsStopOffset; private boolean trailerPresent; + protected WALTrailer trailer; + // maximum size of the wal Trailer in bytes. If a user writes/reads a trailer with size larger + // than this size, it is written/read respectively, with a WARN message in the log. 
+ protected int trailerWarnSize; private static List<String> writerClsNames = new ArrayList<String>(); static { writerClsNames.add(ProtobufLogWriter.class.getSimpleName()); @@ -121,6 +139,13 @@ public class ProtobufLogReader extends ReaderBase { } @Override + public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream) + throws IOException { + this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE); + super.init(fs, path, conf, stream); + } + + @Override protected String initReader(FSDataInputStream stream) throws IOException { return initInternal(stream, true); } @@ -268,7 +293,7 @@ public class ProtobufLogReader extends ReaderBase { } @Override - protected boolean readNext(HLog.Entry entry) throws IOException { + protected boolean readNext(Entry entry) throws IOException { while (true) { // OriginalPosition might be < 0 on local fs; if so, it is useless to us. long originalPosition = this.inputStream.getPos(); @@ -332,7 +357,7 @@ public class ProtobufLogReader extends ReaderBase { initCause(realEofEx != null ? realEofEx : ex); } if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) { - LOG.error("Read WALTrailer while reading WALEdits. hlog: " + this.path + LOG.error("Read WALTrailer while reading WALEdits. 
wal: " + this.path + ", inputStream.getPos(): " + this.inputStream.getPos() + ", walEditsStopOffset: " + this.walEditsStopOffset); throw new EOFException("Read WALTrailer while reading WALEdits"); @@ -370,11 +395,6 @@ public class ProtobufLogReader extends ReaderBase { } @Override - public WALTrailer getWALTrailer() { - return trailer; - } - - @Override protected void seekOnFs(long pos) throws IOException { this.inputStream.seek(pos); } http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java index fe2eac9..ca80e4c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java @@ -34,6 +34,10 @@ import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.WAL.Entry; + +import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.WAL_TRAILER_WARN_SIZE; +import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.DEFAULT_WAL_TRAILER_WARN_SIZE; /** * Writer for protobuf-based WAL. 
@@ -77,8 +81,7 @@ public class ProtobufLogWriter extends WriterBase { super.init(fs, path, conf, overwritable); assert this.output == null; boolean doCompress = initializeCompressionContext(conf, path); - this.trailerWarnSize = conf.getInt(HLog.WAL_TRAILER_WARN_SIZE, - HLog.DEFAULT_WAL_TRAILER_WARN_SIZE); + this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE); int bufferSize = FSUtils.getDefaultBufferSize(fs); short replication = (short)conf.getInt( "hbase.regionserver.hlog.replication", FSUtils.getDefaultReplication(fs, path)); @@ -110,7 +113,7 @@ public class ProtobufLogWriter extends WriterBase { } @Override - public void append(HLog.Entry entry) throws IOException { + public void append(Entry entry) throws IOException { entry.setCompressionContext(compressionContext); entry.getKey().getBuilder(compressor). setFollowingKvCount(entry.getEdit().size()).build().writeDelimitedTo(output); @@ -134,7 +137,7 @@ public class ProtobufLogWriter extends WriterBase { } } - protected WALTrailer buildWALTrailer(WALTrailer.Builder builder) { + WALTrailer buildWALTrailer(WALTrailer.Builder builder) { return builder.build(); } @@ -188,8 +191,7 @@ public class ProtobufLogWriter extends WriterBase { return this.output; } - @Override - public void setWALTrailer(WALTrailer walTrailer) { + void setWALTrailer(WALTrailer walTrailer) { this.trailer = walTrailer; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java index 7fe5a81..5f1e904 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java @@ -31,21 +31,19 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.io.util.LRUDictionary; -import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.DefaultWALProvider; +import org.apache.hadoop.hbase.wal.WAL.Entry; + @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) -public abstract class ReaderBase implements HLog.Reader { +public abstract class ReaderBase implements DefaultWALProvider.Reader { private static final Log LOG = LogFactory.getLog(ReaderBase.class); protected Configuration conf; protected FileSystem fs; protected Path path; protected long edit = 0; protected long fileLength; - protected WALTrailer trailer; - // maximum size of the wal Trailer in bytes. If a user writes/reads a trailer with size larger - // than this size, it is written/read respectively, with a WARN message in the log. - protected int trailerWarnSize; /** * Compression context to use reading. Can be null if no compression. 
*/ @@ -65,8 +63,6 @@ public abstract class ReaderBase implements HLog.Reader { this.path = path; this.fs = fs; this.fileLength = this.fs.getFileStatus(path).getLen(); - this.trailerWarnSize = conf.getInt(HLog.WAL_TRAILER_WARN_SIZE, - HLog.DEFAULT_WAL_TRAILER_WARN_SIZE); String cellCodecClsName = initReader(stream); boolean compression = hasCompression(); @@ -87,15 +83,17 @@ public abstract class ReaderBase implements HLog.Reader { } @Override - public HLog.Entry next() throws IOException { + public Entry next() throws IOException { return next(null); } @Override - public HLog.Entry next(HLog.Entry reuse) throws IOException { - HLog.Entry e = reuse; + public Entry next(Entry reuse) throws IOException { + Entry e = reuse; if (e == null) { - e = new HLog.Entry(new HLogKey(), new WALEdit()); + // we use HLogKey here instead of WALKey directly to support legacy coprocessors, + // sequencefile based readers, and HLogInputFormat. + e = new Entry(new HLogKey(), new WALEdit()); } if (compressionContext != null) { e.setCompressionContext(compressionContext); @@ -165,15 +163,10 @@ public abstract class ReaderBase implements HLog.Reader { * @param e The entry to read into. * @return Whether there was anything to read. */ - protected abstract boolean readNext(HLog.Entry e) throws IOException; + protected abstract boolean readNext(Entry e) throws IOException; /** * Performs a filesystem-level seek to a certain position in an underlying file. 
*/ protected abstract void seekOnFs(long pos) throws IOException; - - @Override - public WALTrailer getWALTrailer() { - return null; - } } http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java index 985c0bb..03d1608 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java @@ -52,10 +52,10 @@ public class SecureProtobufLogWriter extends ProtobufLogWriter { builder.setWriterClsName(SecureProtobufLogWriter.class.getSimpleName()); if (conf.getBoolean(HConstants.ENABLE_WAL_ENCRYPTION, false)) { // Get an instance of our cipher - Cipher cipher = Encryption.getCipher(conf, - conf.get(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, DEFAULT_CIPHER)); + final String cipherName = conf.get(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, DEFAULT_CIPHER); + Cipher cipher = Encryption.getCipher(conf, cipherName); if (cipher == null) { - throw new RuntimeException("Cipher '" + cipher + "' is not available"); + throw new RuntimeException("Cipher '" + cipherName + "' is not available"); } // Generate an encryption key for this WAL http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java index 
128274a..11312b1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java @@ -33,7 +33,7 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry; +import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile.Metadata; import org.apache.hadoop.io.Text; @@ -222,10 +222,27 @@ public class SequenceFileLogReader extends ReaderBase { } + /** + * fill in the passed entry with the next key/value. + * Note that because this format deals with our legacy storage, the provided + * Entry MUST use an {@link HLogKey} for the key. + * @return boolean indicating if the contents of Entry have been filled in. + */ @Override protected boolean readNext(Entry e) throws IOException { try { - boolean hasNext = this.reader.next(e.getKey(), e.getEdit()); + if (!(e.getKey() instanceof HLogKey)) { + final IllegalArgumentException exception = new IllegalArgumentException( + "SequenceFileLogReader only works when given entries that have HLogKey for keys. This" + + " one had '" + e.getKey().getClass() + "'"); + LOG.error("We need to use the legacy SequenceFileLogReader to handle a " + + " pre-0.96 style WAL, but HBase internals failed to use the deprecated HLogKey class." + + " This is a bug; please file an issue or email the developer mailing list. 
You will " + + "need the following exception details when seeking help from the HBase community.", + exception); + throw exception; + } + boolean hasNext = this.reader.next((HLogKey)e.getKey(), e.getEdit()); if (!hasNext) return false; // Scopes are probably in WAL edit, move to key NavigableMap<byte[], Integer> scopes = e.getEdit().getAndRemoveScopes(); http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java index eddb92d..2194ce9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java @@ -25,8 +25,10 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.wal.WALKey; + /** - * Get notification of {@link FSHLog}/WAL log events. The invocations are inline + * Get notification of WAL events. The invocations are inline * so make sure your implementation is fast else you'll slow hbase. */ @InterfaceAudience.Private @@ -35,30 +37,30 @@ public interface WALActionsListener { /** * The WAL is going to be rolled. The oldPath can be null if this is * the first log file from the regionserver. - * @param oldPath the path to the old hlog - * @param newPath the path to the new hlog + * @param oldPath the path to the old wal + * @param newPath the path to the new wal */ void preLogRoll(Path oldPath, Path newPath) throws IOException; /** * The WAL has been rolled. The oldPath can be null if this is * the first log file from the regionserver. 
- * @param oldPath the path to the old hlog - * @param newPath the path to the new hlog + * @param oldPath the path to the old wal + * @param newPath the path to the new wal */ void postLogRoll(Path oldPath, Path newPath) throws IOException; /** * The WAL is going to be archived. - * @param oldPath the path to the old hlog - * @param newPath the path to the new hlog + * @param oldPath the path to the old wal + * @param newPath the path to the new wal */ void preLogArchive(Path oldPath, Path newPath) throws IOException; /** * The WAL has been archived. - * @param oldPath the path to the old hlog - * @param newPath the path to the new hlog + * @param oldPath the path to the old wal + * @param newPath the path to the new wal */ void postLogArchive(Path oldPath, Path newPath) throws IOException; @@ -79,7 +81,7 @@ public interface WALActionsListener { * @param logEdit */ void visitLogEntryBeforeWrite( - HRegionInfo info, HLogKey logKey, WALEdit logEdit + HRegionInfo info, WALKey logKey, WALEdit logEdit ); /** @@ -87,11 +89,59 @@ public interface WALActionsListener { * @param htd * @param logKey * @param logEdit - * TODO: Retire this in favor of {@link #visitLogEntryBeforeWrite(HRegionInfo, HLogKey, WALEdit)} - * It only exists to get scope when replicating. Scope should be in the HLogKey and not need + * TODO: Retire this in favor of {@link #visitLogEntryBeforeWrite(HRegionInfo, WALKey, WALEdit)} + * It only exists to get scope when replicating. Scope should be in the WALKey and not need * us passing in a <code>htd</code>. */ void visitLogEntryBeforeWrite( - HTableDescriptor htd, HLogKey logKey, WALEdit logEdit + HTableDescriptor htd, WALKey logKey, WALEdit logEdit ); + + /** + * For notification post append to the writer. Used by metrics system at least. + * TODO: Combine this with above. + * @param entryLen approx length of cells in this append. + * @param elapsedTimeMillis elapsed time in milliseconds. 
+ */ + void postAppend(final long entryLen, final long elapsedTimeMillis); + + /** + * For notification post writer sync. Used by metrics system at least. + * @param timeInNanos How long the filesystem sync took in nanoseconds. + * @param handlerSyncs How many sync handler calls were released by this call to filesystem + * sync. + */ + void postSync(final long timeInNanos, final int handlerSyncs); + + static class Base implements WALActionsListener { + @Override + public void preLogRoll(Path oldPath, Path newPath) throws IOException {} + + @Override + public void postLogRoll(Path oldPath, Path newPath) throws IOException {} + + @Override + public void preLogArchive(Path oldPath, Path newPath) throws IOException {} + + @Override + public void postLogArchive(Path oldPath, Path newPath) throws IOException {} + + @Override + public void logRollRequested() {} + + @Override + public void logCloseRequested() {} + + @Override + public void visitLogEntryBeforeWrite(HRegionInfo info, WALKey logKey, WALEdit logEdit) {} + + @Override + public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {} + + @Override + public void postAppend(final long entryLen, final long elapsedTimeMillis) {} + + @Override + public void postSync(final long timeInNanos, final int handlerSyncs) {} + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java index f3927f9..433e5c0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java @@ -130,7 +130,7 @@ public class WALCellCodec implements 
Codec { byte[] uncompress(ByteString data, Dictionary dict) throws IOException; } - // TODO: it sucks that compression context is in HLog.Entry. It'd be nice if it was here. + // TODO: it sucks that compression context is in WAL.Entry. It'd be nice if it was here. // Dictionary could be gotten by enum; initially, based on enum, context would create // an array of dictionaries. static class BaosAndCompressor extends ByteArrayOutputStream implements ByteStringCompressor { http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java index 521e5f3..52dcee0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java @@ -28,9 +28,12 @@ import org.apache.hadoop.hbase.coprocessor.*; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALKey; + /** * Implements the coprocessor environment and runtime support for coprocessors - * loaded within a {@link FSHLog}. + * loaded within a {@link WAL}. 
*/ @InterfaceAudience.Private public class WALCoprocessorHost @@ -42,10 +45,13 @@ public class WALCoprocessorHost static class WALEnvironment extends CoprocessorHost.Environment implements WALCoprocessorEnvironment { - private FSHLog wal; + private final WAL wal; + + final boolean useLegacyPre; + final boolean useLegacyPost; @Override - public FSHLog getWAL() { + public WAL getWAL() { return wal; } @@ -56,23 +62,32 @@ public class WALCoprocessorHost * @param priority chaining priority * @param seq load sequence * @param conf configuration - * @param hlog HLog + * @param wal WAL */ public WALEnvironment(Class<?> implClass, final Coprocessor impl, final int priority, final int seq, final Configuration conf, - final FSHLog hlog) { + final WAL wal) { super(impl, priority, seq, conf); - this.wal = hlog; + this.wal = wal; + // Pick which version of the API we'll call. + // This way we avoid calling the new version on older WALObservers so + // we can maintain binary compatibility. + // See notes in javadoc for WALObserver + useLegacyPre = useLegacyMethod(impl.getClass(), "preWALWrite", ObserverContext.class, + HRegionInfo.class, WALKey.class, WALEdit.class); + useLegacyPost = useLegacyMethod(impl.getClass(), "postWALWrite", ObserverContext.class, + HRegionInfo.class, WALKey.class, WALEdit.class); } } - FSHLog wal; + private final WAL wal; + /** * Constructor * @param log the write ahead log * @param conf the configuration */ - public WALCoprocessorHost(final FSHLog log, final Configuration conf) { + public WALCoprocessorHost(final WAL log, final Configuration conf) { // We don't want to require an Abortable passed down through (FS)HLog, so // this means that a failure to load of a WAL coprocessor won't abort the // server. 
This isn't ideal, and means that security components that @@ -100,21 +115,29 @@ public class WALCoprocessorHost * @return true if default behavior should be bypassed, false otherwise * @throws IOException */ - public boolean preWALWrite(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit) + public boolean preWALWrite(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit) throws IOException { boolean bypass = false; if (this.coprocessors == null || this.coprocessors.isEmpty()) return bypass; ObserverContext<WALCoprocessorEnvironment> ctx = null; for (WALEnvironment env: coprocessors) { - if (env.getInstance() instanceof - org.apache.hadoop.hbase.coprocessor.WALObserver) { + if (env.getInstance() instanceof WALObserver) { + final WALObserver observer = (WALObserver)env.getInstance(); ctx = ObserverContext.createAndPrepare(env, ctx); Thread currentThread = Thread.currentThread(); ClassLoader cl = currentThread.getContextClassLoader(); try { currentThread.setContextClassLoader(env.getClassLoader()); - ((org.apache.hadoop.hbase.coprocessor.WALObserver)env.getInstance()). 
- preWALWrite(ctx, info, logKey, logEdit); + if (env.useLegacyPre) { + if (logKey instanceof HLogKey) { + observer.preWALWrite(ctx, info, (HLogKey)logKey, logEdit); + } else { + legacyWarning(observer.getClass(), + "There are wal keys present that are not HLogKey."); + } + } else { + observer.preWALWrite(ctx, info, logKey, logEdit); + } } catch (Throwable e) { handleCoprocessorThrowable(env, e); } finally { @@ -135,20 +158,28 @@ public class WALCoprocessorHost * @param logEdit * @throws IOException */ - public void postWALWrite(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit) + public void postWALWrite(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit) throws IOException { if (this.coprocessors == null || this.coprocessors.isEmpty()) return; ObserverContext<WALCoprocessorEnvironment> ctx = null; for (WALEnvironment env: coprocessors) { - if (env.getInstance() instanceof - org.apache.hadoop.hbase.coprocessor.WALObserver) { + if (env.getInstance() instanceof WALObserver) { + final WALObserver observer = (WALObserver)env.getInstance(); ctx = ObserverContext.createAndPrepare(env, ctx); Thread currentThread = Thread.currentThread(); ClassLoader cl = currentThread.getContextClassLoader(); try { currentThread.setContextClassLoader(env.getClassLoader()); - ((org.apache.hadoop.hbase.coprocessor.WALObserver)env.getInstance()). 
- postWALWrite(ctx, info, logKey, logEdit); + if (env.useLegacyPost) { + if (logKey instanceof HLogKey) { + observer.postWALWrite(ctx, info, (HLogKey)logKey, logEdit); + } else { + legacyWarning(observer.getClass(), + "There are wal keys present that are not HLogKey."); + } + } else { + observer.postWALWrite(ctx, info, logKey, logEdit); + } } catch (Throwable e) { handleCoprocessorThrowable(env, e); } finally { http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java index 172e478..05cead2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java @@ -52,7 +52,7 @@ import org.apache.hadoop.io.Writable; * for serializing/deserializing a set of KeyValue items. * * Previously, if a transaction contains 3 edits to c1, c2, c3 for a row R, - * the HLog would have three log entries as follows: + * the WAL would have three log entries as follows: * * <logseq1-for-edit1>:<KeyValue-for-edit-c1> * <logseq2-for-edit2>:<KeyValue-for-edit-c2> @@ -73,7 +73,7 @@ import org.apache.hadoop.io.Writable; * <-1, 3, <Keyvalue-for-edit-c1>, <KeyValue-for-edit-c2>, <KeyValue-for-edit-c3>> * * The -1 marker is just a special way of being backward compatible with - * an old HLog which would have contained a single <KeyValue>. + * an old WAL which would have contained a single <KeyValue>. * * The deserializer for WALEdit backward compatibly detects if the record * is an old style KeyValue or the new style WALEdit. 
@@ -168,7 +168,7 @@ public class WALEdit implements Writable, HeapSize { int versionOrLength = in.readInt(); // TODO: Change version when we protobuf. Also, change way we serialize KV! Pb it too. if (versionOrLength == VERSION_2) { - // this is new style HLog entry containing multiple KeyValues. + // this is new style WAL entry containing multiple KeyValues. int numEdits = in.readInt(); for (int idx = 0; idx < numEdits; idx++) { if (compressionContext != null) { @@ -189,7 +189,7 @@ public class WALEdit implements Writable, HeapSize { } } } else { - // this is an old style HLog entry. The int that we just + // this is an old style WAL entry. The int that we just // read is actually the length of a single KeyValue this.add(KeyValue.create(versionOrLength, in)); } http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java index 9c0b8a9..ff5f2f5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.WAL.Entry; import com.google.protobuf.ServiceException; @@ -96,17 +97,17 @@ public class WALEditsReplaySink { * @param entries * @throws IOException */ - public void replayEntries(List<Pair<HRegionLocation, HLog.Entry>> entries) throws IOException { + public 
void replayEntries(List<Pair<HRegionLocation, Entry>> entries) throws IOException { if (entries.size() == 0) { return; } int batchSize = entries.size(); - Map<HRegionInfo, List<HLog.Entry>> entriesByRegion = - new HashMap<HRegionInfo, List<HLog.Entry>>(); + Map<HRegionInfo, List<Entry>> entriesByRegion = + new HashMap<HRegionInfo, List<Entry>>(); HRegionLocation loc = null; - HLog.Entry entry = null; - List<HLog.Entry> regionEntries = null; + Entry entry = null; + List<Entry> regionEntries = null; // Build the action list. for (int i = 0; i < batchSize; i++) { loc = entries.get(i).getFirst(); @@ -114,7 +115,7 @@ public class WALEditsReplaySink { if (entriesByRegion.containsKey(loc.getRegionInfo())) { regionEntries = entriesByRegion.get(loc.getRegionInfo()); } else { - regionEntries = new ArrayList<HLog.Entry>(); + regionEntries = new ArrayList<Entry>(); entriesByRegion.put(loc.getRegionInfo(), regionEntries); } regionEntries.add(entry); @@ -123,9 +124,9 @@ public class WALEditsReplaySink { long startTime = EnvironmentEdgeManager.currentTime(); // replaying edits by region - for (Map.Entry<HRegionInfo, List<HLog.Entry>> _entry : entriesByRegion.entrySet()) { + for (Map.Entry<HRegionInfo, List<Entry>> _entry : entriesByRegion.entrySet()) { HRegionInfo curRegion = _entry.getKey(); - List<HLog.Entry> allActions = _entry.getValue(); + List<Entry> allActions = _entry.getValue(); // send edits in chunks int totalActions = allActions.size(); int replayedActions = 0; @@ -159,7 +160,7 @@ public class WALEditsReplaySink { } private void replayEdits(final HRegionLocation regionLoc, final HRegionInfo regionInfo, - final List<HLog.Entry> entries) throws IOException { + final List<Entry> entries) throws IOException { try { RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf); ReplayServerCallable<ReplicateWALEntryResponse> callable = @@ -182,11 +183,11 @@ public class WALEditsReplaySink { */ class ReplayServerCallable<R> extends 
RegionServerCallable<ReplicateWALEntryResponse> { private HRegionInfo regionInfo; - private List<HLog.Entry> entries; + private List<Entry> entries; ReplayServerCallable(final HConnection connection, final TableName tableName, final HRegionLocation regionLoc, final HRegionInfo regionInfo, - final List<HLog.Entry> entries) { + final List<Entry> entries) { super(connection, tableName, null); this.entries = entries; this.regionInfo = regionInfo; @@ -203,11 +204,11 @@ public class WALEditsReplaySink { return null; } - private void replayToServer(HRegionInfo regionInfo, List<HLog.Entry> entries) + private void replayToServer(HRegionInfo regionInfo, List<Entry> entries) throws IOException, ServiceException { if (entries.isEmpty()) return; - HLog.Entry[] entriesArray = new HLog.Entry[entries.size()]; + Entry[] entriesArray = new Entry[entries.size()]; entriesArray = entries.toArray(entriesArray); AdminService.BlockingInterface remoteSvr = conn.getAdmin(getLocation().getServerName()); @@ -228,11 +229,11 @@ public class WALEditsReplaySink { // if not due to connection issue, the following code should run fast because it uses // cached location boolean skip = false; - for (HLog.Entry entry : this.entries) { + for (Entry entry : this.entries) { WALEdit edit = entry.getEdit(); List<Cell> cells = edit.getCells(); for (Cell cell : cells) { - // filtering HLog meta entries + // filtering WAL meta entries setLocation(conn.locateRegion(tableName, cell.getRow())); skip = true; break; http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java new file mode 100644 index 0000000..5f00643 --- /dev/null +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java @@ -0,0 +1,101 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.wal; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALKey; + +import com.google.protobuf.TextFormat; + +/** + * Helper methods to ease Region Server integration with the write ahead log. + * Note that methods in this class specifically should not require access to anything + * other than the API found in {@link WAL}. 
+ */ +@InterfaceAudience.Private +public class WALUtil { + static final Log LOG = LogFactory.getLog(WALUtil.class); + + /** + * Write the marker that a compaction has succeeded and is about to be committed. + * This provides info to the HMaster to allow it to recover the compaction if + * this regionserver dies in the middle (This part is not yet implemented). It also prevents + * the compaction from finishing if this regionserver has already lost its lease on the log. + * @param sequenceId Used by WAL to get sequence Id for the waledit. + */ + public static void writeCompactionMarker(WAL log, HTableDescriptor htd, HRegionInfo info, + final CompactionDescriptor c, AtomicLong sequenceId) throws IOException { + TableName tn = TableName.valueOf(c.getTableName().toByteArray()); + // we use HLogKey here instead of WALKey directly to support legacy coprocessors. + WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn); + log.append(htd, info, key, WALEdit.createCompaction(info, c), sequenceId, false, null); + log.sync(); + if (LOG.isTraceEnabled()) { + LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c)); + } + } + + /** + * Write a flush marker indicating a start / abort or a complete of a region flush + */ + public static long writeFlushMarker(WAL log, HTableDescriptor htd, HRegionInfo info, + final FlushDescriptor f, AtomicLong sequenceId, boolean sync) throws IOException { + TableName tn = TableName.valueOf(f.getTableName().toByteArray()); + // we use HLogKey here instead of WALKey directly to support legacy coprocessors. 
+ WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn); + long trx = log.append(htd, info, key, WALEdit.createFlushWALEdit(info, f), sequenceId, false, + null); + if (sync) log.sync(trx); + if (LOG.isTraceEnabled()) { + LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f)); + } + return trx; + } + + /** + * Write a region open marker indicating that the region is opened + */ + public static long writeRegionEventMarker(WAL log, HTableDescriptor htd, HRegionInfo info, + final RegionEventDescriptor r, AtomicLong sequenceId) throws IOException { + TableName tn = TableName.valueOf(r.getTableName().toByteArray()); + // we use HLogKey here instead of WALKey directly to support legacy coprocessors. + WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn); + long trx = log.append(htd, info, key, WALEdit.createRegionEventWALEdit(info, r), + sequenceId, false, null); + log.sync(trx); + if (LOG.isTraceEnabled()) { + LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r)); + } + return trx; + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java index cd3aeaf..8188e02 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java @@ -28,12 +28,14 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.io.util.LRUDictionary; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.DefaultWALProvider; + /** * Context used by our wal dictionary compressor. Null if we're not to do our * custom dictionary compression. 
*/ @InterfaceAudience.Private -public abstract class WriterBase implements HLog.Writer { +public abstract class WriterBase implements DefaultWALProvider.Writer { protected CompressionContext compressionContext; protected Configuration conf; http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java index 281ba63..6a3981a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java @@ -24,7 +24,7 @@ import java.util.List; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry; +import org.apache.hadoop.hbase.wal.WAL.Entry; /** * A {@link WALEntryFilter} which contains multiple filters and applies them http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java index 03b66d2..c3ec976 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java @@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; import com.google.common.util.concurrent.Service; @@ -128,13 +128,13 @@ public interface ReplicationEndpoint extends Service { */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) class ReplicateContext { - List<HLog.Entry> entries; + List<Entry> entries; int size; @InterfaceAudience.Private public ReplicateContext() { } - public ReplicateContext setEntries(List<HLog.Entry> entries) { + public ReplicateContext setEntries(List<Entry> entries) { this.entries = entries; return this; } @@ -142,7 +142,7 @@ public interface ReplicationEndpoint extends Service { this.size = size; return this; } - public List<HLog.Entry> getEntries() { + public List<Entry> getEntries() { return entries; } public int getSize() { http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java index 5df7b25..166dc37 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java @@ -24,7 +24,7 @@ import java.util.NavigableMap; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry; +import org.apache.hadoop.hbase.wal.WAL.Entry; /** * Keeps KVs that are scoped other than local 
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java index b683ad6..46b8b81 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java @@ -19,7 +19,7 @@ package org.apache.hadoop.hbase.replication; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry; +import org.apache.hadoop.hbase.wal.WAL.Entry; /** * Skips WAL edits for all System tables including META http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java index 0ea267d..b892512 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java @@ -26,7 +26,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry; +import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.util.Bytes; public class TableCfWALEntryFilter implements WALEntryFilter { 
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java index 60797c9..b66ddde 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java @@ -20,7 +20,7 @@ package org.apache.hadoop.hbase.replication; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.wal.WAL.Entry; /** * A Filter for WAL entries before being sent over to replication. Multiple @@ -30,12 +30,12 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog; public interface WALEntryFilter { /** - * Applies the filter, possibly returning a different HLog.Entry instance. + * Applies the filter, possibly returning a different Entry instance. * If null is returned, the entry will be skipped. - * @param entry WAL Entry to filter - * @return a (possibly modified) HLog.Entry to use. Returning null or an entry with + * @param entry Entry to filter + * @return a (possibly modified) Entry to use. Returning null or an entry with * no cells will cause the entry to be skipped for replication. 
*/ - public HLog.Entry filter(HLog.Entry entry); + public Entry filter(Entry entry); } http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java index 8f099d7..525b7ad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java @@ -61,17 +61,17 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo return files; } - final Set<String> hlogs = loadHLogsFromQueues(); + final Set<String> wals = loadWALsFromQueues(); return Iterables.filter(files, new Predicate<FileStatus>() { @Override public boolean apply(FileStatus file) { - String hlog = file.getPath().getName(); - boolean logInReplicationQueue = hlogs.contains(hlog); + String wal = file.getPath().getName(); + boolean logInReplicationQueue = wals.contains(wal); if (LOG.isDebugEnabled()) { if (logInReplicationQueue) { - LOG.debug("Found log in ZK, keeping: " + hlog); + LOG.debug("Found log in ZK, keeping: " + wal); } else { - LOG.debug("Didn't find this log in ZK, deleting: " + hlog); + LOG.debug("Didn't find this log in ZK, deleting: " + wal); } } return !logInReplicationQueue; @@ -79,15 +79,15 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo } /** - * Load all hlogs in all replication queues from ZK + * Load all wals in all replication queues from ZK */ - private Set<String> loadHLogsFromQueues() { + private Set<String> loadWALsFromQueues() { List<String> rss = replicationQueues.getListOfReplicators(); if (rss == null) { 
LOG.debug("Didn't find any region server that replicates, won't prevent any deletions."); return ImmutableSet.of(); } - Set<String> hlogs = Sets.newHashSet(); + Set<String> wals = Sets.newHashSet(); for (String rs: rss) { List<String> listOfPeers = replicationQueues.getAllQueues(rs); // if rs just died, this will be null @@ -95,13 +95,13 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo continue; } for (String id : listOfPeers) { - List<String> peersHlogs = replicationQueues.getLogsInQueue(rs, id); - if (peersHlogs != null) { - hlogs.addAll(peersHlogs); + List<String> peersWals = replicationQueues.getLogsInQueue(rs, id); + if (peersWals != null) { + wals.addAll(peersWals); } } } - return hlogs; + return wals; } @Override @@ -109,7 +109,7 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo // If replication is disabled, keep all members null if (!config.getBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT)) { - LOG.warn("Not configured - allowing all hlogs to be deleted"); + LOG.warn("Not configured - allowing all wals to be deleted"); return; } // Make my own Configuration. 
Then I'll have my own connection to zk that http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 0906847..397044d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface; -import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer; @@ -136,7 +136,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi */ @Override public boolean replicate(ReplicateContext replicateContext) { - List<HLog.Entry> entries = replicateContext.getEntries(); + List<Entry> entries = replicateContext.getEntries(); int sleepMultiplier = 1; while (this.isRunning()) { if (!peersSelected) { @@ -159,7 +159,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi " entries of total size " + replicateContext.getSize()); } 
ReplicationProtbufUtil.replicateWALEntry(rrs, - entries.toArray(new HLog.Entry[entries.size()])); + entries.toArray(new Entry[entries.size()])); // update metrics this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime()); http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 2c413f4..4729644 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -47,7 +47,7 @@ import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.regionserver.ReplicationSinkService; import org.apache.hadoop.hbase.regionserver.ReplicationSourceService; -import org.apache.hadoop.hbase.regionserver.wal.HLogKey; +import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.ReplicationException; @@ -66,7 +66,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; * Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}. 
*/ @InterfaceAudience.Private -public class Replication implements WALActionsListener, +public class Replication extends WALActionsListener.Base implements ReplicationSourceService, ReplicationSinkService { private static final Log LOG = LogFactory.getLog(Replication.class); @@ -155,7 +155,7 @@ public class Replication implements WALActionsListener, } /* - * Returns an object to listen to new hlog changes + * Returns an object to listen to new wal changes **/ public WALActionsListener getWALActionsListener() { return this; @@ -222,13 +222,7 @@ public class Replication implements WALActionsListener, } @Override - public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey, - WALEdit logEdit) { - // Not interested - } - - @Override - public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey, + public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) { scopeWALEdits(htd, logKey, logEdit); } @@ -240,7 +234,7 @@ public class Replication implements WALActionsListener, * @param logKey Key that may get scoped according to its edits * @param logEdit Edits used to lookup the scopes */ - public static void scopeWALEdits(HTableDescriptor htd, HLogKey logKey, + public static void scopeWALEdits(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) { NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); @@ -273,16 +267,6 @@ public class Replication implements WALActionsListener, getReplicationManager().postLogRoll(newPath); } - @Override - public void preLogArchive(Path oldPath, Path newPath) throws IOException { - // Not interested - } - - @Override - public void postLogArchive(Path oldPath, Path newPath) throws IOException { - // Not interested - } - /** * This method modifies the master's configuration in order to inject * replication-related features @@ -299,16 +283,6 @@ public class Replication implements WALActionsListener, } } - @Override - public void logRollRequested() { - 
// Not interested - } - - @Override - public void logCloseRequested() { - // not interested - } - /* * Statistics thread. Periodically prints the cache statistics to the log. */ http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java deleted file mode 100644 index ccae169..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java +++ /dev/null @@ -1,144 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.replication.regionserver; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.regionserver.wal.HLog; -import org.apache.hadoop.hbase.regionserver.wal.HLogFactory; - -import java.io.IOException; - -/** - * Wrapper class around HLog to help manage the implementation details - * such as compression. - */ -@InterfaceAudience.Private -public class ReplicationHLogReaderManager { - - private static final Log LOG = LogFactory.getLog(ReplicationHLogReaderManager.class); - private final FileSystem fs; - private final Configuration conf; - private long position = 0; - private HLog.Reader reader; - private Path lastPath; - - /** - * Creates the helper but doesn't open any file - * Use setInitialPosition after using the constructor if some content needs to be skipped - * @param fs - * @param conf - */ - public ReplicationHLogReaderManager(FileSystem fs, Configuration conf) { - this.fs = fs; - this.conf = conf; - } - - /** - * Opens the file at the current position - * @param path - * @return an HLog reader. 
- * @throws IOException - */ - public HLog.Reader openReader(Path path) throws IOException { - // Detect if this is a new file, if so get a new reader else - // reset the current reader so that we see the new data - if (this.reader == null || !this.lastPath.equals(path)) { - this.closeReader(); - this.reader = HLogFactory.createReader(this.fs, path, this.conf); - this.lastPath = path; - } else { - try { - this.reader.reset(); - } catch (NullPointerException npe) { - throw new IOException("NPE resetting reader, likely HDFS-4380", npe); - } - } - return this.reader; - } - - /** - * Get the next entry, returned and also added in the array - * @return a new entry or null - * @throws IOException - */ - public HLog.Entry readNextAndSetPosition() throws IOException { - HLog.Entry entry = this.reader.next(); - // Store the position so that in the future the reader can start - // reading from here. If the above call to next() throws an - // exception, the position won't be changed and retry will happen - // from the last known good position - this.position = this.reader.getPosition(); - // We need to set the CC to null else it will be compressed when sent to the sink - if (entry != null) { - entry.setCompressionContext(null); - } - return entry; - } - - /** - * Advance the reader to the current position - * @throws IOException - */ - public void seek() throws IOException { - if (this.position != 0) { - this.reader.seek(this.position); - } - } - - /** - * Get the position that we stopped reading at - * @return current position, cannot be negative - */ - public long getPosition() { - return this.position; - } - - public void setPosition(long pos) { - this.position = pos; - } - - /** - * Close the current reader - * @throws IOException - */ - public void closeReader() throws IOException { - if (this.reader != null) { - this.reader.close(); - this.reader = null; - } - } - - /** - * Tell the helper to reset internal state - */ - void finishCurrentFile() { - this.position = 0; - 
try { - this.closeReader(); - } catch (IOException e) { - LOG.warn("Unable to close reader", e); - } - } - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index 7ed7bec..9a60131 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -58,7 +58,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; * <p/> * This replication process is currently waiting for the edits to be applied * before the method can return. This means that the replication of edits - * is synchronized (after reading from HLogs in ReplicationSource) and that a + * is synchronized (after reading from WALs in ReplicationSource) and that a * single region server cannot receive edits from two sources at the same time * <p/> * This class uses the native HBase client in order to replicate entries.